Thursday, 30 January 2014

Multiple Input Files In MapReduce: The Easy Way

In the previous issue of this series, we discussed a simple method of using multiple input files: Side Data Distribution. But it was of limited use, as the additional input files had to be small. In this issue, we'll use our playground to investigate another approach to handling multiple input files offered by Hadoop.

This approach is, as a matter of fact, very simple and effective. Here we just need to understand how many mappers are needed. As you may know, a mapper extracts its input from an input file. When there is more than one input file, we need the same number of mapper classes to read records from the input files. For instance, if we are using two input files, then we need two mapper classes.

We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path. To understand the concept more clearly, let us take a case where the user wants to take input from two input files with a similar structure. Also assume that both input files have 2 columns, the first holding "Name" and the second holding "Age". We want to simply combine the data and sort it by "Name". What do we need to do? Just two things:
  1. Use two mapper classes.
  2. Register each mapper class with MultipleInputs in the run()/main() method.

File 1          File 2
Aman 19         Ash 12
Tom 20          James 21
Tony 15         Punk 21
John 18         Frank 20
Johnny 19       Hugh 17
Here is the code for the same. Notice the two mapper classes with the same logic and only a single reducer.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class multiInputFile extends Configured implements Tool {

  // Mapper for the first input file: emits (name, age).
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Mapper for the second input file: same logic, separate class.
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  public static class CounterReducer extends Reducer<Text, Text, Text, Text> {
    String line = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);

    // Register one mapper (and input format) per input path.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setReducerClass(CounterReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new multiInputFile(), args);
    System.exit(ecode);
  }
}
Here is the output.
Ash 12
Tony 15
Hugh 17
John 18
Aman 19 
Johnny 19
Frank 20          
Tom  20
James 21
Punk 21

Monday, 27 January 2014

Multiple Input Files In MapReduce: Side Data Distribution

You may come across problems which require more than one input file. For instance, you may want to join records from two input files. In such cases, where we want to use more than one input file, we have the following options:

  1. First, we can put all the input files we want to use in a single directory, and give the directory path as the input path.
  2. Second, we can use the concept of side data distribution, which uses the distributed cache API.
  3. Third, we can use the MultipleInputs class and specify the path, input format, and mapper for each input.
Let us understand the first two approaches here (the third method will be explained in my next post).

In the first approach, we just put all input files in a single directory and give the path of that directory. This approach has the limitation that we can't use input files with different data structures, so it is of very limited use. In the second approach, we use a main (usually large) input file, or main dataset, along with other small input files. Ever heard the term "lookup file"? In our case, understand it this way: it is a file containing a very small volume of data compared to our main input file (the lookup files go into the distributed cache). This approach implements the concept of side data distribution. Side data can be defined as extra read-only data needed by a job to process the main dataset.

Distributed Cache

Rather than serializing side data in the job configuration, it is preferable to distribute datasets using Hadoop's distributed cache mechanism. This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run. To save network bandwidth, files are normally copied to any particular node once per job. To understand this concept more clearly, take this example: suppose we have two input files, one small and one comparatively large. Let us assume this is the larger file, i.e. the main input file.

101 Vince 12000
102 James 33
103 Tony 32
104 John 25
105 Nataliya 19
106 Anna 20
107 Harold 29
And this is the smaller file.

101 Vince 12000
102 James 10000
103 Tony 20000
104 John 25000
105 Nataliya 15000

Now what we want is to get those records which have a common Id number. To achieve this, use the smaller file as the lookup file and the larger file as the input file. The complete Java code and an explanation of each component are given below:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Join extends Configured implements Tool {

  public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    Path[] cachefiles = new Path[0];                     // paths of the lookup files
    List<String> exEmployees = new ArrayList<String>();  // contents of the lookup file

    /******************** setup method *************************************/
    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      try {
        cachefiles = DistributedCache.getLocalCacheFiles(conf);
        BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
        String line;
        while ((line = reader.readLine()) != null) {
          exEmployees.add(line);   // lookup data is kept in memory in a list
        }
        reader.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    } // setup method ends

    /******************** map method ***************************************/
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      for (String e : exEmployees) {
        String[] listLine = e.toString().split("\t");
        if (line[0].equals(listLine[0])) {
          context.write(new Text(line[0]),
              new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
        }
      }
    } // map method ends
  }

  /******************** run method *****************************************/
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(Join.class);

    // args[0] is the lookup (smaller) file, distributed via the cache.
    DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setMapperClass(JoinMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new Join(), args);
    System.exit(ecode);
  }
}
This is the result we will get after running the above code.

102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000

Run() Method:


In the run() method, we used

DistributedCache.addCacheFile(URI uri, Configuration conf)

to add a file to the distributed cache. If you go through the code carefully, you will notice there is no reduce() method, and hence no job.setReducerClass() in the run method. In our example there is in fact no need for a reducer, as the common Id numbers are identified in the map method itself. For the same reason, job.setOutputKeyClass(Text.class); and job.setOutputValueClass(Text.class); carry the key_out and value_out data types of the mapper, not those of a reducer.

Setup() method :


In setup() method,
cachefiles=DistributedCache.getLocalCacheFiles(conf);
is very important to understand. Here we are extracting the path of the file in distributed cache.
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
After that we store the contents of the file, read through the BufferedReader, in a List object for later use. Remember that when the input files were created, we used a tab ("\t") as the delimiter so we could parse them properly later.

Map() method :


In the map method, we receive the lines of the main dataset one by one, break each into fields using the tab ("\t") delimiter, parse the text into strings, and store them in a string array (String[] line).
String[] line = value.toString().split("\t");

We do the same processing on each entry of the lookup list, so that we can match the id, i.e. the first column, of both the main dataset and the lookup file.
String[] listLine = e.toString().split("\t");
If the Id number matches, i.e. the Id of a record in the main dataset is also present in the lookup file, then the contents of both files are emitted using the Context object.

if (line[0].equals(listLine[0])) {
  context.write(new Text(line[0]), new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
}

Friday, 24 January 2014

Secondary Sort In MapReduce: "Sort By Value"

As you may know, MapReduce by default sorts the keys (in the shuffle and sort phase) before sending the records to the reducers. However, the values are not sorted; the order in which values appear at the reducers differs from run to run. This is due to the fact that values are emitted from different map tasks, which may finish at different times from run to run. Generally, MapReduce programs are written in such a way that the order of values reaching the reduce method doesn't matter. But what if we want to impose an order on the values by sorting and grouping the keys in a particular way? Or what if we also want to sort by value?

Let's understand the concept of secondary sorting with the help of an example. Consider the MapReduce program for calculating the maximum temperature for each year (I shamelessly admit that I am taking this example from "Hadoop: The Definitive Guide", and the data used is the weather dataset). With a slight modification to the format of the keys, secondary sorting gives us the ability to take the value into account during the sort phase. There are two possible approaches.

The first approach involves having the reducer buffer all of the values for a given key and do an in-reducer sort on the values. Since the reducer will be receiving all values for a given key, this approach could possibly cause the reducer to run out of memory. The second approach involves creating a composite key by adding a part of, or the entire value to the natural key to achieve your sorting objectives.

We will stick to the second approach for the time being. For this we will need to write a custom partitioner, to ensure that all the data with the same natural key (not including the value part of the composite key) is sent to the same reducer, and a custom comparator, so the data is grouped by the natural key once it arrives at the reducer. To achieve this, we change our keys to be composite: a combination of year and temperature. Following the Definitive Guide's example of secondary sorting, we want the sort order for keys to be by year (ascending) and then by temperature (descending):

1900 35°C
1900 34°C 
1900 34°C
 ... 
1901 36°C 
1901 35°C

By setting a partitioner to partition by the year part of the key, we can guarantee that records for the same year go to the same reducer. This still isn't enough to achieve our goal, however. A partitioner ensures only that one reducer receives all the records for a year; it doesn't change the fact that the reducer groups by key within the partition. So, in order to get the desired result, we are going to need 3 main components (a code sketch follows the list):

  1. The key should be composite, holding both the year (natural key) and the temperature (natural value).
  2. A partitioner that sends records with the same year to the same partition.
  3. Two comparators: one that sorts the composite keys by year and then by temperature, and one that groups records in the reducer by year alone.
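To make these three components concrete, here is a minimal sketch. It assumes a hypothetical IntPair writable whose getFirst() returns the year and getSecond() returns the temperature (that class is not shown); the commented calls at the bottom show how the pieces would be wired up in the driver.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Partitioner: partition on the natural key (year) only.
public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
    @Override
    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Sort comparator: order composite keys by year ascending, then temperature descending.
public static class KeyComparator extends WritableComparator {
    protected KeyComparator() { super(IntPair.class, true); }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair p1 = (IntPair) a;
        IntPair p2 = (IntPair) b;
        int cmp = Integer.compare(p1.getFirst(), p2.getFirst());
        if (cmp != 0) return cmp;
        return -Integer.compare(p1.getSecond(), p2.getSecond()); // reversed => descending
    }
}

// Grouping comparator: group values in the reducer by year alone.
public static class GroupComparator extends WritableComparator {
    protected GroupComparator() { super(IntPair.class, true); }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(((IntPair) a).getFirst(), ((IntPair) b).getFirst());
    }
}

// In the driver:
// job.setPartitionerClass(FirstPartitioner.class);
// job.setSortComparatorClass(KeyComparator.class);
// job.setGroupingComparatorClass(GroupComparator.class);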

Thursday, 23 January 2014

Partitioning in MapReduce


As you may know, when a job (the MapReduce term for a program) is run, its input goes to the mappers, and the output of the mappers goes to the reducers. Ever wondered how many mappers and how many reducers are required for a job execution? What parameters are taken into consideration for deciding the number of mappers and reducers required to complete the execution of a job successfully? Can we decide it? If yes, then how?

Let us trace the path from submitting the job to the jobtracker up to the final result produced by the reducers.

Initially the input data resides on HDFS. When the job is executed, it goes to the jobtracker, which decides, on the basis of the input size, how many mappers are required. If the block size is 128 MB and the input is 256 MB, then the input is split into two blocks of 128 MB each. These blocks are sent to datanodes (tasktrackers) for execution. Each tasktracker has 2 mapper slots and 2 reducer slots, and the jobtracker has the option to choose which mapper slot it wants to assign each block to.

How does the jobtracker decide which mapper slot to use, and on which datanode?

In our example, the 256 MB input was split into two 128 MB blocks. Suppose there are two datanodes available, with all mapper slots empty. Now there are two possibilities:
  1. Either jobtracker assigns both blocks to a single tasktracker.
  2. Or jobtracker assigns one block to one tasktracker and one to another.

The jobtracker will follow the second approach. It will assign one 128 MB block to a mapper slot of one tasktracker/datanode and the other 128 MB block to another tasktracker/datanode. If another job comes along, the jobtracker can use the unused mapper slots of the datanodes. After the mappers are done, the output of each mapper goes to one of the reducers. Which one? The mechanism for sending specific key-value pairs to specific reducers is called partitioning. In Hadoop, the default partitioner is HashPartitioner, which hashes a record's key to determine which partition (and thus which reducer) the record belongs in. The number of partitions is equal to the number of reduce tasks for the job.
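For reference, the stock HashPartitioner boils down to essentially the following (a sketch of the default implementation):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Hash the key, mask off the sign bit, and take the remainder
    // modulo the number of reduce tasks.
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}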

Why is partitioning important? First, partitioning has a direct impact on the overall performance of the job we want to run. Second, it may sometimes be necessary to control how the key/value pairs emitted from the mappers are partitioned over the reducers. Let's understand this with the help of a simple example. Suppose we want to sort the output of a word count on the basis of the number of occurrences of tokens. Assume that our job will be handled by 2 reducers (we can specify that by using job.setNumReduceTasks(2);). If we run our job without a user-defined partitioner, we will get output like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                      2              so
3                a                       4              because
6                the                     5              is

      Reducer 1                               Reducer 2

This is certainly not what we wanted. Instead, we were expecting the output to come out like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                         4              because
2                so                         5              is
3                a                          6              the

      Reducer 1                               Reducer 2

This would happen if we use a correct partitioning function: all the tokens with a number of occurrences less than N (here 4) are sent to reducer 1, and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted within each partition, we get the expected total order on the number of occurrences. Suppose we have this sample data.

aman 1
ashima 2
kaushik 3
sood 4
tony 5
stark 6
bruce 7
wayne 8
james 9
bond 10
mark 11
zuckerberg 12
saun 13
parker 14

And we want the result in such a way that names with numbers from 1 to 5 appear in one file and the rest in another file. Here is the code to achieve that:

package Partitioner;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserDefinedPartitioner {

  public static class PartitionerMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));   // emit (name, number)
    }
  }

  public static class MyPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
      // The value carries the number; records 1-5 go to partition 0, the rest to partition 1.
      int seed = Integer.parseInt(value.toString());
      if ((seed >= 1) && (seed <= 5))
        return 0;
      else
        return 1;
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setJarByClass(UserDefinedPartitioner.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(PartitionerMap.class);
    job.setPartitionerClass(MyPartitioner.class);
    job.setNumReduceTasks(2);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
When we list the contents of our output folder, we will get two files, each holding the result of one partition:

$ hadoop fs -ls training/aman/nwc14
Found 3 items
-rw-r--r--   2 gpadmin hadoop     0 2014-01-24 16:17 training/aman/nwc14/_SUCCESS
-rw-r--r--   2 gpadmin hadoop     0 2014-01-24 16:17 training/aman/nwc14/part-r-00000
-rw-r--r--   2 gpadmin hadoop   120 2014-01-24 16:17 training/aman/nwc14/part-r-00001

$ hadoop fs -cat training/aman/nwc15/part-r-00001
bond 10
bruce 7
james 9
mark 11
parker 14
saun 13
stark 6
wayne 8
zuckerberg 12

$ hadoop fs -cat training/aman/nwc15/part-r-00000
aman 1
ashima 2
kaushik 3
sood 4
tony 5
Notice the job.setNumReduceTasks(2); line in the main method. Without this line the code would still run, but by default only one reduce task is launched, so both partitions would end up in a single output file. To deliberately get one output file per partition, this line must be used.

Tuesday, 21 January 2014

First Program In MapReduce

Instead of going for the typical "Hello World" program, we will start with word count. As the name shows, this program counts the number of words. First things first: not every MapReduce program needs to have a map and reduce method, or a mapper and reducer class (see the code in the decompression section, which has no map and reduce method).

In our first program we are going to use both the map and reduce methods. While writing the program, make sure the project containing the class has all the necessary jar files.

How do you add jar files to your project for MapReduce?
  1. It's simple: just go to the location of your project in the Eclipse workspace, open the project, create a new folder, and copy all the jar files into it.
  2. Now open the project in Eclipse, right-click on the project name, and click on Properties; a window will open. Choose Java Build Path in the left panel, then choose Add External JARs, go to that folder in your project, select all the jar files, and click OK.

Now we are good to go for our first program.


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] Line = value.toString().split("\t");
      StringTokenizer tokenizer = new StringTokenizer(Line[0]);
      while (tokenizer.hasMoreTokens()) {
        String Count = tokenizer.nextToken();
        context.write(new Text(Count), new IntWritable(1));   // emit (token, 1)
      }
    }
  }

  public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));   // emit (token, total count)
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setJarByClass(WordCount.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(WordCountMap.class);
    job.setReducerClass(WordCountReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
The main class is defined (here WordCount), inside which two classes are defined that extend the Mapper and Reducer classes. After the extends clause there is a list of type parameters; in our code we have Mapper<LongWritable, Text, Text, IntWritable>. It is in the form <key_in, value_in, key_out, value_out>. The reducer's parameters are similar: Reducer<Text, IntWritable, Text, IntWritable>. The mapper class contains the map method and the reducer class contains the reduce method.

The map method has 3 parameters: map(LongWritable key, Text value, Context context). The first two arguments match the mapper's first two type parameters, i.e. key_in and value_in, in this case LongWritable and Text. The third argument, the Context object, is used to communicate between the map and reduce methods (as well as the setup and cleanup methods). Now have a look at the first line of the map method.


String[] Line=value.toString().split("\t");


It splits the input line on the delimiter (here a tab), converts the text into a String, and stores the pieces in a string array. The next lines are the most important part of the code: they use a StringTokenizer object, which takes the string as an argument and breaks it into tokens. The StringTokenizer methods do not distinguish among identifiers, numbers, and quoted strings, nor do they recognize and skip comments, which is exactly what we need in word count. The following loop simply counts the occurrences of tokens by emitting each token as the key and 1 as the value. Why 1 as the value? It's explained in the next section. Before we move to the reduce method, let's understand the concept of Context:


context.write(new Text(Count), new IntWritable(1));


Context is the only means of communication between the map and reduce methods. Whatever values the map method passes on to the reduce method are passed through the Context object. The call has the form context.write(key_out, value_out), so the types of key_out and value_out should be the same as those given in the Mapper class's declaration. Now move on to the reduce method:


public void reduce(Text key, Iterable<IntWritable> values, Context context)


Here also the reduce method has 3 arguments. The first two correspond to the mapper's output types, i.e. key_out and value_out, in this case Text and IntWritable. But instead of a single IntWritable we have written Iterable<IntWritable>. This is due to the output format of the map method. Let us understand this with the help of an example. Suppose our input file has this data.

He is a king. He is nice.

Now internally the map method will emit the key and value like this.

< He,(1)>
< is,(1)>
< a,(1)>
< king.,(1)>
< He,(1)>
< is,(1)>
< nice,(1)>

Before the result reaches the reducer, the shuffle and sort phase converts it into this intermediate result:

< He,(1,1)>
< is,(1,1)>
< a,(1)>
< king.,(1)>
< nice,(1)>

The reducer receives the above intermediate result, and all the logic of the reduce method is performed on this result, not on the result produced directly by the map method. So the reduce method just adds up the 1's in order to count the total occurrences of each token, and emits the token along with its number of occurrences. So the final result would be:

He 2
is 2
a 1
king. 1
nice 1

Monday, 20 January 2014

Counters In MapReduce:"Channel For Statistics Gathering "

In MapReduce, counters provide a useful channel for gathering statistics about the job and for problem diagnosis. The statistics gathering may be for quality control or for application-level control. Hadoop has some built-in counters for every job which report various metrics.

Advantages of using counters:
  • Counters should be used to record whether a particular condition occurred, instead of using log messages in the map or reduce task.
  • Counter values are much easier to retrieve than log output for large distributed jobs.

Disadvantages of using counters:
  • Counter values may go down if a task fails during a job run.

Built-in Counters


As mentioned above, Hadoop maintains some built-in counters for every job. Counters are divided into various groups. Each group contains either task counters, which are updated as a task progresses, or job counters, which are updated as a job progresses.

Task counters: These gather information about tasks over the course of their execution, and the results are aggregated over all the tasks in a job. For example, the MAP_INPUT_RECORDS counter counts the input records read by each map task and aggregates over all map tasks in a job, giving the total number of input records for the whole job. Task counters are maintained by each task attempt and periodically sent to the tasktracker and then to the jobtracker so they can be globally aggregated (for more info, check the YARN: MapReduce 2 post's "Progress And Status Update" section). To guard against errors due to lost messages, task counters are sent in full rather than as deltas since the last transmission.

Although counter values are only definitive once the job has finished executing successfully, some counters provide useful information while the job is still running. This information is useful for monitoring a job with the web UI. For example, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES and COMMITTED_HEAP_BYTES provide an indication of how memory usage varies over the course of a particular task attempt.

Job counters: Job counters are maintained by the jobtracker (or the application master in YARN). This is because, unlike all other counters (including user-defined ones), they don't need to be sent across the network. They measure job-level statistics, not values that change while a task is running. For example, TOTAL_LAUNCHED_MAPS counts the number of map tasks that were launched over the course of a job, including tasks that failed.

User-Defined Java Counters


MapReduce allows users to define their own set of counters, which are incremented as required in the mapper or reducer. Counters are defined by a Java enum, which serves to group related counters. A job may define an arbitrary number of enums, each with an arbitrary number of fields. The name of the enum is the group name, and the enum's fields are the counter names. Counters are global: the MapReduce framework aggregates them across all maps and reduces to produce a grand total at the end of the job.

Dynamic counters: A dynamic counter is one that isn't defined by a Java enum. Because a Java enum's fields are defined at compile time, you can't create new counters on the fly using enums. If, for example, we want to count the distribution of temperature quality codes, it is more convenient to use a dynamic counter to emit the values the quality code actually takes, even though the format specification defines the values it can take.

With the old API, the method we use on the Reporter object takes a group and counter name as Strings:

public void incrCounter(String group, String counter, long amount)

The two ways of creating and accessing counters, using enums and using strings, are actually equivalent, because Hadoop turns enums into strings to send counters over RPC. Enums are slightly easier to work with, provide type safety, and are suitable for most jobs. For the odd occasion when you need to create counters dynamically, you can use the String interface.
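With the new (org.apache.hadoop.mapreduce) API used elsewhere on this blog, the same ideas look roughly like the sketch below; the Temperature enum, the "TemperatureQuality" group and the record layout are made-up names for illustration:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Enum counters: the enum name is the group, its fields are the counters.
    enum Temperature { MISSING, MALFORMED }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");   // hypothetical record layout
        if (fields.length < 2) {
            context.getCounter(Temperature.MALFORMED).increment(1);
            return;
        }
        if (fields[1].isEmpty()) {
            context.getCounter(Temperature.MISSING).increment(1);
            return;
        }
        // Dynamic counter: group and counter names given as plain strings,
        // keyed here on a hypothetical quality code in the third column.
        if (fields.length > 2) {
            context.getCounter("TemperatureQuality", fields[2]).increment(1);
        }
        context.write(new Text(fields[0]), new IntWritable(Integer.parseInt(fields[1])));
    }
}

// After the job finishes, counters can be read back in the driver:
// long missing = job.getCounters().findCounter(TemperatureMapper.Temperature.MISSING).getValue();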

Saturday, 18 January 2014

Serialization and Deserialization in Hadoop

Serialization is the process of converting structured objects into a byte stream. It is done basically for two purposes: for transmission over a network (interprocess communication) and for writing to persistent storage. In Hadoop, the interprocess communication between nodes in the system is done using remote procedure calls, i.e. RPCs. The RPC protocol uses serialization to turn the message into a binary stream to be sent to the remote node, which receives and deserializes the binary stream back into the original message.

RPC serialization format is expected to be:
  • Compact: to make efficient use of network bandwidth.
  • Fast: very little performance overhead is expected for the serialization and deserialization process.
  • Extensible: to adapt to new changes and requirements.
  • Interoperable: the format needs to be designed to support clients that are written in different languages to the server.
It should be noted that the data format for persistent storage has its own requirements for the same four properties expected of an RPC serialization format:
  • Compact: to make efficient use of storage space.
  • Fast: to keep the overhead of reading or writing terabytes of data minimal.
  • Extensible: to transparently read data written in an older format.
  • Interoperable: to be able to read and write persistent data using different languages.

Hadoop uses its own serialization format, Writables. Writable is compact and fast, but not extensible or interoperable.

The Writable Interface


The Writable interface has two methods, one for writing and one for reading. The method for writing writes its state to a DataOutput binary stream and the method for reading reads its state from a DataInput binary stream.

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
Let us understand serialization with an example. Given below is a helper method.

public static byte[] serialize(Writable writable) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DataOutputStream dataOut = new DataOutputStream(out);
  writable.write(dataOut);
  dataOut.close();
  return out.toByteArray();
}
Let’s try deserialization. Again, we create a helper method to read a Writable object from a byte array:

public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
  ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  DataInputStream dataIn = new DataInputStream(in);
  writable.readFields(dataIn);
  dataIn.close();
  return bytes;
}
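A quick usage sketch of these two helpers (the assertions assume the same JUnit/Hamcrest style as the comparator snippets further down):

IntWritable writable = new IntWritable(163);
byte[] bytes = serialize(writable);
// An IntWritable wraps a Java int, so its serialized form is four bytes.
assertThat(bytes.length, is(4));

IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
// The deserialized object carries the original value again.
assertThat(newWritable.get(), is(163));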

WritableComparable and comparators


IntWritable implements the WritableComparable interface, which is a subinterface of the Writable and java.lang.Comparable interfaces:

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is important for MapReduce because there is a sorting phase during which keys are compared with one another. Hadoop provides RawComparator, an extension of Java's Comparator:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
This interface permits implementors to compare records read from a stream without deserializing them into objects, hence avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly from the given start positions (s1 and s2) and lengths (l1 and l2). WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions:

First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method. Second, it acts as a factory for RawComparator instances (that Writable implementations have registered).

For example, to obtain a comparator for IntWritable, we just use:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

The comparator can be used to compare two IntWritable objects:

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

or their serialized representations:

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

Wednesday, 15 January 2014

Hadoop Introduction


What is Hadoop? "Hadoop", the name itself is weird, isn't it? The term Apache Hadoop was coined by Doug Cutting; the name came from a toy elephant. Hadoop is all about processing large amounts of data, irrespective of whether it is structured or unstructured; huge data here means hundreds of gigabytes and more. A traditional RDBMS may not be apt when you have to deal with such huge data sets. Even though "database sharding" tries to address this issue, the chance of node failure makes this approach less practical.

Hadoop is an open-source software framework which enables applications to work across multiple nodes that can store enormous amounts of data. It comprises two components:

Apache Hadoop Distributed File System (HDFS)
Google’s MapReduce Framework

Apache Hadoop was created by Doug Cutting, who named it after his son's toy elephant. Hadoop's original purpose was to support the Nutch search engine project. But Hadoop's significance has grown far beyond that; it is now a top-level Apache project used by a large community of users. Facebook, The New York Times, and Yahoo are some examples of Apache Hadoop implementations.

Hadoop is written in the Java programming language!! The Apache Hadoop framework is composed of the following modules :

Hadoop Common - contains libraries and utilities needed by other Hadoop modules
Hadoop Distributed File System (HDFS) - a distributed file-system that stores data on the commodity machines, providing very high aggregate bandwidth across the cluster.
Hadoop YARN - a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users' applications.
Hadoop MapReduce - a programming model for large scale data processing.

Significance of Hadoop

The data on the World Wide Web is growing at an enormous rate. As the number of active internet users increases, the amount of data getting uploaded is increasing. Some of the estimates related to the growth of data are as follows:

In 2006, the total estimated size of the digital data stood at 0.18 zettabytes
By 2011, a forecast estimates it to stand at 1.8 zettabytes.
One zettabyte = 1000 exabytes = 1 million petabytes = 1 billion terabytes

Social networking sites hosting photos, Video streaming sites, Stock exchange transactions are some of the major reasons of this huge amount of data. The growth of data also brings some challenges with it. Even though the amount of data storage has increased over time, the data access speed has not increased at the same rate.

If all the data resides on one node, then it deteriorates the overall data access time. Reading becomes slower; writing becomes even slower. As a solution to this, if the same data is accessed from multiple nodes in parallel, then the overall data access time can be reduced. In order to implement this, we need the data to be distributed among multiple nodes and there should be a framework to control these multiple nodes’ Read and write. Here comes the role of Hadoop kind of system.

Let’s see the problems that can happen with shared storage and how Apache Hadoop framework overcomes it.
Hardware failure: Hadoop does not expect all nodes to be up and running all the time. Hadoop has a mechanism to handle node failures: it replicates the data.
Combining the data retrieved from multiple nodes: Combining the output of each worker node is a challenge; Google's MapReduce framework helps to solve this problem. A map is essentially a key-value pair. The MapReduce framework maps the data retrieved from multiple disks and then combines it to generate one output.

Components Of Apache Hadoop: the Hadoop framework consists of 2 parts, the Apache Hadoop Distributed File System (HDFS) and MapReduce.

Hadoop Distributed File System (HDFS)

Hadoop Distributed File System is a distributed file system which is designed to run on commodity hardware. Since Hadoop treats node failure as the norm rather than an exception, HDFS has been designed to be highly fault tolerant. Moreover, it is designed to run on low-cost hardware.

HDFS is designed to reliably store very large files across machines in a large cluster
HDFS stores each file as a sequence of blocks; all blocks in a file except the last block are the same size.
The blocks of a file are replicated for fault tolerance and this replication is configurable.
An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later.

The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode

Replica placement is crucial for faster retrieval of data by clients. For this, HDFS uses a technique known as rack awareness: HDFS tries to satisfy a read request from the replica that is closest to the client. All HDFS communication protocols are layered on top of the TCP/IP protocol.

MapReduce :

MapReduce is the framework which helps in the data analysis part of Apache Hadoop implementation. Following are the notable points of MapReduce.

MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets on clusters of computers
MapReduce framework is inspired by map and reduce functions commonly used in functional programming
MapReduce consists of a Map step and a Reduce step to solve a given problem.

Map Step:

The master node takes the input, chops it up into smaller sub-problems, and distributes those to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes that smaller problem, and passes the answer back to its master node.

Reduce Step:

The master node then takes the answers to all the sub-problems and combines them to get the output. All map steps execute in parallel. The reduce step takes its input from the map step; all map outputs with the same key fall under one reducer. There are multiple reducers, however, and they work in parallel. This parallel execution offers the possibility of recovery from partial failure: if one node (mapper/reducer) fails, its work can be re-scheduled to another node.

Tuesday, 14 January 2014

Sequence File

What Is SequenceFile?


SequenceFile is just a flat file consisting of binary key/value pairs. It is highly used in MapReduce as input/output formats. In fact, the temporary output of each map is stored using SequenceFile.

SequenceFile provides Writer, Reader and Sorter classes for writing, reading and sorting respectively.
There are three different SequenceFile formats:-

  1. Uncompressed key/value records.
  2. Record compressed key/value records - only values are compressed .
  3. Block compressed key/value records - both keys and values are collected in blocks separately and compressed. The size of the block is configurable by user.

The recommended way is to use the SequenceFile.createWriter methods to construct the preferred writer implementation. The SequenceFile.Reader acts as a bridge and can read any of the above SequenceFile formats.

Why Do We Need A Sequence File?


HDFS is a distributed file system, mainly designed for batch processing of large amounts of data. The default size of an HDFS block is 64 MB. When files are much smaller than the block size, performance degrades badly, because retrieving many small files requires a large number of seeks and lots of hopping from one datanode to another, which is inefficient.

When files are very small, the input to each map task is very little and there are a large number of map tasks. For example, a 20 GB dataset broken up into files of 100 KB each would use a map task per file. Thus the time taken to finish the job increases substantially.

To solve these two problems, we need a sequence file. A sequence file is a data structure for binary key-value pairs. It can be used as a common format to transfer data between MapReduce jobs. Another important advantage of a sequence file is that it can be used as an archive to pack smaller files, which avoids the problems with small files mentioned above.

How To Write And Read A Sequence File?


In order to create a sequence file, use one of its createWriter() static methods, which return a SequenceFile.Writer instance. We can then write key-value pairs using the append() method, and call the close() method when we are done. Similarly, to read a sequence file, create an instance of SequenceFile.Reader and iterate over the records by invoking the next() method. There are several versions of the next() method, and which one we use depends upon the serialization framework in use. If a key-value pair is read, the method returns true, else it returns false. A value read separately can be retrieved using the getCurrentValue() method.
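A minimal sketch of writing and then reading a sequence file with the Hadoop 1.x style API; the path name and record contents are made up for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("numbers.seq");          // hypothetical output path
        IntWritable key = new IntWritable();
        Text value = new Text();

        // Write a few key-value pairs.
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 1; i <= 5; i++) {
                key.set(i);
                value.set("record-" + i);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }

        // Read them back.
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}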

How SequenceFile Is Stored Internally?


All of the above formats (see "What Is SequenceFile?" above) share a common header, which is used by the SequenceFile.Reader to return the appropriate key/value pairs. A summary of the header is given below:
SequenceFile Common Header
  • version - A byte array: 3 bytes of magic header 'SEQ', followed by 1 byte of actual version no. (example SEQ4,SEQ6)
  • keyClassName - String
  • valueClassName - String
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block compression is turned on for keys/values in this file.
  • compressor class - The classname of the CompressionCodec which is used to compress/decompress keys and/or values in this SequenceFile (only if compression is enabled).
  • metadata - SequenceFile.Metadata for this file (key/value pairs)
  • sync - A sync marker to denote end of the header. All strings are serialized using Text.writeString api.

The formats for Uncompressed and RecordCompressed Writers are very similar and are explained below:

Uncompressed and RecordCompressed Writer Format
  • Header
  • Record
  • Record length
    • Key length
    • Key
    • (Compressed?) Value

A sync-marker every few k bytes or so. The sync marker permits seeking to a random point in a file and then re-synchronizing input with record boundaries. This is required to be able to efficiently split large files for MapReduce processing. The format for the BlockCompressedWriter is as follows:

BlockCompressed Writer Format
  • Header
  • Record Block
    • A sync-marker to help in seeking to a random point in the file and then seeking to next record block.
    • CompressedKeyLengthsBlockSize
    • CompressedKeyLengthsBlock
    • CompressedKeysBlockSize
    • CompressedKeysBlock
    • CompressedValueLengthsBlockSize
    • CompressedValueLengthsBlock
    • CompressedValuesBlockSize
    • CompressedValuesBlock

The compressed blocks of key lengths and value lengths consist of the actual lengths of individual keys/values encoded in ZeroCompressedInteger format .

A sequence file is composed of a header and one or more records. The first three bytes of a sequence file are the bytes SEQ, which acts like a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata etc. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file, not necessarily between every pair of records.

The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression. If no compression is enabled (the default), each record is made up of the record length (in bytes), the key length, the key, and then the value. The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Keys are not compressed.

Block compression compresses multiple records at once; it is therefore more compact than record compression and should generally be preferred, because it has the opportunity to take advantage of similarities between records. Records are added to a block until the block reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property (the default is 1 million bytes). A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values.

Enough of theory, let us do some coding and implement Sequencefile in a program.

We will start with simple WordCount program. Write complete WordCount program as it is and just add one line in main method.


job.setOutputFormatClass(SequenceFileOutputFormat.class);
The final main method will look like this:

public static void main(String[] args) throws Exception {
  Job job = new Job();
  job.setJarByClass(WordCount.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setMapperClass(WordCountMap.class);
  job.setReducerClass(WordCountReduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Try to get the output as we do normally i.e.

$ hadoop fs -cat traing/aman/nwc8/part-r-00000
Instead of showing the result, it will print some unexpected characters. This is due to the fact that a sequence file can't be viewed like this; the way to view a sequence file is different. Now try this command.

$ hadoop fs -text traing/aman/nwc8/part-r-00000
It will show the exact result of the code.

Monday, 13 January 2014

Hadoop Archives


In HDFS each file is stored in blocks, and the metadata for each block is held in memory by the namenode. No matter how small a file is, the same method is followed, so HDFS ends up storing small files inefficiently. As a result, a large number of small files can consume a lot of memory on the namenode. HDFS provides a file archiving facility, a special archive format called HAR files or Hadoop archives, that stores files in HDFS blocks more efficiently, hence reducing namenode memory usage while still allowing transparent access to the files.

Point to remember: in HDFS, small files do not take up any more disk space than their size. For example, a 2 MB file stored with a block size of 128 MB uses 2 MB of disk space, not 128 MB.

A Hadoop archive is created from a collection of files using the archive tool. The tool runs a MapReduce job to process the input files in parallel, so we need a running MapReduce cluster to use it. A Hadoop archive maps to a file system directory and always has a *.har extension. A Hadoop archive directory contains metadata and data (part-*) files. The metadata is in the form of _index and _masterindex; the _index file contains the names of the files that are part of the archive and their locations within the part files.

How to Create an Archive



Usage: hadoop archive -archiveName <name> -p <parent> <src>* <dest>

For example
% hadoop fs -lsr /old/files
-rw-r--r-- 1 tom supergp 1 2013-05-09 09:03 /old/files/a
drwxr-xr-x - tom supergp 0 2013-05-09 09:03 /old/files/dir
-rw-r--r-- 1 tom supergp 1 2013-05-09 09:03 /old/files/dir/b

Now run the archive command: % hadoop archive -archiveName files.har /old/files /old

The first option after -archiveName is the name of the archive, here files.har. Second one is the files to put in the archive. Here we are archiving only one source tree, the files in /old/files in HDFS, but the tool accepts multiple source trees. The final argument is the output directory for the HAR file.

% hadoop fs -ls /old
Found 2 items
drwxr-xr-x - tom supergp 0 2013-05-09 09:03 /old/files
drwxr-xr-x - tom supergp 0 2009-04-09 19:13 /old/files.har

% hadoop fs -ls /old/files.har
Found 3 items

-rw-r--r-- 10 tom supergp 165 2013-05-09 09:03 /old/files.har/_index
-rw-r--r-- 10 tom supergp 23 2013-05-09 09:03 /old/files.har/_masterindex
-rw-r--r-- 1 tom supergp 2 2013-05-09 09:03 /old/files.har/part-0

The directory listing shows what a HAR file is made of: two index files and a collection of part files (this example has just one of the latter). The part files contain the contents of a number of the original files concatenated together, and the indexes make it possible to look up the part file that an archived file is contained in, as well as its offset and length. All these details are hidden from the application, however, which uses the har URI scheme to interact with HAR files, using a HAR filesystem that is layered on top of the underlying filesystem (HDFS in this case).

Saturday, 11 January 2014

Map Side Join And Reduce Side Join


Joins are one of the interesting features available in MapReduce. MapReduce can perform joins between very large datasets. The implementation of a join depends on how large the datasets are and how they are partitioned. If the join is performed by the mapper, it is called a map-side join, whereas if it is performed by the reducer it is called a reduce-side join.

If both datasets are too large for either to be copied to each node in the cluster, we can still join them using MapReduce with a map-side or reduce-side join, depending on how the data is structured. One common example of this case is a user database and a log of some user activity (such as access logs). For a popular service, it is not feasible to distribute the user database (or the logs) to all the MapReduce nodes. Before diving into the implementation let us understand the problem thoroughly.

For example, suppose we have two datasets, one holding user ids and names and the other holding user activity on the application. In order to find out which user performed what activity, we need to join these two datasets so that the user names and the user activity are brought together. The join technique can be chosen based on dataset size: if one dataset is small enough to be distributed across the cluster, we can use the side data distribution technique.

Map Side join


A map-side join between large inputs works by performing the join before the data reaches the map function. For this to work, though, the inputs to each map must be partitioned and sorted in a particular way. Each input data set must be divided into the same number of partitions, and it must be sorted by the same key (the join key) in each source. All the records for a particular key must reside in the same partition. This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.

A map-side join can be used to join the outputs of several jobs that had the same number of reducers, the same keys, and output files that are not splittable, which in practice means the output files should not be bigger than the HDFS block size. Using the org.apache.hadoop.mapred.join.CompositeInputFormat class, we can achieve this.



Reduce Side join


Reduce-side joins are simpler than map-side joins since the input datasets need not be structured in any particular way. But they are less efficient, as both datasets have to go through the MapReduce shuffle phase so that records with the same key are brought together in the reducer. We can also use the secondary sort technique to control the order of the records.

How it is done?
  • The key of the map output, of datasets being joined, has to be the join key - so they reach the same reducer.
  • Each dataset has to be tagged with its identity, in the mapper- to help differentiate between the datasets in the reducer, so they can be processed accordingly.
  • In each reducer, the data values from both datasets, for keys assigned to the reducer, are available, to be processed as required.
  • A secondary sort needs to be done to ensure the ordering of the values sent to the reducer.
  • If the input files are of different formats, we would need separate mappers, and we would need to use MultipleInputs class in the driver to add the inputs and associate the specific mapper to the same.
[MultipleInputs.addInputPath(job, (input path n), (inputformat class), (mapper class n));]
Note: The join between the datasets (employee, current salary, cardinality of 1..1) has also been demonstrated in my post on map-side joins of large datasets. I have used the same datasets here as the purpose of this post is to demonstrate the concept. Whenever possible, reduce-side joins should be avoided. A minimal sketch of the tagging approach follows.
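Here is that sketch (without the secondary sort), assuming the employee file holds tab-separated id and name and the salary file holds tab-separated id and salary; all class names are illustrative:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public static class EmployeeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Tag the record so the reducer knows which dataset it came from.
        context.write(new Text(fields[0]), new Text("EMP\t" + fields[1]));
    }
}

public static class SalaryMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        context.write(new Text(fields[0]), new Text("SAL\t" + fields[1]));
    }
}

public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = null;
        String salary = null;
        for (Text value : values) {
            String[] tagged = value.toString().split("\t");
            if ("EMP".equals(tagged[0])) {
                name = tagged[1];
            } else {
                salary = tagged[1];
            }
        }
        if (name != null && salary != null) {
            context.write(key, new Text(name + "\t" + salary));   // joined record
        }
    }
}

// In the driver:
// MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeMapper.class);
// MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SalaryMapper.class);
// job.setReducerClass(JoinReducer.class);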

Friday, 10 January 2014

Compression and Decompression in MapReduce


File compression plays a very important role in Hadoop: it reduces the space needed to store files, and it speeds up data transfer across the network, or to and from disk. When dealing with large volumes of data, both of these savings can be significant, so it pays to carefully consider how to use compression in Hadoop. There are many different compression formats, tools and algorithms, each with different characteristics.

Compression format CompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

All compression algorithms exhibit a space/time trade-off i.e faster compression and decompression speeds usually come at the expense of smaller space savings. The different tools have very different compression characteristics.
  • Gzip is a generalpurpose compressor and sits in the middle of the space/time trade-off.
  • Bzip2 compresses more effectively than gzip, but is slower. Bzip2’s decompression speed is faster than its compression speed, but it is still slower than the other formats.
  • LZO, LZ4 and Snappy, on the other hand, all optimize for speed and are around an order of magnitude faster than gzip, but compress less effectively. Snappy and LZ4 are also significantly faster than LZO for decompression.

The tools listed above typically give some control over this trade-off at compression time by offering nine different levels: -1 means optimize for speed, and -9 means optimize for space. For example, the following command creates a compressed file file.gz using the fastest compression method: gzip -1 file

Simplest program for compression.
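A minimal sketch of such a job follows; the class name and argument handling are illustrative, and the job deliberately relies on the default identity mapper and reducer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SimpleCompress {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "compress");
    job.setJarByClass(SimpleCompress.class);

    // No mapper or reducer is set, so the identity mapper and reducer are used;
    // with the default TextInputFormat the types are (LongWritable, Text).
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // These two lines do the job of compression; any codec from the table above
    // can replace DefaultCodec (DEFLATE).
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Remove the two FileOutputFormat calls and point the same job at a compressed input, and you get the decompression behaviour described in the next section.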

In the above code there is no mapper and no reducer. Notice the two lines which are doing the job of compression. Instead of the DEFLATE codec, any other compression format from the table above can be used.

Simplest program for decompression.

In the decompression code we are not using any special expression to do the job of decompression; in fact we are not doing anything at all. But if you take a compressed file as the input file for the above code and run it, it will decompress the file. This works because the default TextInputFormat infers the codec from the file extension and decompresses the input automatically while reading it. The decompressed output is then produced as a plain, uncompressed file.

HDFS:“Moving Computation is Cheaper than Moving Data”

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS is designed to store very large data sets reliably, and to stream those data sets at high bandwidth to user applications. By distributing storage and computation across many servers, the resource can grow with demand while remaining economical at every size.

Key HDFS Features:
  • Scale-Out Architecture - Add servers to increase capacity.
  • High Availability - Serve mission-critical workflows and applications.
  • Load Balancing - Place data intelligently for maximum efficiency and utilization.
  • Security - POSIX-based file permissions for users and groups, with optional LDAP integration.
  • Fault Tolerance - Automatically and seamlessly recover from failures. Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system’s data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.
  • Flexible Access - Multiple and open frameworks for serialization and file system mounts. Applications that run on HDFS have large data sets: a typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files; it should provide high aggregate data bandwidth, scale to hundreds of nodes in a single cluster, and support tens of millions of files in a single instance.
  • Tunable Replication - Multiple copies of each file provide data protection and computational performance. HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance, and the block size and replication factor are configurable per file. An application can specify the number of replicas of a file; the replication factor can be set at file creation time and changed later (a small example follows this list). Files in HDFS are write-once and have strictly one writer at any time.
  • Simple Coherency Model - HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A MapReduce application or a web crawler application fits perfectly with this model. There is a plan to support appending-writes to files in the future.
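As an example of tunable replication, the replication factor of an existing file can be changed through the FileSystem API. This is only a sketch; the path and target replication factor below are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Ask HDFS to keep 2 replicas of this file from now on; the NameNode
    // schedules the extra copies or deletions in the background.
    boolean accepted = fs.setReplication(new Path("/user/demo/data.txt"), (short) 2);
    System.out.println("Replication change accepted: " + accepted);
    fs.close();
  }
}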

Working of HDFS


Every HDFS cluster is comprised of a NameNode and DataNodes. NameNode is the node which manages the cluster metadata and DataNodes are the nodes that store the data. Files and directories are represented on the NameNode by inodes. Inodes record attributes like permissions, modification and access times, or namespace and disk space quotas.

The file content is split into large blocks (generally 128 megabytes), and each block of the file is replicated at multiple DataNodes. The blocks are stored on the local file system on the datanodes. The Namenode actively monitors the number of replicas of a block. When a replica of a block is lost due to a DataNode failure or disk failure, the NameNode creates another replica of the block. The NameNode maintains the namespace tree and the mapping of blocks to DataNodes, holding the entire namespace image in RAM.

The NameNode does not directly send requests to DataNodes. It sends instructions to the DataNodes by replying to heartbeats sent by those DataNodes. The instructions include commands to: replicate blocks to other nodes, remove local block replicas, re-register and send an immediate block report, or shut down the node.

Here is a little bit more about NameNodes and DataNodes.

  • HDFS has a master/slave architecture. HDFS is comprised of interconnected clusters of nodes where files and directories reside. An HDFS cluster consists of a single node, known as the NameNode: a master server that manages the file system namespace and regulates access to files by clients. In addition, data nodes (DataNodes) store data as blocks within files.
  • Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. Within HDFS, the NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes, which handle read and write requests from HDFS clients. Data nodes also create, delete, and replicate data blocks according to instructions from the governing name node.
  • The namenode maintains two in-memory tables, one which maps the blocks to datanodes (one block maps to 3 datanodes for a replication value of 3) and a datanode to block number mapping. Whenever a datanode reports a disk corruption of a particular block, the first table gets updated and whenever a datanode is detected to be dead (because of a node/network failure) both the tables get updated.
  • Data nodes continuously loop, asking the name node for instructions. A name node can't connect directly to a data node; it simply returns values from functions invoked by a data node. Each data node maintains an open server socket so that client code or other data nodes can read or write data. The host or port for this server socket is known by the name node, which provides the information to interested clients or other data nodes.

  • Some interesting facts about DataNode


    • All datanodes send a heartbeat message to the namenode every 3 seconds to say that they are alive. If the namenode does not receive a heartbeat from a particular data node for 10 minutes, then it considers that data node to be dead/out of service and initiates replication of blocks which were hosted on that data node to be hosted on some other data node.
    • The data nodes can talk to each other to rebalance data, move and copy data around and keep the replication high.
    • When the datanode stores a block of information, it maintains a checksum for it as well. The data nodes update the namenode with the block information periodically and before updating verify the checksums. If the checksum is incorrect for a particular block i.e. there is a disk level corruption for that block, it skips that block while reporting the block information to the namenode. In this way, namenode is aware of the disk level corruption on that datanode and takes steps accordingly.

    Communications protocols


  • All HDFS communication protocols build on the TCP/IP protocol. HDFS clients connect to a Transmission Control Protocol (TCP) port opened on the name node, and then communicate with the name node using a proprietary Remote Procedure Call (RPC)-based protocol. Data nodes talk to the name node using a proprietary block-based protocol.


  • The File System Namespace


  • HDFS supports a traditional hierarchical file organization. A user or an application can create directories and store files inside these directories. The file system namespace hierarchy is similar to most other existing file systems; one can create and remove files, move a file from one directory to another, or rename a file. HDFS does not yet implement user quotas. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features.
  • The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the NameNode.

  • Data replication


  • HDFS uses an intelligent replica placement model for reliability and performance. Optimizing replica placement makes HDFS unique from most other distributed file systems, and is facilitated by a rack-aware replica placement policy that uses network bandwidth efficiently. HDFS replicates file blocks for fault tolerance. An application can specify the number of replicas of a file at the time it is created, and this number can be changed any time after that. The name node makes all decisions concerning block replication.
  • Large HDFS environments typically operate across multiple installations of computers. Communication between two data nodes in different installations is typically slower than data nodes within the same installation. Therefore, the name node attempts to optimize communications between data nodes. The name node identifies the location of data nodes by their rack IDs.

Wednesday, 8 January 2014

MapReduce Types


The first thing that comes to mind while writing a MapReduce program is the types you are going to use in the code for the Mapper and Reducer classes. There are a few points that should be followed for writing and understanding a MapReduce program. Here is a recap of the data types used in MapReduce (in case you have missed the MapReduce Introduction post).

Broadly, the data types used in MapReduce are as follows.
  • LongWritable-Corresponds to Java Long
  • Text -Corresponds to Java String
  • IntWritable -Corresponds to Java Integer
  • NullWritable - Corresponds to null values

With that quick overview done, we can move on to the key topic: the data types used in MapReduce. MapReduce has a simple model of data processing: the inputs and outputs of the map and reduce functions are key-value pairs.
  • The map and reduce functions in MapReduce have the following general form:
    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)
    • K1-Input Key
    • V1-Input value
    • K2-Output Key
    • V2-Output value
  • In general,the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3).
  • As said in the above point, even though the map output types and the reduce input types must match, this is not enforced by the Java compiler. If the reduce output types differ from the map output types (K2 and V2), then we have to declare the map output types in the driver as well, or an error will be thrown at runtime. So if K2 and K3 are the same, we don't need to call setMapOutputKeyClass(), because it falls back to the type set by setOutputKeyClass(); similarly, if V2 and V3 are the same, we only need to use setOutputValueClass(). A sketch illustrating this follows this list.
  • NullWritable is used when the user want to pass either key or value (generally key) of map/reduce method as null.
  • If a combine function is used, then it has the same form as the reduce function (and is an implementation of Reducer), except its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:
    map: (K1, V1) → list(K2, V2)
    combine: (K2, list(V2)) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)
    Often the combine and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2.
  • The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index. In practice, the partition is determined solely by the key (the value is ignored): partition: (K2, V2) → integer
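Here is a small sketch of the type-declaration point (class names and logic are made up for the example): the mapper emits (Text, IntWritable) while the reducer writes (Text, Text), so the map output value type has to be declared explicitly in the driver.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TypesDemo {

  // map: (K1=LongWritable, V1=Text) -> (K2=Text, V2=IntWritable)
  public static class LengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(value), new IntWritable(value.getLength()));
    }
  }

  // reduce: (K2=Text, list(V2=IntWritable)) -> (K3=Text, V3=Text)
  public static class LengthReducer extends Reducer<Text, IntWritable, Text, Text> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int total = 0;
      for (IntWritable v : values) {
        total += v.get();
      }
      context.write(key, new Text("length=" + total));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "types-demo");
    job.setJarByClass(TypesDemo.class);
    job.setMapperClass(LengthMapper.class);
    job.setReducerClass(LengthReducer.class);

    // V2 (IntWritable) differs from V3 (Text), so the map output value type must be
    // declared; K2 and K3 are both Text, so setMapOutputKeyClass() could be omitted.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}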

Default MapReduce Job: No Mapper, No Reducer


Ever tried to run a MapReduce program without setting a mapper or a reducer? Here is the minimal MapReduce program.
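A sketch of such a driver, written in the same Tool/ToolRunner style as the earlier examples (the class name is mine):

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MinimalMapReduce extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // No mapper, reducer, input format or output types are set: all defaults apply.
    Job job = new Job(getConf(), "minimal");
    job.setJarByClass(MinimalMapReduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MinimalMapReduce(), args));
  }
}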



Run it over some small data and check the output. Here is the little data set which I used and the final result; you can take a larger data set.







Notice the result file we get after running the above code on the given data: it has an extra column with some numbers in it. What happened is that the newly added column contains the key for every line. The number is the byte offset of the line within the input file, i.e. how far the beginning of that line is from the start of the file (0 for the first line, of course), and similarly how many characters away the second line is from the first. Count the characters and it will be 16, and so on.

This offset is taken as a key and emitted in the result.

Tuesday, 7 January 2014

Combiner: Introduction & Use (Or Not To Use)

  • When a mapper runs and produces its intermediate output, that output is first written to disk before being sent over the network through the shuffle/sort and on to the reducer.
  • If we were to do a word count on a book, our mapper would read the text line by line and emit a key/value pair consisting of an individual word as the key and 1 as the value. All those key/value pairs are first written to disk before being sent over the network. For one book that may not be a problem, but we are talking Big Data here, so this is sub-optimal.
  • To work around this issue we can use the concept known as local aggregation, which simply means that we want to consolidate the data before writing it to disk. Local aggregation can be implemented in two ways. First, we could use internal data structures to store and aggregate the data directly in the mapper. The downside to this approach is memory pressure and the potential that we exceed the amount of memory allocated to the JVM that the map task runs in.
  • A better method is to make use of a combiner. Combiners act as local reducers, aggregating data by key while it is in memory. The difference between the two methods is that combiners spill, or write to disk, as buffer limits are reached, which resolves the potential out-of-memory issue. It can, however, result in duplicate keys being emitted to the shuffle/sort, which is generally not an issue considering where we started from.
  • The primary goal of combiners is to minimize the number of key/value pairs that will be shuffled across the network between mappers and reducers. A combiner can be considered a “mini reducer” that is potentially applied several times during the map phase, before sending the new (reduced) set of key/value pairs to the reducer(s). This is why a combiner must implement the Reducer interface (or extend the Reducer class as of Hadoop 0.20). If a combiner is used, then its key/value datatypes must match the reducer's input key/value datatypes:
    • map:(k1,v1)->list(k2,v2)
    • combine:(k2,list(v2))->list(k2,v2)
    • reduce:(k2,list(v2))->list(k3,v3)
  • As explained above, when the combine and reduce functions are the same, K3 is the same as K2, and V3 is the same as V2.
  • Suppose 5 key/value pairs are emitted from the mapper for a given key k: (k,40), (k,30), (k,20), (k,2), (k,8), and suppose the reducer computes the mean of the values. Without a combiner, the reducer receives the list (k,{40,30,20,2,8}) and the mean output is 20. But if a combiner computing the mean were applied to the two sets ((k,40), (k,30), (k,20)) and ((k,2), (k,8)) separately, then the reducer would receive the list (k,{30,5}) and the output would be different (17.5), which is unexpected behaviour.
  • More generally, combiners can be used when the function you want to apply is both commutative and associative. That is the case for the addition function, which is why the word count example can benefit from a combiner but the mean function cannot. Note that if the reducer function is both associative and commutative (e.g. a sum of word counts), the reducer class can serve as both the reducer and the combiner, as in the sketch after this list.
  • Do not assume that the combiner will run; treat it only as an optimization. The combiner is not guaranteed to run over all of your data. In some cases, when the data doesn't need to be spilled to disk, MapReduce will skip running the combiner entirely. Note also that the combiner may be run multiple times over subsets of the data: it runs once per spill.
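Here is a minimal word-count sketch (class names are mine) showing the safe case, where the reducer doubles as the combiner because summation is commutative and associative.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCombiner {

  public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        context.write(new Text(itr.nextToken()), ONE);
      }
    }
  }

  // Sum is commutative and associative, so the same class is safe as both combiner and reducer.
  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "wordcount-with-combiner");
    job.setJarByClass(WordCountWithCombiner.class);
    job.setMapperClass(TokenMapper.class);
    job.setCombinerClass(SumReducer.class);   // may run zero, one or many times per map task
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}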

Monday, 6 January 2014

Shuffle and Sort


MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort and transfers the map outputs to the reducers as inputs is known as the shuffle. In many ways, the shuffle is the heart of MapReduce.


The Map Side


When the map function starts producing output, it is not simply written to disk; the writes are buffered in memory and presorted. Each map task writes its output to a circular memory buffer (100 MB by default) assigned to it. When the contents of the buffer reach a certain threshold size, a background thread starts to spill the contents to disk. Map outputs continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete. Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to.

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record, there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file.

It is often a good idea to compress the map output as it is written to disk because doing so makes it faster to write to disk, saves disk space, and reduces the amount of data to transfer to the reducer. By default, the output is not compressed, but it is easy to enable this by setting mapred.compress.map.output to true.
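The buffer size, the spill threshold and map-output compression are all plain configuration properties. A fragment of a driver setting them might look like the sketch below; the old-style property names match the Hadoop generation of this post, and Snappy is an arbitrary codec choice for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Circular in-memory buffer for map output: 100 MB by default, spilled at 80% full.
    conf.setInt("io.sort.mb", 100);
    conf.setFloat("io.sort.spill.percent", 0.80f);

    // Compress intermediate map output before it is written to disk and shuffled.
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec",
        SnappyCodec.class, CompressionCodec.class);

    Job job = new Job(conf, "tuned-shuffle");
    // ... set the mapper, reducer, input and output paths as in any other job ...
  }
}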

The Reduce Side


The reduce task needs the map output for its particular partition from several map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task. The reduce task has a small number of copier threads so that it can fetch map outputs in parallel. The default is five threads, but this number can be changed by setting the mapred.reduce.parallel.copies property.

The map outputs are copied to the reduce task JVM’s memory if they are small enough (the buffer’s size is controlled by mapred.job.shuffle.input.buffer.percent, which specifies the proportion of the heap to use for this purpose); otherwise, they are copied to disk. When the in-memory buffer reaches a threshold size (controlled by mapred.job.shuffle.merge.percent) or reaches a threshold number of map outputs (mapred.inmem.merge.threshold), it is merged and spilled to disk. If a combiner is specified, it will be run during the merge to reduce the amount of data written to disk. As the copies accumulate on disk, a background thread merges them into larger, sorted files. This saves some time merging later on. Note that any map outputs that were compressed (by the map task) have to be decompressed in memory in order to perform a merge on them.

When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase, as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering. This is done in rounds. For example, if there were 50 map outputs and the merge factor was 10 (the default, controlled by the io.sort.factor property, just like in the map’s merge), there would be five rounds. Each round would merge 10 files into one, so at the end there would be five intermediate files. Rather than have a final round that merges these five files into a single sorted file, the merge saves a trip to disk by directly feeding the reduce function in what is the last phase: the reduce phase. This final merge can come from a mixture of in-memory and on-disk segments.

During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem, typically HDFS. In the case of HDFS, because the tasktracker node (or node manager) is also running a datanode, the first block replica will be written to the local disk.

Wednesday, 1 January 2014

NameNode and TaskNodes

  • HDFS has a master/slave architecture. HDFS is comprised of interconnected clusters of nodes where files and directories reside. An HDFS cluster consists of a single node, known as the NameNode: a master server that manages the file system namespace and regulates access to files by clients. In addition, data nodes (DataNodes) store data as blocks within files.

  • Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. Within HDFS, the NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes, which handle read and write requests from HDFS clients. Data nodes also create, delete, and replicate data blocks according to instructions from the governing name node.


  • The namenode maintains two in-memory tables, one which maps the blocks to datanodes (one block maps to 3 datanodes for a replication value of 3) and a datanode to block number mapping. Whenever a datanode reports a disk corruption of a particular block, the first table gets updated and whenever a datanode is detected to be dead (because of a node/network failure) both the tables get updated.

  • Data nodes continuously loop, asking the name node for instructions. A name node can't connect directly to a data node; it simply returns values from functions invoked by a data node. Each data node maintains an open server socket so that client code or other data nodes can read or write data. The host or port for this server socket is known by the name node, which provides the information to interested clients or other data nodes.

  • Some interesting facts about DataNode


    • All datanodes send a heartbeat message to the namenode every 3 seconds to say that they are alive. If the namenode does not receive a heartbeat from a particular data node for 10 minutes, then it considers that data node to be dead/out of service and initiates replication of blocks which were hosted on that data node to be hosted on some other data node.

    • The data nodes can talk to each other to rebalance data, move and copy data around and keep the replication high.

    • When the datanode stores a block of information, it maintains a checksum for it as well. The data nodes update the namenode with the block information periodically and before updating verify the checksums. If the checksum is incorrect for a particular block i.e. there is a disk level corruption for that block, it skips that block while reporting the block information to the namenode. In this way, namenode is aware of the disk level corruption on that datanode and takes steps accordingly.



    Communications protocols


  • All HDFS communication protocols build on the TCP/IP protocol. HDFS clients connect to a Transmission Control Protocol (TCP) port opened on the name node, and then communicate with the name node using a proprietary Remote Procedure Call (RPC)-based protocol. Data nodes talk to the name node using a proprietary block-based protocol.


  • The File System Namespace


  • HDFS supports a traditional hierarchical file organization. A user or an application can create directories and store files inside these directories. The file system namespace hierarchy is similar to most other existing file systems; one can create and remove files, move a file from one directory to another, or rename a file. HDFS does not yet implement user quotas. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features.
  • The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the NameNode.


  • Data replication


  • HDFS uses an intelligent replica placement model for reliability and performance. Optimizing replica placement makes HDFS unique from most other distributed file systems, and is facilitated by a rack-aware replica placement policy that uses network bandwidth efficiently. HDFS replicates file blocks for fault tolerance. An application can specify the number of replicas of a file at the time it is created, and this number can be changed any time after that. The name node makes all decisions concerning block replication.
  • Large HDFS environments typically operate across multiple installations of computers. Communication between two data nodes in different installations is typically slower than data nodes within the same installation. Therefore, the name node attempts to optimize communications between data nodes. The name node identifies the location of data nodes by their rack IDs.