Wednesday, 27 August 2014

Ranker: Top N values


Ranker finds the top N values in a column, for each group.

Mapper:

The mapper passes each group as the key and the entire row as the value. The user supplies the column delimiter, the group column number, the value column number, the value of N, and the sort order (true for ascending, false for descending).
Suppose our input file is:
1,11,a
2,12,a
3,13,b
4,14,c
5,15,d
6,16,c
7,17,g
8,18,e
9,19,a

A sample command:
hadoop jar /root/Documents/iv3.jar iv3.TopValues  MapReduceInput/xy1.txt  MapReduceOutput/TopVal "," 3 1 1 true
Here "," is column delimiter.
"3" is group key i.e the 3rd column.
"1" is the column on which ranking will be done.
"1" means top 1 value.
"true" means we are expecting result in ascending order.

The mapper will then emit key-value pairs such as:
a,(1,11,a)
a,(2,12,a)
b,(3,13,b)
...
...

Reducer :

The reducer stores the rows of each group in a TreeMap, keyed by the value in the ranking column:
key:value
1: 1,11,a
2: 2,12,a
9: 9,19,a
When the number of entries exceeds N, we delete one entry: for ascending order, the entry with the highest key; for descending order, the entry with the lowest key.


So for key "a" it will keep just one entries and would delete the entry with "9" and "2" as key. Similarly each key (group) is processed. So the output will be:
1,11,a
3,13,b
4,14,c
5,15,d
8,18,e
Here is the entire code.
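As a rough, illustrative sketch of the approach (not the full listing), the mapper and the TreeMap-based reducer could look something like this; the class and configuration property names are invented for the sketch, and the driver is omitted:

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TopValuesSketch {

  // Emits the group column as the key and the whole row as the value.
  public static class TopValuesMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      String delim = conf.get("topvalues.delimiter", ",");
      int groupCol = conf.getInt("topvalues.group.column", 3);
      String[] cols = value.toString().split(delim);
      context.write(new Text(cols[groupCol - 1]), value);
    }
  }

  // Keeps the rows of each group in a TreeMap keyed by the ranking column,
  // trimming the map back to N entries whenever it grows too large.
  public static class TopValuesReducer extends Reducer<Text, Text, NullWritable, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      String delim = conf.get("topvalues.delimiter", ",");
      int valueCol = conf.getInt("topvalues.value.column", 1);
      int n = conf.getInt("topvalues.n", 1);
      boolean ascending = conf.getBoolean("topvalues.ascending", true);

      TreeMap<Long, String> top = new TreeMap<Long, String>();
      for (Text value : values) {
        String row = value.toString();
        top.put(Long.parseLong(row.split(delim)[valueCol - 1]), row);
        if (top.size() > n) {
          // Ascending: drop the largest key; descending: drop the smallest.
          top.remove(ascending ? top.lastKey() : top.firstKey());
        }
      }
      for (String row : top.values()) {
        context.write(NullWritable.get(), new Text(row));
      }
    }
  }
}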

Friday, 21 February 2014

All About Hadoop: Issue #2

In Issue #1 of this "All About Hadoop" series, we discussed some basic facts and components of Hadoop. This post will cover application development, and we will also learn how to get data into Hadoop. So let's get started.

Application Development


To abstract away some of the complexity of the Hadoop programming model, several application development languages have emerged that run on top of Hadoop. Here, we will cover two of the most popular: Pig and Hive.

Pig And PigLatin


A little history first: Pig was developed at Yahoo!. Just as real pigs can eat almost anything, the Pig programming language is designed to deal with any kind of data. Pig is basically made of two components: the language itself (PigLatin) and the run-time environment where PigLatin programs are executed; we can compare this to the relationship between the JVM and a Java application. The first step in a Pig program is to LOAD the data we want to manipulate from HDFS. Then we run the data through a set of transformations. Finally, we DUMP the data to the screen or STORE the result in a file.

LOAD: As with any other Hadoop feature, the objects or data that Pig works on are stored in HDFS. To let a Pig program access this data, the program first must tell Pig what file or files it will use, which is achieved through the LOAD 'data_file' command, where data_file can be any HDFS directory or file. If data_file is a directory, then all the files in the directory are loaded into the program. If the input file format is not natively acceptable to Pig, we can add the USING clause to the LOAD statement to specify a user-defined function that can read and interpret the data.

TRANSFORM: This is responsible for all the data manipulation. Here we can FILTER out rows that are of no use, JOIN two sets of data files, GROUP data to build aggregations, ORDER results, and much more.

DUMP and STORE: If we don't specify a DUMP or STORE command, the results of a Pig program are not generated. We would typically use the DUMP command, which sends the output to the screen, when debugging Pig programs. If we want to store the result in a file, then instead of DUMP we use STORE.

After creating a Pig program we need to run it in the Hadoop environment. This is where the Pig run time comes in. There are three ways to run a Pig program: embedded in a script, embedded in a Java program, or from the Pig command line, called Grunt. Whichever way we choose to run the Pig program, the Pig run-time environment translates the program into a set of map and reduce tasks and runs them under the covers for us. It simplifies the work associated with analyzing massive volumes of data and lets the developer focus on the analysis rather than on individual map and reduce tasks.

HIVE


Some history and a few facts first: although Pig can be a very powerful and simple language to use, its disadvantage is that it is new. Hive allows developers to write Hive Query Language (HQL) statements that are similar to SQL statements. HQL statements are broken down by the Hive service into MapReduce jobs and executed across the cluster. We can run our Hive queries in several ways:
  • Command line interface known as Hive shell
  • Java Database Connectivity (JDBC) or Open Database Connectivity (ODBC) applications
  • Hive Thrift Client
The Hive Thrift Client is much like any database client that gets installed on a user's machine: it communicates with the Hive services running on the server. We can use the Hive Thrift Client within applications written in C++, Java, PHP, Python, or Ruby. Hive looks very much like traditional database code with SQL access. But because Hive is based on MapReduce, there are important differences: Hadoop is intended for long sequential scans, and because Hive is built on Hadoop, we can expect queries to have very high latency. This means Hive is not appropriate for applications that need very fast response times, as we would expect with a database such as DB2. Moreover, Hive is read-based and therefore not appropriate for transaction processing that typically involves a high percentage of write operations.
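As a rough illustration of the JDBC route, a small client might look like the following; the host, port, credentials, table, and query are placeholders, and the snippet assumes a HiveServer2 JDBC driver is on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
  public static void main(String[] args) throws Exception {
    // HiveServer2 JDBC driver; host, port, database and table are placeholders.
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    Connection con = DriverManager.getConnection(
        "jdbc:hive2://localhost:10000/default", "hive", "");
    Statement stmt = con.createStatement();
    ResultSet rs = stmt.executeQuery("SELECT city, MAX(temp) FROM weather GROUP BY city");
    while (rs.next()) {
      System.out.println(rs.getString(1) + "\t" + rs.getString(2));
    }
    rs.close();
    stmt.close();
    con.close();
  }
}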

Getting Data Into Hadoop


One of the challenges with HDFS is that we can't use the typical file system operations for copying, creating, moving, deleting, or accessing a file. To do anything with a file in HDFS, we must use the HDFS interfaces or APIs directly. In this section, we will discuss the basics of getting our data into HDFS and cover Flume, which is a distributed data collection service for flowing data into a Hadoop cluster.

Basic Copy Data: The most common way to move files from a local file system into HDFS is through the copyFromLocal command. To get files out of HDFS to the local file system, use the copyToLocal command. An example of these two commands is shown below:
hdfs dfs -copyFromLocal /user/dir/file hdfs://s1.com/dir/hdfsfile
hdfs dfs -copyToLocal  hdfs://s1.com/dir/hdfsfile /user/dir/file
These commands are run through the HDFS shell program, which is simply a Java application. The shell uses the Java APIs for getting data in and out of HDFS. The APIs can be called from any Java application.

The problem with this approach is that we must have Java developers write the logic and programs to read and write data from HDFS. If we need to access HDFS files from our Java applications, we use the classes in the org.apache.hadoop.fs package. This allows us to incorporate read and write operations directly, to and from HDFS, from within our MapReduce applications.
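For instance, a small illustrative program using the org.apache.hadoop.fs API could copy a local file into HDFS and read it back; the paths below are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCopyExample {
  public static void main(String[] args) throws Exception {
    // Picks up fs.defaultFS from the cluster configuration on the classpath.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Copy a local file into HDFS (paths are placeholders).
    fs.copyFromLocalFile(new Path("/user/dir/file"), new Path("/dir/hdfsfile"));

    // Read the file back through the HDFS API.
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(new Path("/dir/hdfsfile"))));
    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println(line);
    }
    reader.close();
    fs.close();
  }
}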

Flume: A flume is a channel that directs water from a source to some other location where water is needed. Apache Flume was created to let you flow data from a source into your Hadoop environment. In Flume, the entities we work with are called sources, decorators, and sinks.

A source can be any data source, and Flume has many predefined source adapters. A sink is the target of a specific operation. A decorator is an operation on the stream that can transform the stream in some manner, for example compressing or decompressing data, or modifying data by adding or removing pieces of information.

A number of predefined source adapters are built into Flume. For example, some adapters allow everything arriving over a TCP port, or anything arriving on standard input (stdin), to enter the flow. A number of text file source adapters give granular control to grab a specific file and feed it into a data flow, or even tail a file and continuously feed the flow with whatever new data is written to it.

There are three kinds of sinks in Flume. The first is the final flow destination, known as a Collector Tier Event sink; this is where we would land a flow into an HDFS-formatted file system. The second sink type is called an Agent Tier Event sink; it is used when we want the sink to be the input source for another operation. When we use these sinks, Flume ensures the integrity of the flow by sending back acknowledgements that data has actually arrived at the sink. The final sink type is known as a Basic sink, which can be a text file, the console display, a simple HDFS path, or a null bucket where the data is simply deleted.

Wednesday, 19 February 2014

All About Hadoop : Issue#1

If you are new to Hadoop, then this post is for you. Although it is very difficult to cover everything about Hadoop in a few pages, I have tried to touch every important term and concept that defines Hadoop. In this issue #1 of a two-part series, I will cover the facts and components of Hadoop. In the next post, we will discuss application development and learn how to get data into Hadoop. So let's get started.

Just Some Facts


Hadoop is a computing environment built on top of a distributed, clustered file system that was designed specifically for very large-scale data operations. Hadoop was inspired by Google's work on the Google File System and the MapReduce programming paradigm, in which work is broken down into mapper and reducer tasks that manipulate data stored across a cluster of servers for massive parallelism. Unlike transactional systems, Hadoop can scan through large data sets to produce its results through a scalable, distributed batch processing system. Hadoop is not about speed-of-thought response times, real-time warehousing, or blazing transactional speeds; it is about discovery and making the once near-impossible possible from a scalability and analysis perspective. The Hadoop methodology is built around a function-to-data model as opposed to data-to-function: because there is so much data, the analysis programs are sent to the data.

Hadoop basically has two parts: a file system (the Hadoop Distributed File System) and a programming paradigm (MapReduce). One of the key components of Hadoop is redundancy. Not only is the data redundantly stored in multiple places across the cluster, but the programming model is such that failures are expected and resolved automatically by running portions of the program on various servers in the cluster. This redundancy makes it possible to distribute the data and its associated programming across a very large cluster of commodity components. Obviously, among a large number of commodity hardware components a few will fail, but this redundancy provides fault tolerance and the capability for the Hadoop cluster to heal itself. This allows Hadoop to scale workloads out across large clusters of inexpensive machines to work on Big Data problems.

Components Of Hadoop


Hadoop is comprised of three components: the Hadoop Distributed File System, the Hadoop MapReduce model, and Hadoop Common. Let us understand these components one by one.

Hadoop Distributed File System

Data in a Hadoop cluster is broken down into smaller pieces called blocks and distributed throughout the cluster. Because of this, the map and reduce functions can be executed on smaller subsets of our larger data sets, and this provides the scalability that is needed for Big Data processing. It can be inferred that the goal of Hadoop is to use commonly available servers in a very large cluster, where each server has a set of inexpensive internal disk drives. To achieve higher performance, MapReduce tries to assign workloads to the servers where the data to be processed is stored. This concept is known as data locality. The cool thing about Hadoop is that it has built-in fault tolerance and fault compensation capabilities. In HDFS, data is divided into blocks and copies of the blocks are stored on other servers in the Hadoop cluster. This means that a single file is actually stored as smaller blocks that are replicated across multiple servers in the cluster. The redundancy provides various benefits, chief among them higher availability. In addition, redundancy allows the Hadoop cluster to break work up into smaller chunks and run those jobs on all the servers in the cluster for better scalability.

A data file in HDFS is divided into blocks, and the default size of these blocks for Apache Hadoop is 64 MB. For larger files, a higher block size is a good idea, as it greatly reduces the amount of metadata the NameNode must hold. The expected workload is another consideration, as non-sequential access patterns will perform better with a smaller block size. Coordination across a cluster has significant overhead, so the ability to process large chunks of work locally without sending data to other nodes helps improve both performance and the ratio of overhead to real work. Each data block is stored on three different servers in Hadoop; HDFS works behind the scenes to make sure at least two replicas are stored on a separate server rack, to improve reliability in the event of losing an entire rack of servers.

All of Hadoop's data placement logic is managed by a special server called the NameNode. The NameNode keeps track of all the data files in HDFS, such as where the blocks are stored and more. All of the NameNode's information is stored in memory, which allows it to provide quick response times for storage manipulation and read requests. Since there is only one NameNode for the entire cluster, storing this information in memory creates a single point of failure, so the server acting as the NameNode must be more robust than the rest of the servers in the cluster to minimize the possibility of failure. It is also highly recommended to have a regular backup process for the cluster metadata stored in the NameNode.

Map Reduce

MapReduce is the heart of Hadoop. It is this programming paradigm that allows for massive scalability across hundreds or even thousands of servers in a Hadoop cluster. The term MapReduce actually refers to two separate and distinct tasks that Hadoop programs perform. The first is the map job, which takes a set of data and converts it into another set of data in which individual elements are broken down into key/value pairs. The reduce job takes the output from a map as input and combines those key/value pairs into a smaller set of tuples. As the name implies, the reduce job is always performed after the map job.
Let's take an example to understand this more clearly. We have 4 files, each having 2 columns, City and Temperature. Sample data from one file is given below:
London, 12
New Delhi, 33
Venice, 22
Mumbai, 26
London, 22
New Delhi, 43
Venice, 27
Mumbai, 27

Now suppose we want to calculate the maximum temperature of each city across all the files. Using the MapReduce framework, we can achieve this by breaking the work down into 4 map tasks, each working on one file. Each map task returns the maximum temperature for each city in its file. From the sample file above, the map will return:

(London, 22) , (New Delhi, 43) , (Venice, 27) , (Mumbai,27)
Now assume that the other map tasks working on the other 3 files return this data:
(London, 23) , (New Delhi, 44) , (Venice, 23) , (Mumbai,29)
(London, 22) , (New Delhi, 41) , (Venice, 24) , (Mumbai,21)
(London, 24) , (New Delhi, 42) , (Venice, 25) , (Mumbai,24)
The output of all 4 map tasks is fed into the reduce tasks, which combine the input results and return a single value for each city, i.e. the maximum. The key thing to notice here is that there may be more than one reducer working in parallel. In that case, all the key/value pairs for a given city should go to the same reducer in order to find its maximum temperature. This directing of records to reduce tasks is known as the shuffle (learn more about shuffle), which takes input from the map tasks and directs it to a specific reduce task. So the final result generated by the job will be:
(London, 24) , (New Delhi, 44) , (Venice, 27) , (Mumbai,29)
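To make this concrete, a minimal sketch of such a job in Java might look like the following; the driver is omitted, the class names are illustrative, and the input is assumed to be comma-separated "City, Temperature" lines as in the sample above:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperature {

  // Parses lines like "London, 12" and emits (city, temperature).
  public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] parts = value.toString().split(",");
      context.write(new Text(parts[0].trim()),
                    new IntWritable(Integer.parseInt(parts[1].trim())));
    }
  }

  // Keeps the highest temperature seen for each city.
  public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int max = Integer.MIN_VALUE;
      for (IntWritable value : values) {
        max = Math.max(max, value.get());
      }
      context.write(key, new IntWritable(max));
    }
  }
}

Because taking a maximum is associative and commutative, the same reducer class could also be reused as a combiner to pre-aggregate results on the map side.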


A MapReduce program is referred to as a job. A job is executed by breaking it down into small pieces called tasks. An application submits the job to a specific node in the cluster, which is running a daemon (software) called the JobTracker. The JobTracker communicates with the NameNode to find out where all of the data required for the job lives in the cluster, and then breaks the job down into map and reduce tasks for the nodes to work on where the data exists. The JobTracker tries to avoid the case where a node is given a task whose data is not local to that node; it does so by attempting to schedule tasks where the data is stored. This is the concept of data locality, and it is vital when working with large volumes of data.

In a cluster, a set of continually running daemons known as TaskTracker agents monitor the status of each task. If a task fails to complete, the status of that failure is reported back to the JobTracker, which then reschedules the task on another node in the cluster. Hadoop also gives the option to perform local aggregation on the output of each map task before the results are sent to a reduce task, through a component called a Combiner (learn more about Combiner). Running the combiner adds a little overhead on the map side, but for large datasets it improves overall performance. All MapReduce programs that run natively under Hadoop are written in Java, and it is the Java Archive file (jar) that is distributed by the JobTracker to the various Hadoop cluster nodes to execute the map and reduce tasks.

Check out the First Program in MapReduce and you will get an idea of what we just discussed. Issue #2 of this All About Hadoop series will cover application development and getting data into Hadoop.

Thursday, 13 February 2014

Why is Big Data Important - Analysis And Applications

According to a Forbes article on "Why Big Data Matters" : "Terabytes, Petabytes, Exabytes. Who can keep track? These strange terms have just begun to enter the business lexicon, but the hype surrounding them has reached a fever pitch. We have undoubtedly entered the age of big data". Exactly, we have entered that age. Like any new technology, there is a lot of confusion surrounding big data. There are endless debates about what is and isn’t big data. So first let us clear that.

Big Data can be defined and interpreted in many different ways, which is why in the Big Data Introduction post I defined Big Data in terms of its volume, velocity, and variety attributes. One thing to keep in mind is that Big Data solutions are not a replacement for our existing warehouse solutions. There are some key principles to consider before deciding when to use Big Data technologies.

  • Big Data solutions work very well not only for structured data but also for semi-structured and unstructured data.
  • Big Data solutions work best when all (or almost all) of the data is analyzed, rather than a sample of it.
  • Big Data solutions are ideal for iterative and exploratory analysis, when there are no predetermined business measures on the data.

Social Media


Perhaps the most talked-about Big Data usage pattern is social media and customer sentiment. We can use Big Data to figure out what customers are saying about an organization and what they are saying about its competitors. The organization can then use this newly found insight to figure out how this sentiment impacts the decisions it is making and the way it engages. More specifically, it can determine how sentiment is impacting sales, the effectiveness of a marketing campaign, reviews of a certain product, and so on.

Log Analytics


Log analytics is a common use case for any Big Data project. All the logs and trace data generated by the operation of IT solutions are called Data Exhaust. Organizations have a lot of Data Exhaust, and it's pretty much a pollutant if it's just left around for a couple of hours or days in case of emergency and then simply purged. Why? Because Data Exhaust has concentrated value, and IT companies need to figure out a way to store it and extract that value. Some of the value derived from Data Exhaust is obvious and has been transformed into value-added clickstream data that records every gesture, click, and movement made on a website.

Fraud Detection


By using a Big Data platform it's possible to stop fraud. Several challenges in the fraud detection pattern are directly attributable to relying solely on conventional technologies. The most common and recurring theme we will see across all Big Data patterns is the limit on what can be stored and on the compute resources available to process our intentions. Without Big Data technologies, these factors limit what can be modeled. Less data equals constrained modeling.

Weather Forecasting


The philosophy of Big Data is that insights can be drawn from a large volume of ‘dirty’ (or ‘noisy’) data, rather than simply relying on a small number of precise observations. One good example of the success of the ‘Big Data’ approach can be seen in Google’s Flu Trends which uses Google searches to track the spread of flu outbreaks worldwide. Despite the inevitable noise, the sheer volume of Google search data meant that flu outbreaks could now be successfully identified and tracked in near real-time. It is also important to remember that Big Data when used on its own can only provide probabilistic insights based on correlation. The true benefit of Big Data is that it drives correlative insights, which are achieved through the comparison of independent datasets. It is this that buttresses the Big Data philosophy of ‘more data is better data’; you do not necessarily know what use the data you are collecting will have until you can investigate and compare it with other datasets.

The ‘Big Data’ approach has already begun to be incorporated into weather nowcasting, and the Flu Trends example provides an excellent analogy for where it can initially prove most useful.

Few Things to Remember


When it comes to solving information management challenges using Big Data technologies, there are a few things we should know. The data bound for an analytic warehouse has to be cleansed and documented before it is placed in the warehouse with its strict schema. A Big Data solution, on the other hand, not only works on data that is not suited to a traditional warehouse environment, but also does not impose the strictness that a traditional warehouse requires before data is loaded into it.

Conclusion


We can preserve the fidelity of data and gain access to massive volumes of information for exploration and finding insights. It's important to understand that traditional database technologies are still important and are in fact a relevant part of an overall analytic solution; they become even more vital when used together with a Big Data platform. Broadly, it can be concluded that there is a class of problems that does not belong to traditional database technologies (at least initially). And there is another kind of data that we are not sure is worth putting in the warehouse, maybe because we don't know whether it's rich in value, whether it's structured, or because it's too big. Sometimes we can't find out the value per byte of the data before investing effort and money, yet at the end of the day organizations want to know whether data is worth saving and has a high value per byte before investing in it.

So what we really need to know about big data is this: It represents a fundamental shift in how we do things. In effect, big data opens the door to a strategy where we no longer try to be “right” based on controlled research and small samples, but rather become less wrong over time as real world information floods in.

Tuesday, 11 February 2014

Big Data : The Next Big Thing

Big Data refers to data that can't be processed or analysed using traditional tools and processes. Obviously, organizations today are dealing with more and more Big Data and the challenges that come with it. This enormous volume of data is sitting in semi-structured or unstructured formats, and organizations are even wondering whether it's worth keeping. These challenges arise in a climate where organizations have the ability to store anything and are generating data like never before in history, which makes the problem even more complicated.

Let's talk about the characteristics of Big Data and how it fits into the current information management landscape. Take the example of railway cars, which have hundreds of sensors. These sensors track things like the conditions experienced by the car, the state of individual parts, and GPS-based data for shipment tracking and logistics. Processors have been added to interpret sensor data on parts prone to wear, such as bearings, and to identify parts that need repair before they fail and cause more damage. Rail tracks also have sensors installed every few feet to detect any damage to the track and avoid accidents. Now add to this the tracking of a rail car's load, arrival, and departure times, and you start to get an idea of the Big Data problem. All this data is stored every day and kept for further analysis. Rail cars are just one example, but everywhere we look we see domains where velocity, volume, and variety combine to create the Big Data problem.

Characteristics of Big Data


Three characteristics define what we refer to as "Big Data": volume, variety, and velocity.

Volume
As implied by the term "Big Data", companies are facing enormous amounts of data. Companies that don't know how to manage this data are overwhelmed by it. They are missing an opportunity, one that can be grabbed with the right technology platform, to analyse almost all of the data, or at least the data that is useful, to gain a better understanding of their business and their customers. As the amount of data available to an organization increases, the percentage of data the organization can process declines, creating a blind zone.

Variety
The sheer volume associated with Big Data brings a new challenge: variety. The data we face today is not only traditional structured data, but also raw, unstructured, and semi-structured data generated by the web (social media, search indexes, web log files, sensor data from active and passive systems, etc.). Traditional analytic platforms obviously struggle to analyse this raw and unstructured data and to extract understanding from it that can be used further. In simple words, variety represents all kinds of data, a fundamental shift from traditional structured data. Since traditional analytic platforms can't deal with unstructured data, organizations are struggling, because their success depends on their ability to draw insights from these various kinds of data. If we look at the data, about 20 percent of it is relational, i.e. structured data that fits a traditional schema, and this is where we spend most of our time. The other 80 percent of the world's data is unstructured or, at best, semi-structured; for instance, the videos and pictures we see every day are not relational data.

Velocity
Just as the volume and variety of data have changed, so has the velocity at which it is generated and handled. A conventional definition of velocity deals with questions like: how quickly does the data arrive and get stored, and how quickly can it be retrieved? That is only a rough idea of what velocity suggests here; with massive amounts of data, the idea of velocity is far more compelling than this definition. Broadly, think of it as the speed at which data is flowing. Traditional technology platforms are incapable of dealing with data that is this big and arriving this fast, and sometimes knowing something first is everything. Identifying a trend, a need, or a problem seconds before someone else gives an edge over competitors. Moreover, more and more of the data produced today has a short shelf-life, so organizations need to analyse it quickly and get insights from it. For instance, traffic management wants to know which vehicles are heading toward already crowded highways where there is a high possibility of a traffic jam, or toward areas where there is already a massive jam. Getting the data in real time, in seconds (by tracking the GPS in vehicles), helps achieve that, because within minutes the locations of the cars will have changed. Dealing effectively with Big Data requires analysing a massive volume and variety of data while it is still in motion.

Data In Warehousing And Data In Hadoop


Traditional warehouses are only capable of dealing with traditional structured data, whereas the Hadoop platform is well suited to dealing with semi-structured and unstructured data as well. The data that goes into a warehouse first goes through a lot of rigor to make it into the warehouse. Of course it's a costly process, but it makes sure that the data that lands in the warehouse is of high quality and serves a broad purpose. On the other hand, data in Hadoop rarely undergoes the quality control rigor of data that goes into a warehouse. Why? With Hadoop, given the massive volume and variety of the data, there is no way to cleanse and document every piece of data properly, and it's not economical either. Hadoop data is not as trusted, and it might seem to be low in value, but it can in fact be the key to the question not yet asked.

Conclusion


The term Big Data applies to information that cannot be processed or analyzed using traditional processes and tools. Increasingly, organizations today are facing more and more Big Data challenges. They have access to a wealth of information, but they don't know how to get value out of it because it is sitting in its most raw form or in a semi-structured or unstructured format, and as a result they don't know whether it is worth keeping.

Wednesday, 5 February 2014

MapReduce Inputs And Splitting

Up to now we have used only text files as input. Can we use any other file format? Can we use a binary format or an XML format? Let us find out. First, we need to get familiar with a few terms. Input split: it is the part of the input processed by a single map; each split is processed by a single map. In other words, an InputSplit represents the data to be processed by an individual Mapper. Each split is divided into records, and the map processes each record, which is a key-value pair. Loosely speaking, a split is a chunk of rows and a record is one of those rows.

The length of an InputSplit is measured in bytes, and every InputSplit has storage locations (hostname strings). The storage locations are used by the MapReduce system to place map tasks as close to the split's data as possible. The tasks are processed in order of split size, largest first (a greedy approximation algorithm), in order to minimize the job runtime. One important thing to remember is that an InputSplit doesn't contain the input data itself but only a reference to the data.

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
As users, we don't have to deal with InputSplits directly; the InputFormat does that job. An InputFormat is a class that provides the following functionality:
  • Selects the files or other objects that should be used for input.
  • Defines the InputSplits that break a file into tasks.
  • Provides a factory for RecordReader objects that read the file.
The overall process can be explained in following points:
  • The client which runs the job calculates the splits for the job by calling getSplits().
  • Client then sends the splits to the jobtracker, which uses their storage locations to schedule map tasks that will process them on the tasktrackers.
  • On a tasktracker, the map task passes the split to the createRecordReader() method on InputFormat to obtain a RecordReader for that split.
  • Map task uses RecordReader to generate record key-value pairs, which it passes to the map function. We can see this by looking at the Mapper’s run() method:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}
First, the setup() method is called; then nextKeyValue() is called repeatedly on the Context to populate the key and value objects for the mapper. Each key-value pair is retrieved from the RecordReader and passed to the map() method. The nextKeyValue() method returns false when there are no more key-value pairs left to read, and the map task then runs its cleanup() method.

FileInputFormat


It is the base class for all file-based implementations of InputFormat. It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files.

How to split input files?


By default, FileInputFormat splits only those files that are larger than an HDFS block. The split size can be controlled by various Hadoop properties, listed below.
  • mapred.min.split.size (int, default 1): the smallest valid size in bytes for a file split.
  • mapred.max.split.size (long, default Long.MAX_VALUE, i.e. 9223372036854775807): the largest valid size in bytes for a file split.
  • dfs.block.size (long, default 64 MB): the block size in HDFS.

The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. We may also impose a minimum split size: by setting it to a value larger than the block size, we can force splits to be larger than a block. This is not a good idea when using HDFS, because doing so increases the number of map tasks that read data that is not local to them. The maximum split size defaults to the maximum value that can be represented by a Java long type; it has an effect only when it is less than the block size, forcing splits to be smaller than a block. The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):

max(minimumSize, min(maximumSize, blockSize))
and by default:
minimumSize < blockSize < maximumSize
so the split size is blockSize.
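Expressed as code, the calculation mirrors the formula above; roughly what FileInputFormat's computeSplitSize() boils down to:

// A sketch of the split-size calculation:
long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
// With the defaults (minSize = 1, maxSize = Long.MAX_VALUE, blockSize = 64 MB):
// max(1, min(Long.MAX_VALUE, 64 MB)) = 64 MB, i.e. one split per block.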

How to prevent splitting?


Some applications don't want files to be split, so that a single mapper can process each input file in its entirety. For example, a simple way to check if all the records in a file are sorted is to go through the records in order, checking whether each record is not less than the preceding one. Implemented as a map task, this algorithm will work only if one map processes the whole file. There are a couple of ways to ensure that an existing file is not split. The first (quick and dirty) way is to increase the minimum split size to be larger than the largest file in your system; setting it to its maximum value, Long.MAX_VALUE, has this effect. The second is to subclass the concrete subclass of FileInputFormat that you want to use and override the isSplitable() method to return false. For example, here's a non-splittable TextInputFormat:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}
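To use it, the job driver would then select this class as the input format, for example (job being the org.apache.hadoop.mapreduce.Job instance):

job.setInputFormatClass(NonSplittableTextInputFormat.class);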

Text Input


TextInputFormat: As you may know, text files are usually taken as input files, and TextInputFormat is the default InputFormat. Each record is a line of input; the key (a LongWritable) is the byte offset within the file of the beginning of the line, and the value is the contents of the line (excluding any line terminators). For instance, take a file with the following content:

A king should hunt regularly
A queen should shop daily,
Other people should just try.

The records are interpreted as the following key-value pairs.
0, A king should hunt regularly
29, A queen should shop daily,
55, Other people should just try.

Remember, the keys are not line numbers but byte offsets from the beginning of the file. An offset is sufficient to serve as a unique identifier for each line within the file, and combined with the file name it is unique within the filesystem.

NLineInputFormat: The number of lines that each mapper receives normally depends on the size of the split and the length of the lines. If we want to fix this number, then NLineInputFormat is the InputFormat to use. As with TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves. Here N refers to the number of lines of input that each mapper receives; the default value of N is 1, so each mapper receives exactly one line of input.
mapreduce.input.lineinputformat.linespermap

The property written above controls the value of N.
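As an illustrative driver fragment (assuming a Job instance named job, and using 5 lines per mapper purely as an example):

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

// In the job driver: use NLineInputFormat and give each mapper 5 lines.
job.setInputFormatClass(NLineInputFormat.class);
job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 5);
// The new API also offers the helper NLineInputFormat.setNumLinesPerSplit(job, 5);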

XML: Hadoop provides a class called StreamXmlRecordReader for this. We can use it by setting our input format to StreamInputFormat and setting the stream.recordreader.class property to org.apache.hadoop.streaming.StreamXmlRecordReader. The reader is configured by setting job configuration properties that tell it the patterns for the start and end tags.

Binary Input


Here is the answer to the question asked at the beginning of this post: yes, we can use data other than textual data. Hadoop MapReduce also supports binary formats.

SequenceFileInputFormat: Hadoop's sequence files store sequences of binary key-value pairs. To use data from sequence files as the input to MapReduce, we use SequenceFileInputFormat. SequenceFileInputFormat can read MapFiles as well as sequence files; when it encounters a MapFile, it uses the MapFile's data file. This is why there is no MapFileInputFormat class.

SequenceFileAsTextInputFormat: This is a variant of SequenceFileInputFormat that converts the sequence file's keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.

Database Input and Output


DBInputFormat is an input format for reading data from a relational database using JDBC. Because it doesn't have any sharding capabilities, we need to be careful not to overwhelm the database we are reading from by running too many mappers. For this reason, it is best used for loading relatively small datasets, perhaps for joining with larger datasets from HDFS using MultipleInputs. The corresponding output format is DBOutputFormat, which is useful for dumping job outputs (of modest size) into a database.
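A rough driver fragment for wiring this up might look like the following; the JDBC driver, connection URL, credentials, table, columns, and the MyRecord class (which would implement Writable and DBWritable) are all placeholders:

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

// In the job driver; connection details, table and columns are placeholders.
DBConfiguration.configureDB(job.getConfiguration(),
    "com.mysql.jdbc.Driver",              // JDBC driver class
    "jdbc:mysql://dbhost:3306/mydb",      // connection URL
    "user", "password");
job.setInputFormatClass(DBInputFormat.class);
// MyRecord is a placeholder class implementing Writable and DBWritable.
DBInputFormat.setInput(job, MyRecord.class,
    "employees",                          // table name
    null,                                 // WHERE conditions (none)
    "id",                                 // ORDER BY column
    "id", "name", "age");                 // columns to read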

Thursday, 30 January 2014

Multiple Input Files In MapReduce: The Easy Way

In the previous issue of this series, we discussed a simple method of using multiple input files: side data distribution. But it was of limited use, as the input files could only be of minimal size. In this issue, we'll use our playground to investigate another approach to handling multiple input files offered by Hadoop.

This approach is, as a matter of fact, very simple and effective. Here we simply need to understand how many mappers are needed. As you may know, a mapper extracts its input from its input file. When there is more than one input file, we need the same number of mapper classes to read records from the input files. For instance, if we are using two input files, then we need two mapper classes.

We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path. To understand the concept more clearly, let us take a case where the user wants to take input from two input files with a similar structure. Also assume that both input files have 2 columns, the first having "Name" and the second having "Age". We want to simply combine the data and sort it by "Name". What do we need to do? Just two things:
  1. Use two mapper classes.
  2. Specify the mapper classes through the MultipleInputs class in the run()/main() method.

File 1:
Aman 19
Tom 20
Tony 15
John 18
Johnny 19

File 2:
Ash 12
James 21
Punk 21
Frank 20
Hugh 17
Here is the code for the same. Notice the two mapper classes with the same logic and only a single reducer.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class multiInputFile extends Configured implements Tool {

  // Mapper for the first input file: emits (Name, Age).
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Mapper for the second input file: same logic, emits (Name, Age).
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Single reducer: writes out each name with its age.
  public static class CounterReducer extends Reducer<Text, Text, Text, Text> {
    String line = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);
    // One input path and mapper class per input file.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setReducerClass(CounterReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new multiInputFile(), args);
    System.exit(ecode);
  }
}
Here is the output.
Ash 12
Tony 15
Hugh 17
John 18
Aman 19 
Johnny 19
Frank 20
Tom 20
James 21
Punk 21