Twister4Azure Application Development Guide

  1. Add new MapReduce application project
  2. Implement the Mapper
  3. Implement the Reducer
  4. Implement the Driver for your MapReduce Program
  5. Add the new MapReduce application to Twister4Azure
  6. Run/Debug/Deploy
  7. Client API

Check out the quick start guides on traditional MapReduce and Iterative MapReduce to learn how to run Twister4Azure applications.

1. Add new MapReduce application project

2. Implement the Mapper

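The Mapper type parameters are, in order, the input key and value types, the map output key and value types, and the key and value types of the dynamicData list passed to Map (not used by WordCount, hence NullKey and NullValue). Map is called once per input record and emits its results through outputCollector.

Example: WordCount Mapper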

using System.Collections.Generic;
using AzureMRCore;
using AzureMRCore.DataTypes;
using AzureMRCore.MapRed;

namespace HelloT4ASample
{
    internal class WordCountMapper : Mapper<IntKey, StringValue, StringKey, IntValue, NullKey, NullValue>
    {
        protected override int Map(IntKey key, StringValue value, List<KeyValuePair<NullKey, NullValue>> dynamicData, IOutputCollector<StringKey, IntValue> outputCollector, string programArgs)
        {
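            // Each input value is one line of text.
            string line = value.GetTextValue();
            // Split on spaces, dropping the empty tokens that consecutive spaces would produce.
            string[] words = line.Split(new[] { ' ' }, System.StringSplitOptions.RemoveEmptyEntries);
            foreach (string word in words)
            {
                // Emit (word, 1); the combiner and reducer add these up per word.
                outputCollector.Collect(StringKey.GetInstance(word), IntValue.GetInstance(1));
            }
            return 0;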
        }
    }
}
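To check what the mapper produces without deploying anything, the following plain C# sketch reproduces the Map logic outside Twister4Azure: it returns a list of (word, 1) pairs instead of calling outputCollector.Collect. The class name LocalWordCountMap is hypothetical and nothing in it is part of the AzureMRCore API. It splits with StringSplitOptions.RemoveEmptyEntries because a plain Split(' ') returns an empty string wherever two spaces are adjacent, and that empty string would then be counted as a word.

```csharp
using System;
using System.Collections.Generic;

// Local illustration of WordCountMapper.Map; it does not use the AzureMRCore API.
public static class LocalWordCountMap
{
    // Splits one line into words and returns a (word, 1) pair for each word,
    // where the real mapper calls outputCollector.Collect instead.
    public static List<KeyValuePair<string, int>> Map(string line)
    {
        var output = new List<KeyValuePair<string, int>>();
        // RemoveEmptyEntries drops the "" tokens that adjacent spaces would otherwise produce.
        foreach (string word in line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries))
        {
            output.Add(new KeyValuePair<string, int>(word, 1));
        }
        return output;
    }
}
```

For the input "to be  or not to be" (note the double space) this returns six pairs, the first being ("to", 1); with Split(' ') alone the same line yields seven tokens, one of them empty.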


3. Implement the Reducer

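The Reducer type parameters are the input key and value types, which must match the mapper's output types, followed by the output key and value types. Reduce is called once per key with all the values emitted for that key.

Example: WordCount Reducer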

using System.Collections.Generic;
using System.Linq;
using AzureMRCore;
using AzureMRCore.DataTypes;
using AzureMRCore.MapRed;
namespace HelloT4ASample
{
    internal class WordCountReducer : Reducer<StringKey, IntValue, StringKey, IntValue>
    {
        public override int Reduce(StringKey key, List<IntValue> values, IOutputCollector<StringKey, IntValue> outputCollector, string programArgs)
        {
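            // Sum the partial counts emitted for this word (by the mappers or the combiner).
            int count = values.Sum(value => value.Value);
            var outValue = new IntValue {Value = count};
            // Emit (word, total count).
            outputCollector.Collect(key, outValue);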
            return 0;
        }
    }
}
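Between the map and reduce phases the framework groups the map output by key, so Reduce sees each word once together with all of its counts. The sketch below imitates that grouping and the reducer's sum with LINQ, as a local illustration only: LocalWordCountReduce is a hypothetical name, and the real framework spreads this work across reduce tasks rather than building one dictionary in memory.

```csharp
using System.Collections.Generic;
using System.Linq;

// Local illustration of the grouping done between map and reduce, followed by
// WordCountReducer's per-key sum. Not part of the AzureMRCore API.
public static class LocalWordCountReduce
{
    public static Dictionary<string, int> GroupAndReduce(IEnumerable<KeyValuePair<string, int>> mapOutput)
    {
        return mapOutput
            // Group: collect all counts emitted for the same word.
            .GroupBy(pair => pair.Key)
            // Reduce: sum each word's counts, as WordCountReducer.Reduce does.
            .ToDictionary(group => group.Key, group => group.Sum(pair => pair.Value));
    }
}
```

Given the six (word, 1) pairs for "to be or not to be", it returns four entries: to: 2, be: 2, or: 1, not: 1.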


4. Implement the Driver for your MapReduce Program

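The driver class configures your MapReduce application: it names the application and specifies the input format, mapper, combiner, partitioner, reducer, and map output format to use. Clients submit jobs using the value of Name (here "WordCountMR").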

Example: WordCount Driver

using AzureMRCore.DataTypes;
using AzureMRCore.Drivers;
using AzureMRCore.InputFormat;
using AzureMRCore.MapRed;
using AzureMRCore.OutputFormat;
using AzureMRCore.Partitioners;

namespace HelloT4ASample
{
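    // Type parameters: input key/value, map output key/value, final (reduce) output key/value.
    public class WordCountMR : MapReduceDriver<IntKey, StringValue, StringKey, IntValue, StringKey, IntValue>
    {
        // Application name; clients pass this string to ProcessMapRed.
        public override string Name
        {
            get { return "WordCountMR"; }
        }

        public override Mapper<IntKey, StringValue, StringKey, IntValue, NullKey, NullValue> MapperType
        {
            get { return new WordCountMapper(); }
        }

        // Line-based input: each record's value is one line of an input file.
        public override IInputFormat<IntKey, StringValue> InputFormatType
        {
            get { return new CachedLineInputFormat(); }
        }

        public override Reducer<StringKey, IntValue, StringKey, IntValue> ReducerType
        {
            get { return new WordCountReducer(); }
        }

        // Summation is associative and commutative, so the reducer can double as the combiner,
        // pre-aggregating each map task's output before it is sent to the reducers.
        public override Reducer<StringKey, IntValue, StringKey, IntValue> CombinerType
        {
            get { return new WordCountReducer(); }
        }

        // Decides which reduce task receives each map output key.
        public override IPartitioner PartitionerType
        {
            get { return new HashPartitioner(); }
        }

        // Format in which the intermediate map output is written.
        public override IOutputFormat<StringKey, IntValue> MapOutputFormatType
        {
            get { return new SequenceOutputFormat<StringKey, IntValue>(); }
        }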
    }
}
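The combiner and partitioner settings act on each map task's output before it reaches the reducers. The sketch below illustrates the general idea under stated assumptions: it sums counts per word locally (the effect of reusing WordCountReducer as the combiner) and then assigns each word to a reduce task by hashing the key modulo the number of reduce tasks. Hash-modulo is a common way to build a hash partitioner, but how AzureMRCore's HashPartitioner computes its hash is not documented here; the FNV-1a hash and the class name LocalCombineAndPartition are choices made only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Local illustration only: not the AzureMRCore combiner or HashPartitioner implementation.
public static class LocalCombineAndPartition
{
    // FNV-1a (32-bit) over the key's UTF-16 code units. string.GetHashCode() is randomized
    // per process on .NET Core, so a partitioner shared by many workers needs a stable hash.
    public static uint StableHash(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (char c in key)
            {
                hash ^= c;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Index of the reduce task that receives this key.
    public static int GetPartition(string key, int numReduceTasks)
    {
        if (numReduceTasks <= 0) throw new ArgumentOutOfRangeException(nameof(numReduceTasks));
        return (int)(StableHash(key) % (uint)numReduceTasks);
    }

    // Combines one map task's (word, count) pairs by summing per word, then places each
    // combined pair in the bucket of the reduce task chosen by GetPartition.
    public static List<Dictionary<string, int>> CombineAndPartition(
        IEnumerable<KeyValuePair<string, int>> mapOutput, int numReduceTasks)
    {
        if (numReduceTasks <= 0) throw new ArgumentOutOfRangeException(nameof(numReduceTasks));

        var combined = mapOutput
            .GroupBy(pair => pair.Key)
            .Select(group => new KeyValuePair<string, int>(group.Key, group.Sum(pair => pair.Value)));

        var buckets = new List<Dictionary<string, int>>();
        for (int i = 0; i < numReduceTasks; i++) buckets.Add(new Dictionary<string, int>());

        foreach (var pair in combined)
        {
            buckets[GetPartition(pair.Key, numReduceTasks)][pair.Key] = pair.Value;
        }
        return buckets;
    }
}
```

The stable hash matters because every map task must send a given word to the same reduce task; otherwise one word's counts would be split across reducers and never summed into a single total.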


5. Add the new MapReduce application to Twister4Azure


6. Run/Debug/Deploy


7. Client API

NOTE: The client program must use the same queue names as your service deployment (in the example below, "mapschedq" and "reduceschedq").

AzureMRCore.Client.ClientUtils.ProcessMapRed(string mrAppName, string inputBlobContainerURI, int iteration, string programParams, int numReduceTasks, string outputContainerName, string bcastURI = null, Boolean doMerge = false)

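"programParams" passes an optional parameter string to every Map and Reduce task (presumably the value they receive as programArgs). "inputBlobContainerURI" should identify the blob container holding the files to be processed, "numReduceTasks" sets the number of reduce tasks, and the output is stored in the container named by "outputContainerName".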

After submitting the job using the above API, you can optionally wait for the completion of the job using the following method.

AzureMRCore.Client.ClientUtils.waitForCompletion(string jobid, CloudStorageAccount storageAccount, int sleepTime)

"sleepTime" is the interval between successive checks of the job's status.

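The following example uses the corresponding instance methods of TwisterAzureClient: the storage account, job ID, and the map and reduce queue names are passed to its constructor, so ProcessMapRed and WaitForCompletion take only the remaining arguments.

Example: WordCount Client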

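// jobID, inputContainer, numReduceTasks and outputContainer must be defined by your program.
CloudStorageAccount storageAccount = ClientCredentials.GetClientStorageAccount();
// Queue names must match the ones configured in the service deployment.
TwisterAzureClient twisterAzureClient = new TwisterAzureClient(storageAccount, jobID, "mapschedq", "reduceschedq");
// Submit the WordCountMR application: iteration 0, no program parameters.
twisterAzureClient.ProcessMapRed("WordCountMR", inputContainer, 0, "", numReduceTasks, outputContainer);
// Block until the job completes, checking its status with a sleepTime of 500.
twisterAzureClient.WaitForCompletion(500);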
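The sleepTime argument controls a polling loop: the client repeatedly checks the job status and sleeps between checks. The generic sketch below shows that pattern with a timeout added; it is not the Twister4Azure implementation. The StatusPolling class, its WaitUntil method, the isComplete callback, and the assumption that the interval is in milliseconds are all hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Generic polling helper (hypothetical), not part of AzureMRCore.
public static class StatusPolling
{
    // Calls isComplete, sleeping sleepTimeMs milliseconds between calls, until it returns
    // true (result: true) or the timeout has elapsed (result: false).
    public static bool WaitUntil(Func<bool> isComplete, int sleepTimeMs, TimeSpan timeout)
    {
        if (isComplete == null) throw new ArgumentNullException(nameof(isComplete));
        if (sleepTimeMs < 0) throw new ArgumentOutOfRangeException(nameof(sleepTimeMs));

        Stopwatch elapsed = Stopwatch.StartNew();
        while (true)
        {
            if (isComplete()) return true;
            if (elapsed.Elapsed >= timeout) return false;
            Thread.Sleep(sleepTimeMs);
        }
    }
}
```

A shorter interval notices completion sooner at the cost of more status checks; a timeout keeps a client from waiting forever on a job that has failed or stalled.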