Monday, 27 January 2014

Multiple Input Files In MapReduce: Side Data Distribution

You may face problems that require more than one input file. For instance, you may want to join records from two input files. In such cases, where we want to use more than one input file, we have the following options:

  1. First, we can put all the input files we want to use in a single directory, and give the path of that directory as the input path.
  2. Second, we can use side data distribution, which is implemented with the distributed cache API.
  3. Third, we can simply add more than one input file to the job and specify each of their paths.
Let us understand the first two approaches here (the third method will be explained in my next post).

In the first approach, we just put all the input files in a single directory and give the path of that directory as the input path (see the sketch just after this paragraph). This approach has a limitation: we can't use input files with different record structures, so it is of very limited use. In the second approach, we use one main (usually large) input file, the main dataset, together with other small input files. Ever heard the term "lookup file"? In our case, understand it this way: it is a file containing a very small volume of data compared to our main input file, and it is these lookup files that we place in the distributed cache. This approach implements the concept of side data distribution. Side data can be defined as extra read-only data needed by a job to process the main dataset.
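A minimal sketch of the first approach, assuming the driver already has a Job object inside its run() method (the directory path is illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Approach 1 (sketch): give a directory, not a single file, as the input
// path; every file under /user/hadoop/input-dir is then read by the job,
// so all of them must share the same record structure.
FileInputFormat.addInputPath(job, new Path("/user/hadoop/input-dir"));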

Distributed Cache

Rather than serializing side data in the job configuration, it is preferable to distribute datasets using Hadoop’s distributed cache mechanism. This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run. To save network bandwidth, files are normally copied to any particular node only once per job. To understand this concept more clearly, take this example: suppose we have two input files, one small and one comparatively large. Let us assume this is the larger file, i.e. the main input file:

101 Vince 12000
102 James 33
103 Tony 32
104 John 25
105 Nataliya 19
106 Anna 20
107 Harold 29
And this is the smaller file.

101 Vince 12000
102 James 10000
103 Tony 20000
104 John 25000
105 Nataliya 15000

Now what we want is to get those records which have a common Id number in both files. So, in order to achieve this, we use the smaller file as the lookup file and the larger file as the input file. The complete Java code, with an explanation of each component, is given below:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Join extends Configured implements Tool {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        Path[] cachefiles = new Path[0]; // To store the paths of the lookup files
        List<String> exEmployees = new ArrayList<String>(); // To store the data of the lookup file

        /******************** setup() method ********************/
        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            try {
                // Extract the local paths of the files placed in the distributed cache
                cachefiles = DistributedCache.getLocalCacheFiles(conf);
                BufferedReader reader =
                        new BufferedReader(new FileReader(cachefiles[0].toString()));
                String line;
                while ((line = reader.readLine()) != null) {
                    exEmployees.add(line); // Each lookup record gets stored in the list
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } // setup() method ends

        /******************** map() method ********************/
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            for (String e : exEmployees) {
                String[] listLine = e.split("\t");
                // Emit only the records whose Id appears in both files
                if (line[0].equals(listLine[0])) {
                    context.write(new Text(line[0]),
                            new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
                }
            }
        } // map() method ends
    }

    /******************** run() method ********************/
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "aggprog");
        job.setJarByClass(Join.class);
        // args[0]: lookup file, args[1]: main input path, args[2]: output path
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(JoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ecode = ToolRunner.run(new Join(), args);
        System.exit(ecode);
    }
}
This is the result we will get after running the above code.

102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000

run() Method:


In the run() method, we used
DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
to add the lookup file to the distributed cache. If you go through the code carefully, you will notice there is no reduce() method, and hence no job.setReducerClass() in the run() method. In our example there is in fact no need for a reducer, as the common Id numbers are identified in the map() method alone. For the same reason, job.setOutputKeyClass(Text.class) and job.setOutputValueClass(Text.class) carry the key-out and value-out data types of the mapper, not those of a reducer.
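For reference, newer Hadoop releases (2.x onwards) deprecate the DistributedCache class and expose the same functionality directly on Job and Context. A minimal sketch, assuming Hadoop 2.x:

// Hadoop 2.x style (sketch): the cache calls moved onto Job and Context.
// Driver side, inside run():
job.addCacheFile(new Path(args[0]).toUri());
// Task side, inside Mapper.setup() (requires java.net.URI):
URI[] cacheFiles = context.getCacheFiles(); // URIs of the cached files

Either way, the job is launched with the lookup file, input path and output path in the order run() expects, e.g. hadoop jar join.jar Join <lookup file> <input path> <output path> (the jar name here is illustrative).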

setup() Method:


In the setup() method, the line
cachefiles = DistributedCache.getLocalCacheFiles(conf);
is very important to understand. Here we are extracting the local path of the file held in the distributed cache.
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
After that, we store the contents of the file, read through a BufferedReader, in a List object for further operations. Remember that when the input files were created, we used tab ("\t") as the delimiter so that they could be parsed properly later.
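One design note: because setup() stores the lookup records in a plain list, map() has to scan the whole list for every input record. Indexing the lookup file in a HashMap keyed by Id would make each lookup constant-time instead. A minimal sketch of an alternative setup() body, assuming the same tab-delimited Id/Name/Salary layout (salaryById is an illustrative field name, and java.util.Map/HashMap must be imported):

// Alternative setup() body (sketch): index the lookup file by Id so that
// map() can join with a single get() instead of scanning a list.
Map<String, String> salaryById = new HashMap<String, String>();
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
String line;
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t"); // Id, Name, Salary
    salaryById.put(parts[0], parts[2]); // key: Id, value: Salary
}
reader.close();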

map() Method:


In the map() method, we receive the lines of the main dataset one by one, split each line into fields using tab ("\t") as the delimiter, and store the fields in a string array (String[] line).
String[] line = value.toString().split("\t");

We do the same processing with each record of the lookup file, so that the Id, i.e. the first column, of the main dataset can be compared with that of the lookup file.
String[] listLine = e.toString().split("\t");
If the Id numbers match, i.e. the Id of a record in the main dataset is also present in the lookup file, then the contents of both files are joined and emitted using the context object.
if (line[0].equals(listLine[0])) { context.write(new Text(line[0]), new Text(line[1] + "\t" + line[2] + "\t" + listLine[2])); }
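To make the flow concrete, here is a worked trace for one record, using values from the sample files above:

// Worked trace for one matching record (values from the sample files):
// main dataset line : "102\tJames\t33"    -> line     = {"102", "James", "33"}
// lookup file line  : "102\tJames\t10000" -> listLine = {"102", "James", "10000"}
// line[0].equals(listLine[0]) is true, so the mapper emits:
// key = "102", value = "James\t33\t10000"

This is exactly the first row of the result shown earlier.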
