Thursday 23 January 2014

Partitioning in MapReduce


As you may know, when a job (the MapReduce term for a program) is run, its input first goes to the mappers, and the output of the mappers goes to the reducers. Ever wondered how many mappers and how many reducers are required to execute a job? What parameters are taken into consideration when deciding the number of mappers and reducers needed to complete a job successfully? Can we control it? If yes, then how?

Let us follow a job from the moment it is passed to the jobtracker up to the final result produced by the reducers.

Initially the input data resides on HDFS. When the job is executed, it is submitted to the jobtracker, which decides on the basis of the input size how many mappers are required. Assume the HDFS block size is 128 MB: if the input for the job is 256 MB, it is split into two blocks of 128 MB each. These blocks are sent to the datanodes (tasktrackers) for execution. Each datanode here has 2 mapper slots and 2 reducer slots, so the jobtracker can choose which mapper slot on which node it wants to assign each block to.
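To make the arithmetic concrete, here is a minimal sketch (the 128 MB block size and 256 MB input are just the values assumed in this example):

    // Number of map tasks is roughly the input size divided by the block size.
    long blockSize   = 128L * 1024 * 1024;                        // 128 MB block size (assumed)
    long inputSize   = 256L * 1024 * 1024;                        // 256 MB of input data
    long numMapTasks = (inputSize + blockSize - 1) / blockSize;   // ceil(256/128) = 2 mappers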

How does the jobtracker decide which mapper slot to use, and on which datanode?

In our example the 256 MB input was split into two 128 MB blocks. Suppose there are two datanodes available, each with all of its mapper slots empty. Now there are two possibilities:
  1. Either the jobtracker assigns both blocks to a single tasktracker.
  2. Or the jobtracker assigns one block to one tasktracker and the other block to another tasktracker.

The jobtracker will follow the second approach. It will assign one 128 MB block to a mapper slot on one tasktracker/datanode and the other 128 MB block to a mapper slot on another tasktracker/datanode. If another job comes in, the jobtracker can use the unused mapper slots of the datanodes. After a mapper's work is done, its output goes to one of the reducers. Which one? The mechanism that sends specific key-value pairs to specific reducers is called partitioning. In Hadoop, the default partitioner is HashPartitioner, which hashes a record's key to determine which partition (and thus which reducer) the record belongs in. The number of partitions is equal to the number of reduce tasks for the job.
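For reference, the default HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) boils down to the following logic; this is a simplified sketch rather than the exact Hadoop source:

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // Mask off the sign bit so the result is never negative, then map
            // the key's hash code onto one of the reduce tasks.
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }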

Why is partitioning important? First, partitioning has a direct impact on the overall performance of the job we want to run. Second, it is sometimes necessary to control how the key/value pairs emitted by the mappers are partitioned over the reducers. Let's understand this with the help of a simple example. Suppose we want to sort the output of wordcount by the number of occurrences of each token. Assume that our job will be handled by 2 reducers (we can specify that by calling job.setNumReduceTasks(2);). If we run the job without any user-defined partitioner, we will get output like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                      2              so
3                a                       4              because
6                the                     5              is

      Reducer 1                               Reducer 2

This is certainly not what we wanted. Instead, we were expecting the output to look like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                         4              because
2                so                         5              is
3                a                          6              the

      Reducer 1                               Reducer 2

This is what happens if we use the right partitioning function: all tokens with a number of occurrences less than N (here 4) are sent to reducer 1 and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted within each partition, we get the expected total order on the number of occurrences.
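A partitioner for this wordcount example could look something like the sketch below (extending org.apache.hadoop.mapreduce.Partitioner; the class name and key/value types are just for illustration, assuming the mapper emits the occurrence count as an IntWritable key):

    public static class CountRangePartitioner extends Partitioner<IntWritable, Text> {
        @Override
        public int getPartition(IntWritable key, Text value, int numPartitions) {
            // Counts below 4 go to reducer 1 (partition 0), the rest to reducer 2 (partition 1).
            return key.get() < 4 ? 0 : 1;
        }
    }

Now suppose we have the following sample data: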

aman 1
ashima 2
kaushik 3
sood 4
tony 5
stark 6
bruce 7
wayne 8
james 9
bond 10
mark 11
zuckerberg 12
saun 13
parker 14

And we want the result arranged so that names with numbers from 1 to 5 appear in one file and the rest in another file. Here is the code to achieve that:

package Partitioner;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserDefinedPartitioner {

    public static class PartitionerMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are expected in the form: <name><TAB><number>
            String[] line = value.toString().split("\t");
            context.write(new Text(line[0]), new Text(line[1]));
        }
    }

    public static class MyPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            // The value holds the number: names numbered 1 to 5 go to partition 0,
            // everything else goes to partition 1.
            int seed = Integer.parseInt(value.toString());
            if (seed >= 1 && seed <= 5)
                return 0;
            else
                return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(UserDefinedPartitioner.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(PartitionerMap.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
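To try it out, the class can be packaged into a jar (say partitioner.jar, just an example name) and run in the usual way: hadoop jar partitioner.jar Partitioner.UserDefinedPartitioner <input path> <output path>.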
When we list the contents of our output folder, we will see two files, each containing the result of one partition:

$ hadoop fs -ls training/aman/nwc14
Found 3 items
-rw-r--r--   2 gpadmin hadoop          0 2014-01-24 16:17 training/aman/nwc14/_SUCCESS
-rw-r--r--   2 gpadmin hadoop          0 2014-01-24 16:17 training/aman/nwc14/part-r-00000
-rw-r--r--   2 gpadmin hadoop        120 2014-01-24 16:17 training/aman/nwc14/part-r-00001

$ hadoop fs -cat training/aman/nwc15/part-r-00001
bond 10
bruce 7
james 9
mark 11
parker 14
saun 13
stark 6
wayne 8
zuckerberg 12

$ hadoop fs -cat training/aman/nwc15/part-r-00000
aman 1
ashima 2
kaushik 3
sood 4
tony 5
Observe the job.setNumReduceTasks(2); line in the main method. If we omit this line our code will still run, but the job falls back to the default number of reduce tasks (usually one), so all records would end up in a single output file regardless of the partitioner. To deliberately tell the framework to create one output file per partition, this line must be used.