Monday 27 January 2014

Multiple Input Files In MapReduce: Side Data Distribution

You may come across problems that require more than one input file. For instance, you may want to join records from two input files. In such cases, where we want to use more than one input file, we have the following options:

  1. First, we can put all the input files we want to use in a single directory and give the path of that directory as the input path.
  2. Second, we can use the concept of side data distribution, which is implemented through the distributed cache API.
  3. Third, we can simply specify more than one input file, giving the path of each.
Let us understand the first two approaches here (the third method will be explained in my next post).

In the first approach, we just put all the input files in a single directory and give the path of that directory as the input path. This approach has a limitation: we can't use input files with different data structures, so it is of very limited use. In the second approach, we use a main (usually large) input file, or main dataset, together with one or more small input files. Ever heard the term "look-up file"? In our case, understand it this way: it is a file containing a very small volume of data compared to our main input file (the look-up files go into the distributed cache). This approach implements the concept of side data distribution. Side data can be defined as extra read-only data needed by a job to process the main dataset.
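As a quick illustration of the first approach, the driver simply points FileInputFormat at the directory; the following is only a minimal sketch, and the directory path /user/hadoop/joininput is an assumed example:

// First approach: every file placed inside this directory becomes part of the
// job's input, so all of them must share the same record structure.
FileInputFormat.addInputPath(job, new Path("/user/hadoop/joininput"));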

Distributed Cache

Rather than serializing side data in the job configuration, it is preferable to distribute datasets using Hadoop's distributed cache mechanism. This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run. To save network bandwidth, files are normally copied to any particular node only once per job. To understand this concept more clearly, take this example: suppose we have two input files, one small and one comparatively large. Let us assume this is the larger file, i.e. the main input file:

101 Vince 12000
102 James 33
103 Tony 32
104 John 25
105 Nataliya 19
106 Anna 20
107 Harold 29
And this is the smaller file.

101 Vince 12000
102 James 10000
103 Tony 20000
104 John 25000
105 Nataliya 15000

Now what we want is to get those records which have a common Id number. In order to achieve this, we use the smaller file as the look-up file and the larger file as the main input file. The complete Java code and an explanation of each component are given below:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Join extends Configured implements Tool {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        Path[] cachefiles = new Path[0];                    // To store the paths of the look-up files
        List<String> exEmployees = new ArrayList<String>(); // To store the records of the look-up file

        /******************** setup method *************************************/
        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            try {
                cachefiles = DistributedCache.getLocalCacheFiles(conf);
                BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
                String line;
                while ((line = reader.readLine()) != null) {
                    exEmployees.add(line); // Records of the look-up file get stored in the list object
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } // setup method ends
        /************************************************************************/

        /******************** map method ****************************************/
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            for (String e : exEmployees) {
                String[] listLine = e.toString().split("\t");
                if (line[0].equals(listLine[0])) {
                    context.write(new Text(line[0]),
                                  new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
                }
            }
        } // map method ends
        /************************************************************************/
    }

    /******************** run method ********************************************/
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "aggprog");
        job.setJarByClass(Join.class);
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(JoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int ecode = ToolRunner.run(new Join(), args);
        System.exit(ecode);
    }
}
This is the result we will get after running the above code.

102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000
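For reference, a job like this is launched with the hadoop jar command; the jar name join.jar and the HDFS paths below are only placeholders. The first argument becomes the look-up file added to the distributed cache (args[0]), the second the main input path (args[1]), and the third the output directory (args[2]):

hadoop jar join.jar Join /user/hadoop/lookup.txt /user/hadoop/input /user/hadoop/output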

run() method:


In the run() method, we used the
DistributedCache.addCacheFile(URI uri, Configuration conf)
method to add the look-up file to the distributed cache. If you go through the code carefully, you will notice there is no reduce() method, and hence no job.setReducerClass() in the run() method. In our example there is in fact no need for a reducer, as the common Id numbers are identified in the map() method alone. For the same reason, job.setOutputKeyClass(Text.class) and job.setOutputValueClass(Text.class) are given the key-out and value-out data types of the mapper, not the data types of a reducer.
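As a side note, DistributedCache has been deprecated in newer Hadoop releases (2.x onwards), where the same job would add and read the cached file through the Job and Context objects instead. The following lines are only a sketch of that variant, not part of the original example:

// Driver side (Hadoop 2.x style): register the look-up file on the Job object.
job.addCacheFile(new Path(args[0]).toUri());

// Mapper setup(): cached files are symlinked into the task's working directory,
// so they can be opened by their plain file name.
URI[] cacheFiles = context.getCacheFiles();
String lookupName = new Path(cacheFiles[0].getPath()).getName();
BufferedReader reader = new BufferedReader(new FileReader(lookupName));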

setup() method:


In the setup() method, the line
cachefiles = DistributedCache.getLocalCacheFiles(conf);
is very important to understand. Here we extract the local paths of the files that were placed in the distributed cache.
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
After that, we read the contents of the file with a BufferedReader and store them in a List object for further operations. Remember that when the input files were created, a tab ("\t") was used as the delimiter so that they could be parsed properly later.
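A common refinement, shown here only as a sketch (the salaryById name is made up), is to parse each look-up line inside setup() into a HashMap keyed on the Id instead of keeping raw lines in a List; map() can then find a match with a single get() rather than scanning the whole list for every input record:

// Alternative to the List: index the look-up file by Id for constant-time matching in map().
Map<String, String> salaryById = new HashMap<String, String>();
String line;
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t");
    salaryById.put(parts[0], parts[2]); // Id -> salary column of the look-up file
}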

map() method:


In the map() method, we receive the lines of the main dataset one by one, break each line into fields using a tab ("\t") as the delimiter, and store the result in a string array (String[] line).
String[] line = value.toString().split("\t");

We do the same with each record of the look-up file, so that we can match the Id, i.e. the first column, of the main dataset against that of the look-up file.
String[] listLine = e.toString().split("\t");
If the Id numbers match, i.e. the Id of a record in the main dataset is also present in the look-up file, then the contents of both files are emitted using the context object.
if (line[0].equals(listLine[0])) {
    context.write(new Text(line[0]), new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
}