In the previous issue of this series, we discussed a simple method of using multiple input files: side data distribution. It was of limited use, though, since the input files could only be of minimal size. In this issue, we'll use our playground to investigate another approach to handling multiple input files that Hadoop offers.
This approach is, in fact, very simple and effective. We just need to understand how many mappers are needed. As you may know, a mapper extracts its input from an input file. When there is more than one input file, we need as many mappers to read records from the input files. For instance, if we are using two input files, then we need two mapper classes.
We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path. To understand the concept more clearly, let us take a case where the user wants to take input from two input files with a similar structure. Also assume that both input files have two columns, the first holding a "Name" and the second an "Age". We simply want to combine the data and sort it by "Name". What do we need to do? Just two things:
- Use two mapper classes.
- Register each mapper class for its input path via the MultipleInputs class in the run()/main() method, as sketched below.
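The second step boils down to one MultipleInputs.addInputPath() call per input path, each naming the InputFormat and the Mapper for that path (these same calls appear in the full listing below):

// One call per input path; each path can have its own InputFormat and Mapper.
MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, CounterMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, CountertwoMapper.class);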
Here is the code for the same. Notice the two mapper classes with the same logic, and the single reducer.
File 1        File 2
Aman 19       Ash 12
Tom 20        James 21
Tony 15       Punk 21
John 18       Frank 20
Johnny 19     Hugh 17
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class multiInputFile extends Configured implements Tool
{
  // Mapper for the first input file; each line is "Name<TAB>Age".
  // Emits the name as the key and the age as the value.
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text>
  {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }
  // Mapper for the second input file; identical logic, since both files
  // share the same structure.
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text>
  {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }
  // Reducer: receives all ages for a given name, with keys already sorted
  // by name. Each name is unique in our sample data, so the loop simply
  // picks up the single value for the key.
  public static class CounterReducer extends Reducer<Text, Text, Text, Text>
  {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
      String line = null;
      for (Text value : values)
      {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }
  public int run(String[] args) throws Exception {
    // Use the Configuration prepared by ToolRunner instead of creating a new one.
    Configuration conf = getConf();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);
    // One addInputPath() call per input, each with its own mapper class.
    MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, CountertwoMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setReducerClass(CounterReducer.class);
    // A single reducer gives one globally sorted output file.
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }
public static void main(String[] args) throws Exception {
int ecode = ToolRunner.run(new multiInputFile(), args);
System.exit(ecode);
}
}
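Once the class is compiled and packed into a jar (say, multiinput.jar; the name is only illustrative), the job is launched in the usual way, e.g., hadoop jar multiinput.jar multiInputFile <input file 1> <input file 2> <output dir>; the three arguments map to args[0], args[1] and args[2] in run().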
Here is the output. Since the name is the map output key, the framework sorts the combined records by name:
Aman 19
Ash 12
Frank 20
Hugh 17
James 21
John 18
Johnny 19
Punk 21
Tom 20
Tony 15
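The two mappers above happen to share the same logic, because both files have the same layout. The approach really pays off when the inputs differ in structure. Purely as a hypothetical sketch: if the second file listed the age before the name, only its mapper would need to change, swapping the columns so that both inputs still meet at the reducer as (Name, Age) pairs:

  // Hypothetical variant: suppose File 2 had the layout "Age<TAB>Name".
  // Only its mapper changes; it swaps the columns before emitting, so the
  // reducer still receives (Name, Age) pairs from both inputs.
  public static class SwappedColumnsMapper extends Mapper<LongWritable, Text, Text, Text>
  {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
      String[] line = value.toString().split("\t");
      // line[0] is the age, line[1] is the name; emit them name-first.
      context.write(new Text(line[1]), new Text(line[0]));
    }
  }

It would be registered exactly like the others, e.g., MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SwappedColumnsMapper.class); nothing else in the job setup changes.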