Wednesday, 27 August 2014

Ranker: Top N values


Ranker finds the top N values in a column, for each group.

Mapper:

The mapper emits each group as the key and the entire row as the value. The user supplies the column delimiter, the group column number, the value column number, the value of N, and the order (true for ascending, false for descending).
Suppose our input file is:
1,11,a
2,12,a
3,13,b
4,14,c
5,15,d
6,16,c
7,17,g
8,18,e
9,19,a

A sample command:
hadoop jar /root/Documents/iv3.jar iv3.TopValues  MapReduceInput/xy1.txt  MapReduceOutput/TopVal "," 3 1 1 true
Here "," is column delimiter.
"3" is group key i.e the 3rd column.
"1" is the column on which ranking will be done.
"1" means top 1 value.
"true" means we are expecting result in ascending order.

The mapper will then emit key-value pairs such as:
a,(1,11,a)
a,(2,12,a)
b,(3,13,b)
...
...
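For illustration, here is a minimal sketch of such a mapper; the class name and configuration property names are assumptions made for the sketch, not the author's actual code.

public static class TopNMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String delimiter;
    private int groupCol;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        delimiter = conf.get("topn.delimiter", ",");
        groupCol = conf.getInt("topn.groupColumn", 2);   // 0-based index of the group column
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] cols = value.toString().split(delimiter);
        // Emit (group column, entire row) so the reducer sees all rows of a group together.
        context.write(new Text(cols[groupCol]), value);
    }
}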

Reducer:

The reducer uses a TreeMap to store the data, keyed by the ranking value:
key:value
1: 1,11,a
2: 2,12,a
9: 9,19,a
When the number of entries exceeds N, we delete one entry: for ascending order, the entry with the highest key; for descending order, the entry with the lowest key.


So for key "a" it will keep just one entries and would delete the entry with "9" and "2" as key. Similarly each key (group) is processed. So the output will be:
1,11,a
3,13,b
4,14,c
5,15,d
8,18,e
Here is the entire code.
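For illustration, here is a minimal sketch of the reducer logic described above; the class name and configuration property names are assumptions for the sketch, not the author's actual code.

public static class TopNReducer extends Reducer<Text, Text, NullWritable, Text> {
    private int n;
    private int valueCol;
    private boolean ascending;
    private String delimiter;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        n = conf.getInt("topn.n", 1);
        valueCol = conf.getInt("topn.valueColumn", 0);       // 0-based index of the ranking column
        ascending = conf.getBoolean("topn.ascending", true);
        delimiter = conf.get("topn.delimiter", ",");
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // TreeMap keeps the rows sorted by the ranking column.
        TreeMap<Long, String> top = new TreeMap<Long, String>();
        for (Text value : values) {
            String row = value.toString();
            long rank = Long.parseLong(row.split(delimiter)[valueCol]);
            top.put(rank, row);                               // ties on the ranking value overwrite each other in this simple sketch
            if (top.size() > n) {
                // Trim the entry at the wrong end: highest key for ascending, lowest key for descending.
                top.remove(ascending ? top.lastKey() : top.firstKey());
            }
        }
        for (String row : top.values()) {
            context.write(NullWritable.get(), new Text(row));
        }
    }
}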

Friday, 21 February 2014

All About Hadoop: Issue #2

In Issue #1 of this "All About Hadoop" series, we discussed some basic facts and components of Hadoop. This post covers application development. We will also learn how to get data into Hadoop. So let's get started.

Application Development


To abstract some of the complexity of the Hadoop programming model, several application development languages have emerged that run on top of Hadoop. Here, we will cover the most popular ones: Pig, Hive and Jaql.

Pig And PigLatin


A little history first: Pig was developed by Yahoo. Just like real pigs, which can eat almost anything, the Pig programming language is designed to deal with any kind of data. Pig is basically made up of two components: the language (PigLatin) and the run-time environment where PigLatin programs are executed. We can compare this to the relationship between the JVM and a Java application. The first step in a Pig program is to LOAD the data we want to manipulate from HDFS. Then we run the data through a set of transformations. Finally, we DUMP the data to the screen or STORE the result in a file.

LOAD: Just like any other Hadoop feature, the objects or data on which Pig works are stored in HDFS. To make a Pig program access this data, the program first must tell Pig what file or files it will use. This is achieved through the LOAD 'data_file' command; data_file can be any HDFS directory or file. If data_file is a directory, then all the files in the directory are loaded into the program. If the input file format is not acceptable to Pig, we can add the USING clause to the LOAD statement to specify a user-defined function that can read and interpret the data.

TRANSFORM: This is responsible for all the data manipulation. With these commands we can FILTER out rows that are of no use, JOIN two sets of data files, GROUP data to build aggregations, ORDER results, and much more.

DUMP and STORE: If we don't specify a DUMP or STORE command, the results of a Pig program are not generated. We would typically use the DUMP command, which sends the output to the screen, when debugging Pig programs. If we want to store the result in a file, then instead of DUMP we use STORE.

After creating a Pig program we need to run it in the Hadoop environment. This is where the Pig run-time comes in. There are three ways to run a Pig program: embedded in a script, embedded in a Java program, or from the Pig command line, called Grunt. No matter which way we choose to run the Pig program, the Pig run-time environment translates the program into a set of map and reduce tasks and runs them under the covers for us. It simplifies the work associated with the analysis of massive volumes of data and allows the developer to focus on the analysis of the data rather than on individual map and reduce tasks.

HIVE


Some history and a few facts first: although Pig can be a very powerful and simple language to use, its disadvantage is that it is new. Hive allows developers to write Hive Query Language (HQL) statements that are similar to SQL statements. HQL statements are broken down by the Hive service into MapReduce jobs and are executed across the cluster. We can run our Hive queries in several ways:
  • Command line interface known as Hive shell
  • Java Database Connectivity or Open Database Connectivity applications
  • Hive Thrift Client
The Hive Thrift Client is much like any database client that gets installed on a user's client machine: it communicates with the Hive services running on the server. We can use the Hive Thrift Client within applications written in C++, Java, PHP, Python or Ruby. Hive looks pretty much like traditional database code with SQL access. But because Hive is based on MapReduce, there are various differences: Hadoop is intended for long sequential scans, and because Hive is based on Hadoop, we can expect queries to have very high latency. This means Hive is not appropriate for applications that need very fast response times, as we could expect with a database such as DB2. Moreover, Hive is read-based and therefore not appropriate for transaction processing that typically involves a high percentage of write operations.
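For instance, a Hive query can be issued from Java over JDBC. The sketch below assumes a HiveServer2 instance listening on the default port 10000 and the org.apache.hive.jdbc.HiveDriver driver on the classpath; the host, table and column names are placeholders, not a tested setup.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://hiveserver-host:10000/default", "user", "");
        Statement stmt = con.createStatement();
        // HQL looks like SQL; under the covers Hive turns it into MapReduce jobs.
        ResultSet rs = stmt.executeQuery("SELECT city, MAX(temperature) FROM weather GROUP BY city");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getInt(2));
        }
        con.close();
    }
}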

Getting Data Into Hadoop


One of the challenges with Hadoop HDFS is that we can't perform typical file system operations, such as copying, creating, moving, deleting, or accessing a file, in the usual way. To do anything with a file in HDFS, we must use the HDFS interfaces or APIs directly. In this section, we will discuss the basics of getting our data into HDFS and cover Flume, which is a distributed data collection service for flowing data into a Hadoop cluster.

Basic Copy Data: The most common way to move files from a local file system into HDFS is through the copyFromLocal command. To get files out of HDFS to the local file system, use the copyToLocal command. An example of the two commands is shown below:
hdfs dfs -copyFromLocal /user/dir/file hdfs://s1.com/dir/hdfsfile
hdfs dfs -copyToLocal  hdfs://s1.com/dir/hdfsfile /user/dir/file
These commands are run through the HDFS shell program, which is simply a Java application. The shell uses the Java APIs for getting data in and out of HDFS. The APIs can be called from any Java application.

The problem with this approach is that we must have Java application developers write the logic and programs to read and write data from HDFS. If we need to access HDFS files from our Java applications, we use the methods in the org.apache.hadoop.fs package. This allows us to incorporate read and write operations directly, to and from HDFS, from within our MapReduce applications.
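As a rough sketch of what that looks like (the path is a placeholder and error handling is omitted for brevity):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);                 // picks up the default HDFS from the configuration
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/dir/hdfsfile"))));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);                         // print each line of the HDFS file
        }
        reader.close();
    }
}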

Flume: A flume is a channel that directs water from a source to some other location where water is needed. Flume was created to let you flow data from a source into your Hadoop environment. In Flume, the entities we work with are called sources, decorators and sinks.

A source can be any data source, and Flume has many predefined source adapters. A sink is the target of a specific operation. A decorator is an operation on the stream that can transform the stream in some manner, such as compressing or decompressing data, or modifying data by adding or removing pieces of information.

A number of predefined source adapters are built into Flume. For example, some adapters allow everything coming off a TCP port, or anything arriving on standard input (stdin), to enter the flow. A number of text file source adapters give granular control to grab a specific file and feed it into a data flow, or even to tail a file and continuously feed the flow with whatever new data is written to it.

There are three kinds of sinks in Flume. One is basically the final flow destination and is known as a Collector Tier Event sink. This is where we would land a flow into an HDFS-formatted file system. Another sink type used in Flume is called an Agent Tier Event. This sink is used when we want the sink to be the input source for another operation. When we use these sinks, Flume will ensure the integrity of the flow by sending back acknowledgements that data has actually arrived at the sink. The final sink type is known as a Basic sink, which can be a text file, the console display, a simple HDFS path, or a null bucket where the data is simply deleted.

Wednesday, 19 February 2014

All About Hadoop: Issue #1

If you are new to Hadoop, then this post is for you. Although it is very difficult to cover everything about Hadoop in a few pages, I have tried to touch every important term and concept that defines Hadoop. In this issue #1 of the two-part series, I will cover the facts and components of Hadoop. In the next post, we will discuss application development and learn how to get data into Hadoop. So let's get started.

Just Some Facts


Hadoop is a computing environment built on top of a distributed, clustered file system that was designed specifically for very large-scale data operations. Hadoop was inspired by Google's work on the Google File System and the MapReduce programming paradigm, in which work is broken down into mapper and reducer tasks that manipulate data stored across a cluster of servers for massive parallelism. Unlike transaction systems, Hadoop can scan through large data sets to produce its results through a scalable, distributed batch-processing system. Hadoop is not about speed-of-thought response times, real-time warehousing, or blazing transactional speeds; it is about discovery, making the once near-impossible possible from a scalability and analysis perspective. The Hadoop methodology is built around a function-to-data model as opposed to data-to-function: because there is so much data, the analysis programs are sent to the data.

Hadoop basically has two parts: a file system (the Hadoop Distributed File System) and a programming paradigm (MapReduce). One of the key components of Hadoop is redundancy. Not only is the data redundantly stored in multiple places across the cluster, but the programming model is such that failures are expected and are resolved automatically by running portions of the program on various servers in the cluster. This redundancy makes it possible to distribute the data and its associated programming across a very large cluster of commodity components. Obviously, out of a large number of commodity hardware components a few will fail, but this redundancy provides fault tolerance and a capability for the Hadoop cluster to heal itself. This allows Hadoop to scale workloads out across large clusters of inexpensive machines to work on Big Data problems.

Components Of Hadoop


Hadoop is comprised of three components: the Hadoop Distributed File System, the Hadoop MapReduce model and Hadoop Common. Let us understand these components one by one.

Hadoop Distributed File System

Data in a Hadoop cluster is broken down into smaller pieces called blocks and distributed throughout the cluster. Because of this, the map and reduce functions can be executed on smaller subsets of our larger data sets, and this provides the scalability that is needed for Big Data processing. It can be inferred that the goal of Hadoop is to use commonly available servers in a very large cluster, where each server has a set of inexpensive internal disk drives. To achieve higher performance, MapReduce tries to assign workloads to the servers where the data to be processed is stored. This concept is known as data locality. The cool thing about Hadoop is that it has built-in fault tolerance and fault compensation capabilities. In HDFS, data is divided into blocks and copies of the blocks are stored on other servers in the Hadoop cluster. This means that a single file is actually stored as smaller blocks that are replicated across multiple servers in the entire cluster. The redundancy provides various benefits, chief among them higher availability. In addition, redundancy allows the Hadoop cluster to break work up into smaller chunks and run those jobs on all the servers in the cluster for better scalability.

A data file in HDFS is divided into blocks, and the default size of these blocks in Apache Hadoop is 64 MB. For larger files, a higher block size is a good idea, as this greatly reduces the amount of metadata required by the NameNode. The expected workload is another consideration, as nonsequential access patterns perform better with a smaller block size. Coordination across a cluster has significant overhead, so the ability to process large chunks of work locally without sending data to other nodes helps improve both performance and the overhead-to-real-work ratio. Each data block is stored on 3 different servers in Hadoop; HDFS works behind the scenes to make sure at least two blocks are stored on a separate server rack to improve reliability in the event of losing an entire rack of servers.

All of Hadoop's data placement logic is managed by a special server called the NameNode. The NameNode server keeps track of all the data files in HDFS, such as where the blocks are stored and more. All of the NameNode's information is stored in memory, which allows it to provide quick response times to storage manipulation and read requests. Now, as there is only one NameNode for the entire cluster, storing this information in memory creates a single point of failure. So the server acting as the NameNode must be more robust than the rest of the servers in the cluster, obviously to minimize the possibility of failure. One more thing: it is highly recommended to have a regular backup process for the cluster metadata stored in the NameNode.

Map Reduce

MapReduce is the heart of Hadoop. It is this programming paradigm that allows for massive scalability across hundreds or even thousands of servers in a Hadoop cluster. The term MapReduce actually refers to two separate and distinct tasks that Hadoop programs perform. The first is the map job, which takes a set of data and converts it into another set of data, in which individual elements are broken down into key/value pairs. The reduce job takes the output from a map as input and combines those key/value pairs into a smaller set of tuples. As the name implies, the reduce job is always performed after the map job.
Let's take an example to understand this more clearly. We have 4 files, each having 2 columns, City and Temperature. Sample data from one file is given below:
London, 12
New Delhi, 33
Venice, 22
Mumbai, 26
London, 22
New Delhi, 43
Venice, 27
Mumbai, 27

Now suppose we want to calculate the maximum temperature of each city across all the files. Using the MapReduce framework, we can achieve this by breaking the work down into 4 map tasks, each working on one file. Each map task will return the maximum temperature for each city in its file. From the sample file above, the map will return:

(London, 22) , (New Delhi, 43) , (Venice, 27) , (Mumbai,27)
Now assume that the other map tasks, working on the other 3 files, return this data:
(London, 23) , (New Delhi, 44) , (Venice, 23) , (Mumbai,29)
(London, 22) , (New Delhi, 41) , (Venice, 24) , (Mumbai,21)
(London, 24) , (New Delhi, 42) , (Venice, 25) , (Mumbai,24)
The output of all 4 map tasks is fed into the reduce tasks, which combine the input results and return a single value for each city, i.e. the maximum. The key thing to notice here is that there may be more than one reducer working in parallel. In that case, all the key/value pairs for a given city should go to the same reducer to find its maximum temperature. This directing of records to reduce tasks is known as the shuffle (learn more about shuffle), which takes input from the map tasks and directs the output to a specific reduce task. So the final result generated by the job will be:
(London, 24) , (New Delhi, 44) , (Venice, 27) , (Mumbai,29)
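As a rough illustration of the example above, here is a minimal sketch of such a job; the class names and the assumption of comma-separated "City, Temperature" lines are mine, not part of the original post.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

    public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",");            // e.g. "London, 12"
            String city = parts[0].trim();
            int temperature = Integer.parseInt(parts[1].trim());
            context.write(new Text(city), new IntWritable(temperature));
        }
    }

    public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable value : values) {                       // keep the largest temperature seen for this city
                max = Math.max(max, value.get());
            }
            context.write(key, new IntWritable(max));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}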


A MapReduce program is referred to as a job. A job is executed by breaking it down into small pieces called tasks. An application submits the job to a specific node in the cluster, which is running a daemon (software) called the JobTracker. The JobTracker communicates with the NameNode to find out where all of the data required for this job exists in the cluster, and then breaks the job down into map and reduce tasks for each node in the cluster where the data exists. The JobTracker tries to avoid the case where a node is given a task whose data is not local to that node; it does so by attempting to schedule tasks where the data is stored. This is the concept of data locality, and it is vital when working with large volumes of data.

In a cluster, a set of continually running daemons, known as TaskTracker agents, monitor the status of each task. If a task fails to complete, the status of that failure is reported back to the JobTracker, which will then reschedule that task on another node in the cluster. Hadoop gives the option to perform local aggregation on the output of each map task before sending the results off to a reduce task, through a component called a Combiner (learn more about Combiner). Obviously, when multiple reduce tasks are running, overhead increases, but for large datasets this improves overall performance. All MapReduce programs that run natively under Hadoop are written in Java, and it is the Java Archive file (jar) that is distributed by the JobTracker to the various Hadoop cluster nodes to execute the map and reduce tasks.
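For instance, in the maximum-temperature sketch above, the reducer itself could double as the combiner, since the maximum of per-map maximums is still the overall maximum (the class names refer to that illustrative sketch, not to any code from the original post):

job.setMapperClass(MaxTempMapper.class);
job.setCombinerClass(MaxTempReducer.class);   // local maximum per map task, applied before the shuffle
job.setReducerClass(MaxTempReducer.class);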

Check out the First Program in MapReduce and you will get an idea of what we just discussed. Issue #2 of this All About Hadoop series will cover applications and getting data into Hadoop.

Thursday, 13 February 2014

Why is Big Data Important - Analysis And Applications

According to a Forbes article on "Why Big Data Matters" : "Terabytes, Petabytes, Exabytes. Who can keep track? These strange terms have just begun to enter the business lexicon, but the hype surrounding them has reached a fever pitch. We have undoubtedly entered the age of big data". Exactly, we have entered that age. Like any new technology, there is a lot of confusion surrounding big data. There are endless debates about what is and isn’t big data. So first let us clear that.

Big Data can be defined and interpreted in many different ways, which is why, in the Big Data Introduction post, I defined Big Data in terms of the volume, velocity, and variety attributes. One thing that should be kept in mind is that Big Data solutions are not a replacement for our existing warehouse solutions. There are some key principles to keep in mind when considering when to use Big Data technologies.

  • Big Data solutions work very well not only for structured data but are also well suited for semi-structured and unstructured data.
  • Big Data solutions work best when all (or almost all) of the data is analyzed, as opposed to a sample of the data.
  • Big Data solutions are ideal for iterative and exploratory analysis, when there are no predetermined business measures on the data.

Social Media


Perhaps the most talked-about Big Data usage pattern is social media and customer sentiment. We can use Big Data to figure out what customers are saying about an organization and what they are saying about its competitors. Moreover, the organization can use this newly found insight to figure out how this sentiment impacts the decisions it is making and the way it engages. More specifically, it can determine how sentiment is impacting sales, the effectiveness of a marketing campaign, reviews of a certain product, and so on.

Log Analytics


Log analytics is a common use case for any Big Data project. All the logs and trace data generated by the operation of IT solutions are called Data Exhaust. Organizations have a lot of Data Exhaust, and it's pretty much a pollutant if it's just left around for a couple of hours or days in case of emergency and then simply purged. Why? Because Data Exhaust has concentrated value, and IT companies need to figure out a way to store it and extract value from it. Some of the value derived from Data Exhaust is obvious and has been transformed into value-added clickstream data that records every gesture, click and movement made on a website.

Fraud Detection


By using a Big Data platform it's possible to stop fraud. Several challenges in the fraud detection pattern are directly attributable to solely utilizing conventional technologies. The most common and recurring theme we will see across all Big Data patterns is limits on what can be stored, as well as on the compute resources available to process our intentions. Without Big Data technologies, these factors limit what can be modeled. Less data equals constrained modeling.

Weather Forecasting


The philosophy of Big Data is that insights can be drawn from a large volume of ‘dirty’ (or ‘noisy’) data, rather than simply relying on a small number of precise observations. One good example of the success of the ‘Big Data’ approach can be seen in Google’s Flu Trends which uses Google searches to track the spread of flu outbreaks worldwide. Despite the inevitable noise, the sheer volume of Google search data meant that flu outbreaks could now be successfully identified and tracked in near real-time. It is also important to remember that Big Data when used on its own can only provide probabilistic insights based on correlation. The true benefit of Big Data is that it drives correlative insights, which are achieved through the comparison of independent datasets. It is this that buttresses the Big Data philosophy of ‘more data is better data’; you do not necessarily know what use the data you are collecting will have until you can investigate and compare it with other datasets.

The ‘Big Data’ approach has already begun to be incorporated into weather nowcasting, and the Flu Trends disease example provides an excellent allegory for where it can initially prove most useful.

Few Things to Remember


When it comes to solving information management challenges using Big Data technologies, there are a few things we should know. The data bound for the analytic warehouse has to be cleansed and documented before it is placed in the warehouse, which has a strict schema. On the other hand, a Big Data solution not only works on data that is not suited for a traditional warehouse environment, but also doesn't impose the strictness that traditional warehouses require before data is put into them.

Conclusion


We can preserve the fidelity of data and gain access to massive volumes of information for exploration and finding insights. It's important to understand that traditional database technologies are important, and in fact are a relevant part of an overall analytic solution. Traditional database technologies become even more vital when used together with your Big Data platform. Broadly, it can be concluded that there are some classes of problems that don't belong to traditional database technologies (at least at the initial stage). And there is another kind of data that we are not sure about putting in the warehouse, maybe because we don't know whether it's rich in value, whether it's structured, or whether it's too big. Sometimes we can't find out the value per byte of the data before investing effort and money. At the end of the day, organizations want to know whether data is worth saving and has a high value per byte before investing in it.

So what we really need to know about big data is this: It represents a fundamental shift in how we do things. In effect, big data opens the door to a strategy where we no longer try to be “right” based on controlled research and small samples, but rather become less wrong over time as real world information floods in.

Tuesday, 11 February 2014

Big Data : The Next Big Thing

Big Data refers to data which can't be processed or analysed using traditional tools and processes. Obviously, organizations today are dealing with more and more Big Data and the challenges that come with it. This enormous volume of data is sitting in semi-structured or unstructured formats. Organizations are even wondering whether it's worth keeping this data. These challenges, in a climate where organizations have the ability to store anything and are generating data like never before in history, make the problem even more complicated.

Let's talk about the characteristics of Big Data and how it fits into the current information management landscape. Take the example of railway cars, which carry hundreds of sensors. These sensors track things like the conditions experienced by the car, the state of individual parts, and GPS-based data for shipment tracking and logistics. Processors have been added to interpret sensor data on parts prone to wear, such as bearings, and to identify parts that need repair before they fail and cause more damage. Rail tracks also have sensors installed every few feet to detect any damage to the track and avoid accidents. Now add to this the tracking of a rail car's load, arrival and departure times, and you can get an idea of the Big Data problem. All this data is stored every day and kept for further analysis. Rail cars are just one example, but everywhere we look, we see domains with velocity, volume and variety combining to create the Big Data problem.

Characteristics of Big Data


Three characteristics define what we refer to as "Big Data": volume, variety and velocity.

Volume
As implied by the term "Big Data", companies are facing enormous amounts of data. Companies that don't know how to manage this data are overwhelmed by it. They are missing an opportunity, one that can be grabbed with the right technology platform, to analyse almost all of the data (or at least the data that is useful) to gain a better understanding of their business and their customers. As the amount of data available to an organization increases, the percentage of data the organization can process is declining, creating a blind zone.

Variety
The sheer volume associated with Big Data brings a new challenge: variety. The data we face today is not only traditional data, but also raw, unstructured and semi-structured data generated by the web (social media, search indexes, web log files, sensor data from active and passive systems, etc.). Traditional analytic platforms obviously struggle to analyse this raw and unstructured data and to derive usable understanding from it. In simple words, variety represents all kinds of data, a fundamental shift from traditional structured data. As traditional analytic platforms can't deal with unstructured data, organizations are struggling, since their success depends on their ability to draw insights from the various kinds of data. If we look at the data, 20 percent of it is relational, i.e. structured data that fits a traditional schema and on which we spend most of our time. The other 80 percent of the world's data is unstructured, or semi-structured at best. For instance, videos and pictures do not fit the relational model, and videos and pictures are what we see every day.

Velocity
Just as the volume and variety of data have changed, so has the velocity at which it is generated and handled. A conventional definition of velocity deals with questions like: how quickly does the data arrive and get stored, and how quick is its retrieval? Although that is only a rough idea of what velocity suggests, with the massive amount of data involved the idea of velocity is far more compelling than this definition. Broadly, understand it like this: the speed at which data is flowing. Traditional technology platforms are incapable of dealing with data that is that huge and arriving that fast, and sometimes knowing something first is everything. Identifying a trend, a need, or a problem seconds before someone else does gives an edge over competitors. Plus, more and more of the data produced today has a short shelf-life. So organizations need to analyse the data quickly and get insights from it. For instance, traffic management wants to know which vehicles are heading towards already crowded highways where there is a high possibility of a traffic jam, or which vehicles are headed for areas where there is already a massive jam. Getting the data in real time, in seconds (by tracking the GPS in vehicles), helps achieve that, as the locations of the cars change within minutes. Dealing effectively with Big Data requires analysing a massive volume containing a variety of data while it is still in motion.

Data In Warehousing And Data In Hadoop


Traditional warehouses are only capable of dealing with traditional structured data, whereas the Hadoop platform is well suited to dealing with semi-structured and unstructured data. The data that goes into a warehouse first goes through a lot of rigor to make it into the warehouse. Of course it's a costly process, but it makes sure that the data that lands in the warehouse is of high quality and serves a broad purpose. On the other hand, data in Hadoop rarely undergoes the quality control rigor of data that goes into a warehouse. Why? With Hadoop, given the massive volume of data and its variety, there is no way to cleanse the data and document every piece of it properly, and it's not economical either. Hadoop data is not as trusted, and it might seem to be low in value, but it can in fact be the key to the questions yet unasked.

Conclusion


The term Big Data applies to information that can not be processed or analyzed using traditional processes and tools. Increasingly, organizations today are facing more and more Big Data challenges. They have access to a wealth of information, but they don't know how to get value out of it because it is sitting in its most raw form or in a semi-structured or unstructured format, and as a result they don't know whether it is worth keeping.

Wednesday, 5 February 2014

MapReduce Inputs And Splitting

Up till now we have used only text files as input. Can we use any other file format? Can we use binary or XML formats? Let us find out. First, we need to get familiar with a few terms. Input split: it is the part of the input processed by a single map; each split is processed by a single map. In other words, an InputSplit represents the data to be processed by an individual Mapper. Each split is divided into records, and the map processes each record, which is a key-value pair. A split is basically a group of rows, and a record is a single row.

The length of an InputSplit is measured in bytes. Every InputSplit has storage locations (hostname strings). The storage locations are used by the MapReduce system to place map tasks as close to the split's data as possible. The tasks are processed in order of the size of the splits, largest first (a greedy approximation algorithm), in order to minimize the job runtime. One important thing to remember is that an InputSplit doesn't contain the input data, but a reference to the data.

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
As users, we don't have to deal with InputSplits directly; an InputFormat does that job. An InputFormat is a class that provides the following functionality:
  • Selects the files or other objects that should be used for input.
  • Defines the InputSplits that break a file into tasks.
  • Provides a factory for RecordReader objects that read the file.
The overall process can be explained in following points:
  • The client which runs the job calculates the splits for the job by calling getSplits().
  • Client then sends the splits to the jobtracker, which uses their storage locations to schedule map tasks that will process them on the tasktrackers.
  • On a tasktracker, the map task passes the split to the createRecordReader() method on InputFormat to obtain a RecordReader for that split.
  • Map task uses RecordReader to generate record key-value pairs, which it passes to the map function. We can see this by looking at the Mapper’s run() method:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}
First, the setup() method is called; then nextKeyValue() is called repeatedly on the Context to populate the key and value objects for the mapper. Each key-value pair is retrieved from the RecordReader and passed to the map() method. The nextKeyValue() method returns false when there are no more key-value pairs left to read. Then the map task runs its cleanup() method at the end.

FileInputFormat


It is the base class for all implementations of InputFormat. It provides two things: a place to define which files are included as the input to a job and an implementation for generating splits for the input files.

How to split input files?


FileInputFormat splits only those files which are larger than an HDFS block. The split size can be controlled by the Hadoop properties given in the table below.
Property name           Type   Default value                          Description
mapred.min.split.size   int    1                                      Smallest valid size in bytes for a file split
mapred.max.split.size   long   Long.MAX_VALUE (9223372036854775807)   Largest valid size in bytes for a file split
dfs.block.size          long   64 MB                                  The block size in HDFS

The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. We may impose a minimum split size: by setting this to a value larger than the block size, we can force splits to be larger than a block. But this is not good when using HDFS, because doing so will increase the number of blocks that are not local to a map task. The maximum split size defaults to the maximum value that can be represented by a Java long type. It has an effect only when it is less than the block size, forcing splits to be smaller than a block. The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):

max(minimumSize, min(maximumSize, blockSize))
and by default:
minimumSize < blockSize < maximumSize
so the split size is blockSize.
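For example, the bounds can be set per job through FileInputFormat; a small sketch, assuming the new-API org.apache.hadoop.mapreduce.lib.input.FileInputFormat and an existing Job object (the 16 MB figure is arbitrary):

// force splits no larger than 16 MB
FileInputFormat.setMinInputSplitSize(job, 1L);
FileInputFormat.setMaxInputSplitSize(job, 16 * 1024 * 1024L);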

How to prevent splitting?


Some applications don’t want files to be split, as this allows a single mapper to process each input file in its entirety. For example, a simple way to check if all the records in a file are sorted is to go through the records in order, checking whether each record is not less than the preceding one. Implemented as a map task, this algorithm will work only if one map processes the whole file. There are a couple of ways to ensure that an existing file is not split. The first (quick and dirty) way is to increase the minimum split size to be larger than the largest file in your system; setting it to its maximum value, Long.MAX_VALUE, has this effect. The second is to subclass the concrete subclass of FileInputFormat that you want to use and override the isSplitable() method to return false. For example, here’s a nonsplittable TextInputFormat:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

Text Input


TextInputFormat: As you may know, text files are usually taken as input files, and TextInputFormat is the default InputFormat. Each record is a line of input, and the key (a LongWritable) is the byte offset within the file of the beginning of the line. The value is the contents of the line (excluding any line terminators). For instance, take a file with the following content:

A king should hunt regularly
A queen should shop daily,
Other people should just try.

The records are interpreted as the following key-value pairs.
(0, A king should hunt regularly)
(29, A queen should shop daily,)
(55, Other people should just try.)

Remember, the keys are not line numbers but offsets from the beginning of the file. The offset is sufficient to serve as a unique identifier for each line, and combined with the file name it is unique within the filesystem.

NLineInputFormat: The number of lines that each mapper receives depends on the size of the split and the length of the lines. If we want to control this number, then NLineInputFormat is the InputFormat to use. In this format too, the keys are the byte offsets within the file and the values are the lines themselves. Here N refers to the number of lines of input that each mapper receives. The default value of N is 1, so each mapper receives exactly one line of input.
mapreduce.input.lineinputformat.linespermap

The property above controls the value of N.
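As a sketch of how this might be wired into a job driver, assuming the new-API NLineInputFormat in org.apache.hadoop.mapreduce.lib.input and an existing Job object:

job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 10);   // each mapper receives 10 lines
// equivalently: job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 10);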

XML: Hadoop provides a class called StreamXmlRecordReader. We can use it by setting our input format to StreamInputFormat and setting the stream.recordreader.class property to org.apache.hadoop.streaming.StreamXmlRecordReader. The reader is configured by setting job configuration properties to tell it the patterns for the start and end tags.

Binary Input


Here is the answer to the question asked at the beginning of this post: yes, we can use data other than textual data. Hadoop MapReduce also supports binary formats.

SequenceFileInputFormat: A sequence file stores a sequence of binary key-value pairs. To use data from sequence files as the input to MapReduce, we use SequenceFileInputFormat. SequenceFileInputFormat can read MapFiles as well as sequence files: when it encounters a MapFile, it uses its data file. This is why there is no MapFileInputFormat class.

SequenceFileAsTextInputFormat: SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat which converts the sequence file’s keys and values to Text objects. The conversion is performed by calling toString() method on the keys and values. This format makes sequence files suitable input for Streaming.

Database Input and Output


DBInputFormat is an input format for reading data from a relational database using JDBC. We need to be careful not to overwhelm the database we are reading from by running too many mappers, as it doesn’t have any sharding capabilities. For this reason, it is best used for loading relatively small datasets, perhaps for joining with larger datasets from HDFS using MultipleInputs. The corresponding output format is DBOutputFormat, which is useful for dumping job outputs (of modest size) into a database.
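A rough sketch of how such a job might be configured; the table, columns, record class and JDBC driver below are placeholders I have made up for illustration, not a tested setup:

// MyRecord is assumed to implement Writable and DBWritable
DBConfiguration.configureDB(job.getConfiguration(),
    "com.mysql.jdbc.Driver",                      // JDBC driver class (placeholder)
    "jdbc:mysql://dbhost/mydb", "user", "password");
job.setInputFormatClass(DBInputFormat.class);
DBInputFormat.setInput(job, MyRecord.class,
    "employees",                                  // table name (placeholder)
    null,                                         // optional WHERE conditions
    "id",                                         // ORDER BY column
    "id", "name", "salary");                      // columns to read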

Thursday, 30 January 2014

Multiple Input Files In MapReduce: The Easy Way

In the previous issue of this series, we discussed a simple method of using multiple input files: side data distribution. But it was of limited use, as the extra input files could only be of minimal size. In this issue, we’ll use our playground to investigate another approach to multiple input files offered by Hadoop.

This approach is, as a matter of fact, very simple and effective. Here we simply need to understand how many mappers are needed. As you may know, a mapper extracts its input from the input file. When there is more than one input file, we need the same number of mapper classes to read records from the input files. For instance, if we are using two input files, then we need two mapper classes.

We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path. To understand the concept more clearly, let us take a case where the user wants to take input from two input files with a similar structure. Also assume that both input files have 2 columns, the first having "Name" and the second having "Age". We want to simply combine the data and sort it by "Name". What do we need to do? Just two things:
  1. Use two mapper classes.
  2. Specify the mapper classes via MultipleInputs in the run/main method.

File 1          File 2
Aman 19         Ash 12
Tom 20          James 21
Tony 15         Punk 21
John 18         Frank 20
Johnny 19       Hugh 17
Here is the code for the same. Notice the two mapper classes with the same logic and only a single reducer.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class multiInputFile extends Configured implements Tool {

  // First mapper: reads records from the first input file.
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Second mapper: same logic, reads records from the second input file.
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Single reducer: emits one (name, age) pair per key.
  public static class CounterReducer extends Reducer<Text, Text, Text, Text> {
    String line = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);
    // One input path per mapper class.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setReducerClass(CounterReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new multiInputFile(), args);
    System.exit(ecode);
  }
}
Here is the output.
Ash 12
Tony 15
Hugh 17
John 18
Aman 19 
Johnny 19
Frank 20          
Tom  20
James 21
Punk 21

Monday, 27 January 2014

Multiple Input Files In MapReduce: Side Data Distribution

You may come across problems which require more than one input file. For instance, you may want to join records from two input files. In such cases, where we want to use more than one input file, we have the following options:

  1. First, we can put all the input files we want to use in a single directory, and give the path of the directory as the input path.
  2. Second, we can use the concept of side data distribution, which uses the distributed cache API.
  3. Third, we can simply use more than one input file and specify each path separately.
Let us understand the first two approaches here (the third method will be explained in my next post).

In the first approach, we just put all the input files in a single directory and give the path of the directory. This approach has the limitation that we can't use input files with different data structures, so it is of very limited use. In the second approach, we use a main (usually large) input file, or main dataset, and other small input files. Ever heard the term "lookup file"? In our case, understand it this way: it is a file containing a very small volume of data compared to our main input file (lookup files go into the distributed cache). This approach implements the concept of side data distribution. Side data can be defined as extra read-only data needed by a job to process the main dataset.

Distributed Cache

Rather than serializing side data in the job configuration, it is preferable to distribute datasets using Hadoop’s distributed cache mechanism. This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run. To save network bandwidth, files are normally copied to any particular node once per job. To understand this concept more clearly, take this example: suppose we have two input files, one small and one comparatively large. Let us assume this is the larger file, i.e. the main input file:

101 Vince 12000
102 James 33
103 Tony 32
104 John 25
105 Nataliya 19
106 Anna 20
107 Harold 29
And this is the smaller file.

101 Vince 12000
102 James 10000
103 Tony 20000
104 John 25000
105 Nataliya 15000

Now what we want is to get those records which have a common Id number. So, in order to achieve this, use the smaller file as the lookup file and the larger file as the input file. The complete Java code and an explanation of each component are given below:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Join extends Configured implements Tool {

  public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    Path[] cachefiles = new Path[0];                      // to store the paths of the lookup files
    List<String> exEmployees = new ArrayList<String>();   // to store the data of the lookup files

    /******************** setup method ******************************************/
    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      try {
        cachefiles = DistributedCache.getLocalCacheFiles(conf);
        BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
        String line;
        while ((line = reader.readLine()) != null) {
          exEmployees.add(line);                          // data of the lookup file gets stored in the list
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    } // setup method ends

    /******************** map method ******************************************/
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      for (String e : exEmployees) {
        String[] listLine = e.toString().split("\t");
        if (line[0].equals(listLine[0])) {
          context.write(new Text(line[0]), new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
        }
      }
    } // map method ends
  }

  /******************** run method ******************************************/
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(Join.class);
    DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setMapperClass(JoinMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new Join(), args);
    System.exit(ecode);
  }
}
This is the result we will get after running the above code.

102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000

Run() Method:


In the run() method, we used the
public void addCacheFile(URI uri)
method to add a file to the distributed cache. If you go through the code carefully, you will notice there is no reduce() method; hence, there is no job.setReducerClass() in the run method. In our example there is in fact no need for a reducer, as the common Id numbers are identified in the map method alone. For the same reason, job.setOutputKeyClass(Text.class); and job.setOutputValueClass(Text.class); take the data types of the mapper's output key and value, not those of a reducer.

Setup() method :


In the setup() method, the line
cachefiles=DistributedCache.getLocalCacheFiles(conf);
is very important to understand. Here we are extracting the path of the file in distributed cache.
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
After that, we store the contents of the file in a List object using a BufferedReader, for further operations. Remember, when the input files were created, we used a tab ("\t") as the delimiter so we could read them properly later.

Map() method :


In the map method, we receive the lines of the main dataset one by one, break them into words using tab ("\t") as the delimiter, parse them into strings, and store them in a string array (String[] line).
String[] line = value.toString().split("\t");

We do the same processing with the contents of the lookup file to match the Id, i.e. the first column, of both the main dataset and the lookup file.
String[] listLine = e.toString().split("\t");
If the Id number matches, i.e. the Id of a record in the main dataset is also present in the lookup file, then the contents of both files are emitted using the context object:
if (line[0].equals(listLine[0])) {
    context.write(new Text(line[0]), new Text(line[1] + "\t" + line[2] + "\t" + listLine[2]));
}

Friday, 24 January 2014

Secondary Sort In MapReduce: "Sort By Value"

As you may know, MapReduce by default sorts the keys (in the shuffle and sort phase) before sending the records to the reducers. However, the values are not sorted, and the order in which values appear to a reducer differs from run to run. This is due to the fact that values are emitted from different map tasks, which may finish at different times from run to run. Generally, MapReduce programs are written in such a way that the order of values reaching the reduce method doesn't matter. But what if we want to impose an order on the values by sorting and grouping the keys in a particular way? Or what if we also want to sort by value?

Let's understand the concept of secondary sorting with the help of an example. Consider the MapReduce program for calculating the maximum temperature for each year (I shamelessly admit that I am taking this example from "Hadoop: The Definitive Guide", and the data used is its weather data set). With a slight modification in the format of the keys, secondary sorting gives us the ability to take the value into account during the sort phase. There are two possible approaches.

The first approach involves having the reducer buffer all of the values for a given key and do an in-reducer sort on the values. Since the reducer will be receiving all values for a given key, this approach could possibly cause the reducer to run out of memory. The second approach involves creating a composite key by adding a part of, or the entire value to the natural key to achieve your sorting objectives.

We will stick to the second approach for the time being. For this we will need to write a custom partitioner to ensure that all the data with the same natural key (not including the value part of the composite key) is sent to the same reducer, and a custom comparator so the data is grouped by the natural key once it arrives at the reducer. To achieve this, we change our keys to be composite: a combination of year and temperature. Following the Definitive Guide's secondary sort example, we want the sort order for keys to be by year (ascending) and then by temperature (descending):

1900 35°C
1900 34°C 
1900 34°C
 ... 
1901 36°C 
1901 35°C

By setting a partitioner to partition by the year part of the key, we can guarantee that records for the same year go to the same reducer. This still isn’t enough to achieve our goal, however. A partitioner ensures only that one reducer receives all the records for a year; it doesn’t change the fact that the reducer groups by key within the partition, even though we would have already written our own partitioner to take care of which reducer the map output keys go to. So, in order to get the desired result, we are going to need 3 main components:

  1. The key should be composite, having both the year (the natural key) and the temperature (the natural value).
  2. A partitioner which sends keys with the same year to the same partition.
  3. Two comparators: one for comparing the year and another for comparing the temperature.
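A rough sketch of these components follows. It assumes the composite key is simply a Text of the form "year<TAB>temperature"; the Definitive Guide itself uses a custom IntPair writable, so this is an illustrative approximation, not the book's or the author's exact code. The two comparators here take the form of a sort comparator (year ascending, temperature descending) and a grouping comparator (year only).

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortComponents {

    // 1. Partitioner: partition by the natural key (the year) only.
    public static class YearPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            String year = key.toString().split("\t")[0];
            return (year.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // 2. Sort comparator: year ascending, then temperature descending.
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(Text.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String[] left = a.toString().split("\t");
            String[] right = b.toString().split("\t");
            int cmp = left[0].compareTo(right[0]);                     // year, ascending
            if (cmp != 0) {
                return cmp;
            }
            return Integer.compare(Integer.parseInt(right[1]),         // temperature, descending
                                   Integer.parseInt(left[1]));
        }
    }

    // 3. Grouping comparator: group values in the reducer by year only.
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(Text.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String leftYear = a.toString().split("\t")[0];
            String rightYear = b.toString().split("\t")[0];
            return leftYear.compareTo(rightYear);
        }
    }

    // Wiring in the driver:
    // job.setPartitionerClass(YearPartitioner.class);
    // job.setSortComparatorClass(KeyComparator.class);
    // job.setGroupingComparatorClass(GroupComparator.class);
}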

Thursday, 23 January 2014

Partitioning in MapReduce


As you may know, when a job (the MapReduce term for a program) is run, it goes to the mapper, and the output of the mapper goes to the reducer. Ever wondered how many mappers and how many reducers are required for a job execution? What parameters are taken into consideration when deciding the number of mappers and reducers required to complete the execution of a job successfully? Can we decide it? If yes, then how?

Let us understand the process, from passing the job to the jobtracker up to the final result produced by the reducer.

Initially the job's input resides on HDFS; when the job is executed it goes to the jobtracker. The jobtracker then decides, on the basis of the size of the input, how many mappers are required. Suppose the block size is 128 MB: if the size of the input is 256 MB, then the jobtracker splits it into two blocks, each of 128 MB. These blocks are sent to datanodes, or tasktrackers, for execution. Each datanode has 2 mapper slots and 2 reducer slots. The jobtracker then has the option to choose which mapper slot it wants to assign each block to.

How does the jobtracker decide which mapper slot to use, and on which datanode?

In our example, the 256 MB input was split into two 128 MB blocks. Suppose there are two datanodes available, both with empty mapper slots. Now there are two possibilities:
  1. Either the jobtracker assigns both blocks to a single tasktracker,
  2. or the jobtracker assigns one block to one tasktracker and the other to another.

The jobtracker will follow the second approach. It will assign one 128 MB block to a mapper slot of one tasktracker/datanode and the other 128 MB block to another tasktracker/datanode. If another job comes in, the jobtracker can use the unused mapper slots of the datanodes. After the mapper's job is done, the output of the mapper goes to one of the reducers. Which one? The mechanism for sending specific key-value pairs to specific reducers is called partitioning. In Hadoop, the default partitioner is HashPartitioner, which hashes a record's key to determine which partition (and thus which reducer) the record belongs in. The number of partitions is equal to the number of reduce tasks for the job.
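For reference, the stock HashPartitioner does essentially the following (a paraphrase, not a verbatim copy of the Hadoop source):

public int getPartition(K key, V value, int numReduceTasks) {
    // mask off the sign bit so the result is non-negative, then bucket by modulo
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}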

Why is partitioning important? First, partitioning has a direct impact on the overall performance of the job we want to run. Second, it may sometimes be required to control how the key/value pairs emitted from the mapper are partitioned over the reducers. Let's understand this with the help of a simple example. Suppose we want to sort the output of a wordcount by the number of occurrences of the tokens. Assume that our job will be handled by 2 reducers (we can specify that by using job.setNumReduceTasks(2);). If we run our job without any user-defined partitioner, we will get output like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                      2              so
3                a                       4              because
6                the                     5              is

      Reducer 1                               Reducer 2

This is certainly not what we wanted. Instead, we were expecting the output to come out like this:
No_Of_Occur   Word/Token                No_Of_Occur   Word/Token

1                Hi                         4              because
2                so                         5              is
3                a                          6              the

      Reducer 1                               Reducer 2

This would happen if we used a correct partitioning function: all the tokens having a number of occurrences less than N (here 4) are sent to reducer 1 and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted within each partition, we get the expected total order on the number of occurrences. Suppose we have this sample data:

aman 1
ashima 2
kaushik 3
sood 4
tony 5
stark 6
bruce 7
wayne 8
james 9
bond 10
mark 11
zuckerberg 12
saun 13
parker 14

And we want the result in such a way that names with numbers from 1 to 5 appear in one file and the rest in another file. Here is the code to achieve that:

package Partitioner;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserDefinedPartitioner {

  static String[] line = null;

  public static class PartitionerMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Sends records with numbers 1 to 5 to partition 0 and the rest to partition 1.
  public static class MyPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
      // relies on the static 'line' array set by the map() call that emitted this record
      int seed = Integer.parseInt(line[1]);
      if ((seed >= 1) && (seed <= 5))
        return 0;
      else
        return 1;
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setJarByClass(UserDefinedPartitioner.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(PartitionerMap.class);
    job.setPartitionerClass(MyPartitioner.class);
    job.setNumReduceTasks(2);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
When we list the contents of our output folder, we will get two files, each holding the result of one partition:

$ hadoop fs -ls training/aman/nwc14
Found 3 items
-rw-r--r--   2 gpadmin hadoop     0 2014-01-24 16:17 training/aman/nwc14/_SUCCESS
-rw-r--r--   2 gpadmin hadoop     0 2014-01-24 16:17 training/aman/nwc14/part-r-00000
-rw-r--r--   2 gpadmin hadoop   120 2014-01-24 16:17 training/aman/nwc14/part-r-00001

$ hadoop fs -cat training/aman/nwc15/part-r-00001
bond 10
bruce 7
james 9
mark 11
parker 14
saun 13
stark 6
wayne 8
zuckerberg 12

$ hadoop fs -cat training/aman/nwc15/part-r-00000
aman 1
ashima 2
kaushik 3
sood 4
tony 5
Observe the job.setNumReduceTasks(2); line in the main method. If we don't write this line, the code will still work, but the framework may run with a single reducer, in which case both partitions end up in one output file. So, to deliberately tell the framework to create two output files, one per partition, this line must be used.

Tuesday, 21 January 2014

First Program In MapReduce

Instead of going for the typical "Hello World" program, we will start with word count. As the name suggests, this program counts the number of words. First things first: not every MapReduce program needs to have a map and reduce method, or a mapper and reducer class (here is the code in the decompression section with no map and reduce method).

In our first program we are going to use both the map and reduce methods. While creating the program, make sure the project that contains the class has all the necessary jar files.

How to add jar files to your project for MapReduce?
  1. It's simple: go to the location of your project in the Eclipse workspace, open the project, create a new folder, and copy all the jar files into it.
  2. Now open the project in Eclipse, right-click on the project name, and click Properties. In the window that opens, choose Java Build Path in the left panel, click Add External JARs, go to that same folder inside your project, select all the jar files, and click OK.

Now we are good to go for our first program.


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take the first tab-separated field and break it into word tokens.
            String[] Line = value.toString().split("\t");
            StringTokenizer tokenizer = new StringTokenizer(Line[0]);
            while (tokenizer.hasMoreTokens()) {
                String Count = tokenizer.nextToken();
                // Emit each token with a count of 1.
                context.write(new Text(Count), new IntWritable(1));
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s emitted for this word by all the mappers.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String args[]) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The main class is defined (here WordCount), inside which two classes are defined that extend the Mapper and Reducer classes. After extending, there is a list of type parameters; in our code we have Mapper<LongWritable, Text, Text, IntWritable>. It is in the form <key_in, value_in, key_out, value_out>. The reducer's parameters, Reducer<Text, IntWritable, Text, IntWritable>, follow the same pattern. The Mapper class contains the map method and the Reducer class contains the reduce method.

The map method has 3 parameters: map(LongWritable key, Text value, Context context). The first two arguments are the same as the Mapper's first two type parameters, i.e. key_in and value_in, in this case LongWritable and Text. The third argument, a Context object, is used to communicate between the map and reduce methods (including the setup and cleanup methods). Now have a look at the first line of the map method.


String[] Line=value.toString().split("\t");


It splits the input line on the delimiter (here a tab), converts the Text value into a String and stores the pieces in a String array. The next line is the most important of the whole code: it creates a StringTokenizer object and passes it the string as an argument. StringTokenizer breaks a string into tokens; its methods do not distinguish among identifiers, numbers, and quoted strings, nor do they recognize and skip comments, which is exactly what we need in wordcount. The loop that follows simply counts the occurrences of tokens by emitting each token as key and 1 as value. Why 1 as value? That is explained in the next section.
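To see what StringTokenizer does on its own, here is a tiny standalone illustration (it is not part of the MapReduce job):

import java.util.StringTokenizer;

// Breaks a sentence into whitespace-separated tokens and prints them one per line.
public class TokenizerDemo {
    public static void main(String[] args) {
        StringTokenizer tokenizer = new StringTokenizer("He is a king. He is nice.");
        while (tokenizer.hasMoreTokens()) {
            System.out.println(tokenizer.nextToken()); // He, is, a, king., He, is, nice.
        }
    }
}

Coming back to the map method: before we move on to the reduce method, let's understand the concept of Context: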


context.write(new Text(Count), new IntWritable(1));


Context is the only means of communication between the map and reduce methods. Whatever value is passed from the map method to the reduce method is passed through the Context object. The call has the form context.write(key_out, value_out), so the types of key_out and value_out should be the same as those declared in the Mapper class. Now move on to the reduce method:


public void reduce(Text key, Iterable<IntWritable> values, Context context)


Here also the reduce method has 3 arguments. The first two arguments correspond to the map method's output types, i.e. key_out and value_out, in this case Text and IntWritable. But instead of a plain IntWritable we have written Iterable<IntWritable>. This is due to the output format of the map method. Let us understand this with the help of an example. Suppose our input file has this data:

He is a king. He is nice.

Now internally the map method will emit the key and value like this.

< He,(1)>
< is,(1)>
< a,(1)>
< king.,(1)>
< He,(1)>
< is,(1)>
< nice,(1)>

Before the result reaches the reducer, the sort and shuffle phase converts it into this intermediate result:

< He,(1,1)>
< is,(1,1)>
< a,(1)>
< king.,(1)>
< nice,(1)>

The reducer receives the above intermediate result, and all the logic of the reduce method is performed on this result, not on the result produced by the map method. So the reduce method just adds up the 1s in order to count the total occurrences of each token, and emits the token along with its number of occurrences. The final result would be:

He 2
is 2
a 1
nice 1

Monday, 20 January 2014

Counters In MapReduce: "Channel For Statistics Gathering"

In MapReduce, counters provide a useful channel for gathering statistics about the job and for problem diagnosis. The statistics may be gathered for quality control or for application-level control. Hadoop has some built-in counters for every job which report various metrics.

Advantages of using counters:
  • A counter should be used to record whether a particular condition occurred, instead of using a log message in the map or reduce task.
  • Counter values are much easier to retrieve than log output for large distributed jobs.

Disadvantages of using counters:
  • Counters may go down if a task fails during a job run.

Built-in Counters


As mentioned above, Hadoop maintains some built-in counters for every job. Counters are divided into groups. Each group contains either task counters, which are updated as a task progresses, or job counters, which are updated as a job progresses.

Task counters: These gather information about tasks over the course of their execution, and the results are aggregated over all the tasks in a job. For example, the MAP_INPUT_RECORDS counter counts the input records read by each map task and aggregates over all map tasks in a job, giving the total number of input records for the whole job. Task counters are maintained by each task attempt and periodically sent to the tasktracker and then to the jobtracker so they can be globally aggregated (for more info, check the YARN: MapReduce 2 post's "Progress And Status Update" section). To guard against errors due to lost messages, task counters are sent in full rather than as increments since the last transmission.
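Once the job has finished, the aggregated values can be read back through the Job object. A small sketch, assuming a release where the built-in task counters are exposed via the org.apache.hadoop.mapreduce.TaskCounter enum:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

// Reads one aggregated built-in task counter after the job has completed.
public class CounterInspection {
    static void printMapInputRecords(Job job) throws Exception {
        Counter counter = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
        System.out.println(counter.getDisplayName() + " = " + counter.getValue());
    }
}

Calling this after job.waitForCompletion(true) prints the total number of map input records across all map tasks.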

Although counter values are final only after the job has finished successfully, some counters provide useful information while the job is still running. This information is useful for monitoring a job with the web UI. For example, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES and COMMITTED_HEAP_BYTES provide an indication of how memory usage varies over the course of a particular task attempt.

Job counters: Job counters are maintained by the jobtracker (or application master in YARN). This is because, unlike all other counters (including user-defined ones), they don't need to be sent across the network. They measure job-level statistics, not values that change while a task is running. For example, TOTAL_LAUNCHED_MAPS counts the number of map tasks that were launched over the course of a job, including tasks that failed.

User-Defined Java Counters


MapReduce allows users to define their own set of counters, which are incremented as required in the mapper or reducer. Counters are defined by a Java enum, which serves to group related counters. A job may define an arbitrary number of enums, each with an arbitrary number of fields. The name of the enum is the group name, and the enum's fields are the counter names. Counters are global: the MapReduce framework aggregates them across all maps and reduces to produce a grand total at the end of the job.
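A hypothetical example (the class, enum and field names are illustrative only): a mapper that tracks how many records were well-formed versus malformed.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The enum defines the counter group; its fields are the individual counters.
public class RecordQualityMapper extends Mapper<LongWritable, Text, Text, Text> {

    enum RecordQuality { WELL_FORMED, MALFORMED }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length == 2) {
            context.getCounter(RecordQuality.WELL_FORMED).increment(1);
            context.write(new Text(fields[0]), new Text(fields[1]));
        } else {
            context.getCounter(RecordQuality.MALFORMED).increment(1);
        }
    }
}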

Dynamic counters: A job can also make use of a dynamic counter, one that isn't defined by a Java enum. Because a Java enum's fields are defined at compile time, you can't create new counters on the fly using enums. Suppose, for example, we want to count the distribution of temperature quality codes in a weather dataset: even though the format specification defines the values the quality code can take, it is more convenient to use a dynamic counter to emit the values it actually takes.

The method used on the Reporter object (in the old API) takes the group and counter names as Strings: public void incrCounter(String group, String counter, long amount). The two ways of creating and accessing counters, using enums and using strings, are actually equivalent because Hadoop turns enums into strings to send counters over RPC. Enums are slightly easier to work with, provide type safety, and are suitable for most jobs; string names are there for the odd occasion when you need to create counters dynamically.
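With the newer mapreduce API, the same string-based idea is available through the Context object. A purely illustrative sketch, assuming an input of one quality code per line:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The counter name comes from the data itself, so new counters appear as new
// quality codes are encountered; no enum is needed.
public class QualityCodeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String qualityCode = value.toString().trim();
        context.getCounter("TemperatureQuality", qualityCode).increment(1);
    }
}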

Saturday, 18 January 2014

Serialization and Deserialization in Hadoop

Serialization is the process of converting structured objects into a byte stream. It is done for two main purposes: for transmission over a network (interprocess communication) and for writing to persistent storage. In Hadoop, the interprocess communication between nodes in the system is done using remote procedure calls, i.e. RPCs. The RPC protocol uses serialization to turn the message into a binary stream at the sender, and the remote node deserializes the binary stream back into the original message.

RPC serialization format is expected to be:
  • Compact: To efficiently use network bandwidth.
  • Fast: Very little performance overhead is expected for the serialization and deserialization process.
  • Extensible: To adapt to new changes and requirements.
  • Interoperable: The format needs to be designed to support clients that are written in different languages from the server.
It should be noted that the data format for persistent storage would have different requirements from a serialization framework, in addition to the four properties of an RPC serialization format mentioned above:
  • Compact: To efficiently use storage space.
  • Fast: To keep the overhead of reading or writing terabytes of data minimal.
  • Extensible: To transparently read data written in an older format.
  • Interoperable: To read and write persistent data using different languages.

Hadoop uses its own serialization format, Writables. Writable is compact and fast, but not extensible or interoperable.

The Writable Interface


The Writable interface has two methods, one for writing and one for reading. The method for writing writes its state to a DataOutput binary stream and the method for reading reads its state from a DataInput binary stream.

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
Let us understand serialization with an example. Given below is a helper method:

public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}
Let’s try deserialization. Again, we create a helper method to read a Writable object from a byte array:

public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}
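Putting the two helpers together (assuming serialize() and deserialize() above are defined in the same class), a quick round trip with an IntWritable looks like this; the hex string printed is the 4-byte big-endian encoding of 163:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.StringUtils;

// Serializes an IntWritable to bytes and reads the bytes back into a new object.
public class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        IntWritable original = new IntWritable(163);
        byte[] bytes = serialize(original);                      // helper defined above
        System.out.println(StringUtils.byteToHexString(bytes));  // prints 000000a3

        IntWritable copy = new IntWritable();
        deserialize(copy, bytes);                                // helper defined above
        System.out.println(copy.get());                          // prints 163
    }
}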

WritableComparable and comparators


IntWritable implements the WritableComparable interface, which is a subinterface of the Writable and java.lang.Comparable interfaces:

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is important for MapReduce because there is a sorting phase during which keys are compared with one another. Hadoop provides RawComparator, an extension of Java's Comparator:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
This interface permits implementors to compare records read from a stream without deserializing them into objects, hence avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly from the given start positions (s1 and s2) and lengths (l1 and l2). WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions:

First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method. Second, it acts as a factory for RawComparator instances (that Writable implementations have registered).

For example, to obtain a comparator for IntWritable, we just use:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

The comparator can be used to compare two IntWritable objects:

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

or their serialized representations:

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

Wednesday, 15 January 2014

Hadoop Introduction


What is Hadoop? “Hadoop”, the name itself is weird, isn’t it? The term Apache Hadoop was coined by Doug Cutting; the name came from a toy elephant. Hadoop is all about processing large amounts of data, irrespective of whether it is structured or unstructured; huge here means hundreds of gigabytes and more. A traditional RDBMS may not be apt when you have to deal with such huge data sets. Even though “database sharding” tries to address this issue, the chance of node failure makes that approach less attractive.

Hadoop is an open-source software framework which enables applications to work across multiple nodes that can store enormous amounts of data. It comprises two components:

Apache Hadoop Distributed File System (HDFS)
Google’s MapReduce Framework

Apache Hadoop was created by Doug Cutting, who named it after his son’s toy elephant. Hadoop’s original purpose was to support the Nutch search engine project, but its significance has grown far beyond that: it is now a top-level Apache project and is used by a large community of users. To name a few, Facebook, the New York Times and Yahoo are examples of Apache Hadoop implementations.

Hadoop is written in the Java programming language!! The Apache Hadoop framework is composed of the following modules :

Hadoop Common - contains libraries and utilities needed by other Hadoop modules
Hadoop Distributed File System (HDFS) - a distributed file-system that stores data on the commodity machines, providing very high aggregate bandwidth across the cluster.
Hadoop YARN - a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users' applications.
Hadoop MapReduce - a programming model for large scale data processing.

Significance of Hadoop

The data on the World Wide Web is growing at an enormous rate. As the number of active internet users increases, the amount of data getting uploaded is increasing. Some of the estimates related to the growth of data are as follows:

In 2006, the total estimated size of the digital data stood at 0.18 zettabytes
By 2011, a forecast estimates it to stand at 1.8 zettabytes.
One zettabyte = 1000 exabytes = 1 million petabytes = 1 billion terabytes

Social networking sites hosting photos, Video streaming sites, Stock exchange transactions are some of the major reasons of this huge amount of data. The growth of data also brings some challenges with it. Even though the amount of data storage has increased over time, the data access speed has not increased at the same rate.

If all the data resides on one node, then it deteriorates the overall data access time. Reading becomes slower; writing becomes even slower. As a solution to this, if the same data is accessed from multiple nodes in parallel, then the overall data access time can be reduced. In order to implement this, we need the data to be distributed among multiple nodes and there should be a framework to control these multiple nodes’ Read and write. Here comes the role of Hadoop kind of system.

Let’s see the problems that can happen with shared storage and how Apache Hadoop framework overcomes it.
Hardware failure: Hadoop does not expect all nodes to be up and running all the time. Hadoop has a mechanism to handle node failures: it replicates the data.
Combining the data retrieved from multiple nodes: Combining the output of each worker node is a challenge; Google's MapReduce framework helps to solve this problem. A map is essentially a key-value pair. The MapReduce framework has a mechanism for mapping the data retrieved from the multiple disks and then combining it to generate one output.

Components Of Apache Hadoop: The Hadoop framework consists of two parts, the Apache Hadoop Distributed File System (HDFS) and MapReduce.

Hadoop Distributed File System (HDFS)

The Hadoop Distributed File System is a distributed file system designed to run on commodity hardware. Since Hadoop treats node failures as the norm rather than an exception, HDFS has been designed to be highly fault tolerant. Moreover, it is designed to run on low-cost hardware.

HDFS is designed to reliably store very large files across machines in a large cluster
HDFS stores each file as a sequence of blocks; all blocks in a file except the last block are the same size.
The blocks of a file are replicated for fault tolerance and this replication is configurable.
An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and changed later, for example with the setrep shell command shown below.
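A quick illustration of changing the replication factor from the command line (the path is hypothetical; -w waits until the new replication level is reached):

% hadoop fs -setrep -w 3 /user/aman/sample.txt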

The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode

Replica placement is crucial for faster retrieval of data by clients. For this, HDFS uses a technique known as rack awareness: HDFS tries to satisfy a read request from the replica that is closest to the client. All HDFS communication protocols are layered on top of the TCP/IP protocol.

MapReduce :

MapReduce is the framework which helps in the data analysis part of Apache Hadoop implementation. Following are the notable points of MapReduce.

MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets on clusters of computers
MapReduce framework is inspired by map and reduce functions commonly used in functional programming
MapReduce consists of a Map step and a Reduce step to solve a given problem.

Map Step:

The master node takes the input, chops it up into smaller sub-problems, and distributes those to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes that smaller problem, and passes the answer back to its master node.

Reduce Step:

The master node then takes the answers to all the sub-problems and combines them to produce the output. All Map steps execute in parallel. The Reduce step takes its input from the Map step; all map outputs with the same key go to the same reducer, and the multiple reducers work in parallel. This parallel execution offers the possibility of recovery from partial failure: if one node (mapper/reducer) fails, its work can be re-scheduled onto another node.

Tuesday, 14 January 2014

Sequence File

What Is SequenceFile?


SequenceFile is just a flat file consisting of binary key/value pairs. It is widely used in MapReduce as an input/output format. In fact, the temporary output of each map is stored using SequenceFile.

The SequenceFile provides Writer, Reader and Sorter classes for writing, reading and sorting respectively.
There are three different SequenceFile formats:-

  1. Uncompressed key/value records.
  2. Record compressed key/value records - only values are compressed.
  3. Block compressed key/value records - both keys and values are collected in blocks separately and compressed. The size of the block is configurable by user.

The recommended way is to use the SequenceFile.createWriter methods to construct the preferred writer implementation. The SequenceFile.Reader acts as a bridge and can read any of the above SequenceFile formats.

Why Do We Need A Sequence File?


HDFS is a distributed file system, mainly designed for batch processing of large amounts of data. The default HDFS block size is 64 MB. When the size of a file is much smaller than the block size, there is a tremendous degradation of performance, because of the large number of seeks and the hopping from one datanode to another needed to retrieve each small file, which is inefficient.

When files are very small, the input for each map task is tiny and there are a large number of map tasks. For example, a 20 GB dataset broken up into 100 KB files would use a map task per file, so the time taken to finish the job increases drastically.

For solving the two problems mentioned above, we need a sequence file. A sequence file is a data structure for binary key-value pairs. It can be used as a common format to transfer data between MapReduce jobs. Another important advantage of a sequence file is that it can be used as an archive to pack smaller files, which avoids the above-mentioned problems with small files.

How To Write And Read A Sequence File?


In order to create a sequence file, use one of its createWriter() static methods which returns a SequenceFile.Writer instance. We can then write key-value pairs using the append() method. After we are done, we can call the close() method. Similarly to read a sequence file, create an instance of SequenceFile.Reader and iterate it over the records by invoking the next() method. There are several versions of next() method and which one we use depends upon the serialization framework used. If a key-value pair is read, the function returns true, else it returns false. In case a value is read, it can be retrieved using the getCurrentValue() method.
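A minimal sketch of both steps, using the classic createWriter(fs, conf, path, keyClass, valueClass) and Reader(fs, path, conf) signatures; the file name here is just an illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Writes a few key-value pairs to a sequence file and reads them back.
public class SequenceFileDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("demo.seq");

        // Write: append() adds one key-value pair at a time.
        SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
        try {
            for (int i = 1; i <= 3; i++) {
                writer.append(new IntWritable(i), new Text("value-" + i));
            }
        } finally {
            writer.close();
        }

        // Read: next() returns false when the end of the file is reached.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}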

How SequenceFile Is Stored Internally?


All of the above formats (see the "What Is SequenceFile?" heading) share a common header, which is used by the SequenceFile.Reader to return the appropriate key/value pairs. A summary of the header is given below:
SequenceFile Common Header
  • version - A byte array: 3 bytes of magic header 'SEQ', followed by 1 byte of actual version no. (example SEQ4,SEQ6)
  • keyClassName - String
  • valueClassName - String
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block compression is turned on for keys/values in this file.
  • compressor class - The classname of the CompressionCodec which is used to compress/decompress keys and/or values in this SequenceFile (only if compression is enabled).
  • metadata - SequenceFile.Metadata for this file (key/value pairs)
  • sync - A sync marker to denote end of the header. All strings are serialized using Text.writeString api.

The formats for Uncompressed and RecordCompressed Writers are very similar and are explained below:

Uncompressed and RecordCompressed Writer Format
  • Header
  • Record
    • Record length
    • Key length
    • Key
    • (Compressed?) Value

A sync-marker every few k bytes or so. The sync marker permits seeking to a random point in a file and then re-synchronizing input with record boundaries. This is required to be able to efficiently split large files for MapReduce processing. The format for the BlockCompressedWriter is as follows:

BlockCompressed Writer Format
  • Header
  • Record Block
    • A sync-marker to help in seeking to a random point in the file and then seeking to next record block.
    • CompressedKeyLengthsBlockSize
    • CompressedKeyLengthsBlock
    • CompressedKeysBlockSize
    • CompressedKeysBlock
    • CompressedValueLengthsBlockSize
    • CompressedValueLengthsBlock
    • CompressedValuesBlockSize
    • CompressedValuesBlock

The compressed blocks of key lengths and value lengths consist of the actual lengths of individual keys/values encoded in ZeroCompressedInteger format .

A sequence file is composed of a header and one or more records. The first three bytes of a sequence file are the bytes SEQ, which acts like a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata etc. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file, not necessarily between every pair of records.

The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression. If no compression is enabled (the default), each record is made up of the record length (in bytes), the key length, the key, and then the value. The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Keys are not compressed.

Block compression compresses multiple records at once; it is therefore more compact than, and should generally be preferred over, record compression because it has the opportunity to take advantage of similarities between records. Records are added to a block until the block reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property; the default is 1 million bytes. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values.
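For a job that writes sequence files, block compression can be requested on the output format. A small sketch, assuming the org.apache.hadoop.mapreduce.lib.output version of SequenceFileOutputFormat used elsewhere in these posts:

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Configures a job to write block-compressed sequence files.
public class BlockCompressionSetup {
    static void enableBlockCompression(Job job) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    }
}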

Enough of theory, let us do some coding and implement Sequencefile in a program.

We will start with the simple WordCount program. Write the complete WordCount program as before and just add one line in the main method:


job.setOutputFormatClass(SequenceFileOutputFormat.class);
The final main method will look like this:

public static void main(String args[]) throws Exception {
    Job job = new Job();
    job.setJarByClass(WordCount.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(WordCountMap.class);
    job.setReducerClass(WordCountReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Try to view the output as we normally do, i.e.

$ hadoop fs -cat training/aman/nwc8/part-r-00000
Instead of showing the result, it will print some unreadable characters. This is due to the fact that a sequence file can't be viewed like this; the way to view a sequence file is different. Now try this command:

$ hadoop fs -text training/aman/nwc8/part-r-00000
It will show the exact result of the code.

Monday, 13 January 2014

Hadoop Archives


In HDFS, each file is stored in blocks and the block metadata is held in memory by the namenode. No matter how small the file is, the same method is followed, so HDFS ends up storing small files inefficiently. As a result, a large number of small files can consume a lot of memory on the namenode. HDFS provides a file archiving facility: special format archives, HAR files or Hadoop archives, that store files in HDFS blocks more efficiently, hence reducing namenode memory usage while still allowing transparent access to the files.

Point to remember: In HDFS, small files do not take up any more disk space than their size. For example, a 2 MB file stored with a block size of 128 MB uses 2 MB of disk space, not 128 MB.

A Hadoop archive is created from a collection of files using the archive tool. The tool runs a MapReduce job to process the input files in parallel, so we need a running MapReduce cluster to use it. A Hadoop archive maps to a file system directory and always has a *.har extension. A Hadoop archive directory contains metadata, in the form of _index and _masterindex, and data (part-*) files. The _index file contains the names of the files that are part of the archive and their locations within the part files.

How to Create an Archive



Usage: hadoop archive -archiveName <name> -p <parent> <src>* <dest>

For example
% hadoop fs -lsr /old/files
-rw-r--r-- 1 tom supergp 1 2013-05-09 09:03 /old/files/a
drwxr-xr-x - tom supergp 0 2013-05-09 09:03 /old/files/dir
-rw-r--r-- 1 tom supergp 1 2013-05-09 09:03 /old/files/dir/b

Now run the archive command: % hadoop archive -archiveName files.har /old/files /old

The first option after -archiveName is the name of the archive, here files.har. The second is the files to put in the archive. Here we are archiving only one source tree, the files in /old/files in HDFS, but the tool accepts multiple source trees. The final argument is the output directory for the HAR file.

% hadoop fs -ls /old
Found 2 items
drwxr-xr-x - tom supergp 0 2013-05-09 09:03 /old/files
drwxr-xr-x - tom supergp 0 2009-04-09 19:13 /old/files.har

% hadoop fs -ls /old/files.har
Found 3 items

-rw-r--r-- 10 tom supergp 165 2013-05-09 09:03 /old/files.har/_index
-rw-r--r-- 10 tom supergp 23 2013-05-09 09:03 /old/files.har/_masterindex
-rw-r--r-- 1 tom supergp 2 2013-05-09 09:03 /old/files.har/part-0

The directory listing shows what a HAR file is made of: two index files and a collection of part files (this example has just one of the latter). The part files contain the contents of a number of the original files concatenated together, and the indexes make it possible to look up the part file that an archived file is contained in, as well as its offset and length. All these details are hidden from the application, however, which uses the har URI scheme to interact with HAR files, using a HAR filesystem that is layered on top of the underlying filesystem (HDFS in this case).
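For example, the archived files can be listed through the har scheme using the same fs shell style as above:

% hadoop fs -lsr har:///old/files.har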