Thursday 30 January 2014

Multiple Input Files In MapReduce: The Easy Way

In the previous issue of this series, we discussed a simple method of using multiple input files: Side Data Distribution. Its usefulness was limited, however, because the extra input files have to be small. In this issue, we’ll use our playground to investigate another approach that Hadoop offers for working with multiple input files.

This approach is, as a matter of fact, very simple and effective. All we need to understand is how input files relate to mappers. As you may know, a mapper reads its records from an input file. When there is more than one input file, each input path can be assigned its own mapper class to read its records. In this post we give each input file its own mapper class, so two input files mean two mapper classes.

We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path. To understand the concept more clearly, let us take a case where the user wants to read input from two files with a similar structure. Assume that both input files have two tab-separated columns, the first holding "Name" and the second holding "Age". We simply want to combine the data and sort it by "Name". What do we need to do? Just two things:
  1. Write two mapper classes.
  2. Register each mapper class with its input path through MultipleInputs.addInputPath in the run/main method.
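
To make the idea of "one mapper per input path" concrete, here is a minimal plain-Java model of the pairing. It is only a sketch and does not use Hadoop at all: the class PathMapperSketch, its addInputPath and mapAll methods, and the splitOnTab helper are hypothetical names made up for illustration, and the input lines are assumed to be tab-separated.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of registering a mapper per input path; this is not Hadoop code.
final class PathMapperSketch {

    // Hypothetical registry: input path -> function turning one line into a (name, age) pair.
    private final Map<String, Function<String, Map.Entry<String, String>>> mappers =
            new LinkedHashMap<>();

    // Remember which mapper should handle records coming from this path.
    void addInputPath(String path, Function<String, Map.Entry<String, String>> mapper) {
        mappers.put(path, mapper);
    }

    // Run the mapper registered for the given path over that path's lines.
    List<Map.Entry<String, String>> mapAll(String path, List<String> lines) {
        Function<String, Map.Entry<String, String>> mapper = mappers.get(path);
        if (mapper == null) {
            throw new IllegalArgumentException("no mapper registered for " + path);
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String line : lines) {
            out.add(mapper.apply(line));
        }
        return out;
    }

    // Assumed record layout: name, a tab, then age.
    static Map.Entry<String, String> splitOnTab(String line) {
        String[] fields = line.split("\t");
        return new AbstractMap.SimpleEntry<>(fields[0], fields[1]);
    }

    public static void main(String[] args) {
        PathMapperSketch job = new PathMapperSketch();
        // Two paths, each with its own mapper (here both use the same logic).
        job.addInputPath("file1", PathMapperSketch::splitOnTab);
        job.addInputPath("file2", PathMapperSketch::splitOnTab);
        System.out.println(job.mapAll("file1", Arrays.asList("Aman\t19", "Tom\t20")));
        System.out.println(job.mapAll("file2", Arrays.asList("Ash\t12")));
    }
}
```

Both mappers emit the same kind of pair, which is what lets a single reducer consume the records from every path.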

File 1          File 2
Aman    19      Ash     12
Tom     20      James   21
Tony    15      Punk    21
John    18      Frank   20
Johnny  19      Hugh    17

Here is the code. Notice the two mapper classes with the same logic and the single reducer.
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class multiInputFile extends Configured implements Tool {

    // Mapper for the first input file: emits (name, age) from each tab-separated line.
    public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            context.write(new Text(line[0]), new Text(line[1]));
        }
    }

    // Mapper for the second input file: same logic as CounterMapper.
    public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            context.write(new Text(line[0]), new Text(line[1]));
        }
    }

    // Single reducer: keys arrive sorted by name; the last value seen for each name is written.
    public static class CounterReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String line = null;
            for (Text value : values) {
                line = value.toString();
            }
            context.write(key, new Text(line));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the configuration that ToolRunner prepared for this Tool.
        // The Job constructor is deprecated in Hadoop 2.x in favour of Job.getInstance.
        Job job = new Job(getConf(), "aggprog");
        job.setJarByClass(multiInputFile.class);

        // Pair each input path with its own InputFormat and Mapper.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(CounterReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ecode = ToolRunner.run(new multiInputFile(), args);
        System.exit(ecode);
    }
}

Here is the output.
Aman 19
Ash 12
Frank 20
Hugh 17
James 21
John 18
Johnny 19
Punk 21
Tom 20
Tony 15
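
The merge-and-sort behaviour of the job can also be tried without a cluster. The following is a rough plain-Java walk-through of the map, sort-by-key and reduce steps, not Hadoop code: the class MergeByNameSketch and its methods are hypothetical names, the input is assumed to be tab-separated, and a TreeMap stands in for the sort that the framework performs on the mapper output keys before they reach the reducer. For plain ASCII names like the ones used here, Java's String ordering should agree with the byte ordering of Text keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java walk-through of map -> sort by key -> reduce; this is not Hadoop code.
final class MergeByNameSketch {

    // Map step: split each tab-separated line into (name, age) and group ages by name.
    // The TreeMap keeps names sorted, standing in for the framework's sort by key.
    static void map(List<String> lines, TreeMap<String, List<String>> shuffled) {
        for (String line : lines) {
            String[] fields = line.split("\t");
            shuffled.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(fields[1]);
        }
    }

    // Reduce step: like CounterReducer, keep only the last value seen for each name.
    static List<String> reduce(TreeMap<String, List<String>> shuffled) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : shuffled.entrySet()) {
            List<String> values = entry.getValue();
            out.add(entry.getKey() + "\t" + values.get(values.size() - 1));
        }
        return out;
    }

    // Feed both "files" through the map step, then reduce once over the merged keys.
    static List<String> run(List<String> file1, List<String> file2) {
        TreeMap<String, List<String>> shuffled = new TreeMap<>();
        map(file1, shuffled);
        map(file2, shuffled);
        return reduce(shuffled);
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList(
                "Aman\t19", "Tom\t20", "Tony\t15", "John\t18", "Johnny\t19");
        List<String> file2 = Arrays.asList(
                "Ash\t12", "James\t21", "Punk\t21", "Frank\t20", "Hugh\t17");
        for (String row : run(file1, file2)) {
            System.out.println(row);
        }
    }
}
```

One thing the sketch makes visible: if the same name appears in both files, only one of its ages survives the reduce step. In a real job the order in which values reach the reducer is not guaranteed, so which age survives is arbitrary unless the reducer is changed to write every value.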