Saturday 11 January 2014

Map Side Join And Reduce Side Join


Joins are one of the interesting features available in MapReduce, which can perform joins between very large datasets. How a join is implemented depends on how large the datasets are and how they are partitioned. If the join is performed by the mapper, it is called a map-side join; if it is performed by the reducer, it is called a reduce-side join.

If both datasets are too large for either to be copied to each node in the cluster, we can still join them using MapReduce with a map-side or reduce-side join, depending on how the data is structured. One common example of this case is a user database and a log of some user activity (such as access logs). For a popular service, it is not feasible to distribute the user database (or the logs) to all the MapReduce nodes. Before diving into the implementation let us understand the problem thoroughly.

Suppose we have two datasets: one holding user IDs and names, and the other holding each user's activity in the application. To find out which user performed which activity, we need to join the two datasets on the user ID so that each activity record is paired with the matching user name. The choice of join technique depends on dataset size: if one dataset is small enough to be distributed to every node in the cluster, we can use the Side Data Distribution technique instead.
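To make the small-dataset case concrete, here is a minimal plain-Python simulation of a join based on side data. It is only a sketch of the idea, not Hadoop API code: the function names and sample records are hypothetical, and it assumes the user dataset fits comfortably in memory on every node.

```python
# Plain-Python simulation of a join using side data; not Hadoop API code.

def load_side_data(user_records):
    """Build the in-memory lookup table that every mapper would load once."""
    return {user_id: name for user_id, name in user_records}

def map_activity(record, users):
    """Join one activity record against the in-memory user table."""
    user_id, action = record
    name = users.get(user_id)
    if name is not None:  # inner join: drop activity for unknown users
        yield (user_id, name, action)

# Hypothetical sample data: (user_id, name) and (user_id, action)
users = [(1, "alice"), (2, "bob")]
activity = [(1, "login"), (2, "search"), (1, "logout"), (3, "login")]

lookup = load_side_data(users)
joined = [row for rec in activity for row in map_activity(rec, lookup)]
print(joined)
```

Because every mapper holds the whole small dataset, the large dataset never has to be sorted or shuffled for the join.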

Map Side join


A map-side join between large inputs works by performing the join before the data reaches the map function. For this to work, though, the inputs to each map must be partitioned and sorted in a particular way. Each input data set must be divided into the same number of partitions, and it must be sorted by the same key (the join key) in each source. All the records for a particular key must reside in the same partition. This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.
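The partitioning and sorting requirement can be illustrated with a small plain-Python sketch. This is not how Hadoop itself is implemented: the CRC32-based partitioner and the sample records are assumptions chosen only to show that applying the same partition function and the same sort key to both datasets puts all records for a key in the same partition number.

```python
import zlib

def partition_for(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (an assumption for this sketch)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def partition_and_sort(records, num_partitions):
    """Split (key, value) records into partitions, then sort each one by key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_for(key, num_partitions)].append((key, value))
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # sort by the join key only
    return partitions

# Hypothetical sample data: (user_id, name) and (user_id, action)
users = [(3, "carol"), (1, "alice"), (2, "bob")]
activity = [(2, "search"), (1, "login"), (3, "login"), (1, "logout")]

user_parts = partition_and_sort(users, 2)
activity_parts = partition_and_sort(activity, 2)

for i in range(2):
    print(i, user_parts[i], activity_parts[i])
```

Partition i of one dataset now only ever needs to be joined with partition i of the other.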

A map-side join can be used to join the outputs of several jobs that had the same number of reducers, the same keys, and output files that are not splittable (for example, because each file is no larger than one HDFS block, or because it is gzip-compressed). We can achieve this using the org.apache.hadoop.mapred.join.CompositeInputFormat class.
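Once matching partitions are sorted by the join key, they can be joined in a single pass by walking both inputs together. The sketch below shows that merge step in plain Python for one pair of partitions. It illustrates the general idea only and is not the actual CompositeInputFormat implementation; the function name and sample data are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs that are already sorted by key."""
    left_groups = groupby(left, key=itemgetter(0))
    right_groups = groupby(right, key=itemgetter(0))
    lkey, lgroup = next(left_groups, (None, None))
    rkey, rgroup = next(right_groups, (None, None))
    while lgroup is not None and rgroup is not None:
        if lkey < rkey:
            # Left key has no match yet: advance the left side.
            lkey, lgroup = next(left_groups, (None, None))
        elif lkey > rkey:
            # Right key has no match yet: advance the right side.
            rkey, rgroup = next(right_groups, (None, None))
        else:
            # Keys match: emit every left/right combination for this key.
            rvalues = [value for _, value in rgroup]
            for _, lvalue in lgroup:
                for rvalue in rvalues:
                    yield (lkey, lvalue, rvalue)
            lkey, lgroup = next(left_groups, (None, None))
            rkey, rgroup = next(right_groups, (None, None))

# Hypothetical matching partitions, both sorted by user ID
user_partition = [(1, "alice"), (3, "carol")]
activity_partition = [(1, "login"), (1, "logout"), (2, "search"), (3, "login")]

merged = list(merge_join(user_partition, activity_partition))
print(merged)
```

Neither input has to be held in memory as a whole; only the values for the current key are buffered.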



Reduce Side join


Reduce-side joins are simpler than map-side joins because the input datasets do not need to be structured in any particular way. They are less efficient, however, because both datasets have to go through the MapReduce shuffle phase, in which the records with the same key are brought together in the reducer. We can also use the Secondary Sort technique to control the order of the records.

How is it done?
  • The map output key for every dataset being joined has to be the join key, so that matching records reach the same reducer.
  • Each record has to be tagged in the mapper with the identity of its dataset, so that the reducer can tell the datasets apart and process each accordingly.
  • In each reducer, the values from both datasets are available for every key assigned to that reducer, and can be processed as required.
  • A secondary sort is needed if the reducer relies on the values arriving in a particular order.
  • If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input and associate it with its specific mapper.
[MultipleInputs.addInputPath(job, (input path n), (inputformat class), (mapper class n));]

Note: The join between the datasets (employee, current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets. I have used the same datasets here, as the purpose of this blog is to demonstrate the concept. Whenever possible, reduce-side joins should be avoided.
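The reduce-side steps listed above (join key as map output key, tagging, secondary sort, and joining in the reducer) can be sketched in plain Python. This is a single-process simulation rather than Hadoop code: the tag values, function names, and sample user and activity records are all hypothetical, and the shuffle is imitated with an in-memory sort.

```python
from itertools import groupby

# Hypothetical tags; the user tag is smaller so the user record sorts first.
USER_TAG, ACTIVITY_TAG = 0, 1

def map_user(record):
    """Emit (join key, tag) as the composite key for a user record."""
    user_id, name = record
    return ((user_id, USER_TAG), name)

def map_activity(record):
    """Emit (join key, tag) as the composite key for an activity record."""
    user_id, action = record
    return ((user_id, ACTIVITY_TAG), action)

def shuffle(map_output):
    """Sort by (join key, tag) but group by join key only: a secondary sort."""
    ordered = sorted(map_output, key=lambda kv: kv[0])
    for join_key, group in groupby(ordered, key=lambda kv: kv[0][0]):
        yield join_key, [(key[1], value) for key, value in group]

def reduce_join(join_key, tagged_values):
    """Pair the user name (which arrives first) with each activity that follows."""
    name = None
    for tag, value in tagged_values:
        if tag == USER_TAG:
            name = value
        elif name is not None:  # inner join: skip activity without a user
            yield (join_key, name, value)

users = [(1, "alice"), (2, "bob")]
activity = [(2, "search"), (1, "login"), (3, "login"), (1, "logout")]

map_output = [map_user(u) for u in users] + [map_activity(a) for a in activity]
result = [row for key, values in shuffle(map_output)
          for row in reduce_join(key, values)]
print(result)
```

Because the secondary sort delivers the user record first, the reducer in this sketch never has to buffer the activity records for a key.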