Tuesday 21 January 2014

First Program In MapReduce

Instead of the typical "Hello World" program, we will start with word count. As the name suggests, this program counts how many times each word occurs in the input. First things first: a MapReduce program does not have to define a map and a reduce method, or a mapper and a reducer class (here is the code in the decompression section, which has no map or reduce method).

In our first program we are going to use both a map and a reduce method. Before writing the program, make sure the project that contains the class has all the necessary jar files.

How do you add jar files to your project for MapReduce?
  1. It's simple: go to the location of your project in the eclipse-workspace, open the project folder, create a new folder, and copy all the jar files into it.
  2. Now open the project in Eclipse, right-click the project name, and click Properties. In the window that opens, choose Java Build Path in the left panel, click Add External JARs, go to the folder you created inside your project, select all the jar files, and click OK.

Now we are good to go for our first program.


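import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emits <token, 1> for every token in the first tab-separated field of a line.
  public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] Line = value.toString().split("\t");
      if (Line.length == 0) {
        return; // the line held nothing but tabs
      }
      StringTokenizer tokenizer = new StringTokenizer(Line[0]);
      while (tokenizer.hasMoreTokens()) {
        String Count = tokenizer.nextToken();
        context.write(new Text(Count), new IntWritable(1));
      }
    }
  }

  // Reducer: sums the 1s collected for each token.
  public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  // Driver: args[0] is the input path, args[1] the output path.
  public static void main(String[] args) throws Exception {
    Job job = new Job(); // deprecated since Hadoop 2; Job.getInstance() is preferred there
    job.setJarByClass(WordCount.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(WordCountMap.class);
    job.setReducerClass(WordCountReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
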
The main class is defined first (here WordCount), and inside it two classes are defined which extend the Mapper and Reducer classes. After the extends keyword comes a list of type parameters; in our code we have Mapper < LongWritable, Text, Text, IntWritable >. It is in the form < key_in, value_in, key_out, value_out >. The reducer's parameters follow the same form: Reducer < Text, IntWritable, Text, IntWritable >. The mapper class contains the map method and the reducer class contains the reduce method.

The map method has three parameters: map(LongWritable key, Text value, Context context). The first two have the same types as the mapper's key_in and value_in, in this case LongWritable and Text. The third, the Context object, is how the map method hands its output to the framework, which then delivers it to the reduce method (the setup and cleanup methods receive a Context as well). Now have a look at the first line of the map method.


String[] Line=value.toString().split("\t");


It converts the input value (one line of the input file) to a String, splits it on the delimiter (here a tab), and stores the pieces in a string array. This is the most important line of the whole code. The next line creates a StringTokenizer object and passes it the first piece, Line[0], as its argument. StringTokenizer breaks a string into tokens. The StringTokenizer methods do not distinguish among identifiers, numbers, and quoted strings, nor do they recognize and skip comments, which is what we need in word count. The loop that follows walks through the tokens and emits each token as the key and 1 as the value. Why 1 as the value? It is explained in the next section. Before we move to the reduce method, let's understand the concept of Context:


context.write(new Text(Count), new IntWritable(1));


Context is how the output of the map method reaches the reduce method. Whatever the map method passes on to the reduce method goes through the Context object. The call has the form context.write(key_out, value_out), so the types of key_out and value_out must be the same as those given in the Mapper class's declaration. Now move on to the reduce method:


public void reduce(Text key, Iterable<IntWritable> values, Context context)


The reduce method also has three arguments. The first two correspond to the map method's output types key_out and value_out, in this case Text and IntWritable. But instead of a single IntWritable, the second argument is an Iterable of IntWritable values. This is because of the form in which the map output reaches the reducer. Let us understand this with the help of an example. Suppose our input file has this data.

He is a king. He is nice.

Now internally the map method will emit the key and value like this.

< He,(1)>
< is,(1)>
< a,(1)>
< king.,(1)>
< He,(1)>
< is,(1)>
< nice.,(1)>
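
The keys in the map output above can be reproduced without Hadoop. The following is a minimal plain-Java sketch of the tokenizing done inside the map method; the class name TokenizeSketch and the helper tokenize are made up for illustration, and the guard for a line made only of tabs is an added assumption. Because StringTokenizer splits on whitespace only, punctuation stays attached to the tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java sketch (no Hadoop needed) of the tokenizing done inside map().
class TokenizeSketch {

    // Hypothetical helper: returns the tokens that map() would emit as keys.
    static List<String> tokenize(String line) {
        // Keep only the text before the first tab, as the mapper does.
        String[] fields = line.split("\t");
        // Assumption: treat a line made only of tabs as empty instead of failing.
        String firstField = fields.length > 0 ? fields[0] : "";
        // With no delimiter argument, StringTokenizer splits on whitespace.
        StringTokenizer tokenizer = new StringTokenizer(firstField);
        List<String> tokens = new ArrayList<>();
        while (tokenizer.hasMoreTokens()) {
            tokens.add(tokenizer.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // prints [He, is, a, king., He, is, nice.]
        System.out.println(tokenize("He is a king. He is nice."));
        // prints [first, column]
        System.out.println(tokenize("first column\tsecond column"));
    }
}
```

The second call shows the effect of the tab split: everything after the first tab is ignored.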

Before the result reaches the reducer, the sort and shuffle phase converts it into this intermediate result:

< He,(1,1)>
< is,(1,1)>
< a,(1)>
< king.,(1)>
< nice.,(1)>
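
The grouping shown above can be imitated in plain Java. This is only a sketch of the idea, not Hadoop's actual shuffle implementation; the class name ShuffleSketch and the helper group are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of grouping the map output by key.
class ShuffleSketch {

    // Hypothetical helper: collects the 1s emitted for each token under that token.
    static Map<String, List<Integer>> group(List<String> emittedKeys) {
        // LinkedHashMap keeps first-seen order so the result matches the listing above;
        // the real framework sorts the keys instead.
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (String key : emittedKeys) {
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> mapOutput = Arrays.asList("He", "is", "a", "king.", "He", "is", "nice.");
        // prints {He=[1, 1], is=[1, 1], a=[1], king.=[1], nice.=[1]}
        System.out.println(group(mapOutput));
    }
}
```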

The reducer receives the above intermediate result, and all the logic of the reduce method is applied to this result, not to the raw output of the map method. So the reduce method just adds up the 1s to count the total occurrences of each token. It then emits the token as the key along with its number of occurrences as the value. So the final result would be.

He 2
is 2
a 1
king. 1
nice. 1
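
The summing done by the reduce method can be tried out in plain Java as well. This is a rough stand-in under simplifying assumptions: Integer replaces IntWritable, and the class name ReduceSketch and the helper sum are made up for illustration. It also shows why an Iterable is a natural parameter type, since the method only needs to walk the values once.

```java
import java.util.Arrays;

// Plain-Java sketch of the reduce step for a single key.
class ReduceSketch {

    // Hypothetical stand-in for reduce(): adds up the values grouped under one key.
    static int sum(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // prints He 2
        System.out.println("He " + sum(Arrays.asList(1, 1)));
        // prints a 1
        System.out.println("a " + sum(Arrays.asList(1)));
    }
}
```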