Monday 30 December 2013

JobTracker and TaskTracker

Two types of nodes that control the job of job execution ptrocess: A Jobtracker and TaskTracker These two terms are very important and you will see them countless times.So let us understand them one by one.

JobTracker


It is a node in cluster which client applications submit MapReduce jobs. It coordinates all the jobs run on the system by scheduling tasks to run on TaskTrackers.If a task fails, the JobTracker can reschedule it on a different Taskracker. In other words JobTracker pushes work out to available TaskTracker nodes in the cluster, trying to keep the work as close to the data as possible. With a rack-aware file system( which is the essence of HDFS. For more info Click Here ), the JobTracker knows which node contains the data, and which other machines are nearby.

If the work cannot be hosted on the actual node where the data resides, priority is given to nodes in the same rack.The TaskTracker on each node creates a separate Java Virtual Machine process to prevent the TaskTracker itself from failing if the running job crashes the JVM. A heartbeat is sent from the TaskTracker to the JobTracker every few minutes to check its status.

TaskTracker


A TaskTracker is a node in the cluster that accepts tasks - Map, Reduce and Shuffle operations - from a JobTracker. It runs tasks and send progress reports to the JobTracker,which keeps a record the overall progress of each job. Every TaskTracker is configured with a set of slots, these indicate the number of tasks that it can accept. As explained in JobTracker above, when the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if not, it looks for an empty slot on a machine in the same rack.

The TaskTracker creates a separate JVM processes to do the actual work; this is to ensure that process failure does not take down the task tracker. The TaskTracker monitors these created processes, capturing the output and exit codes. When the process finishes, successfully or not, the tracker notifies the JobTracker. The TaskTrackers also send out heartbeat messages to the JobTracker, usually every few minutes, to reassure the JobTracker that it is still alive. These message also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.

JobTracker and TaskTracker: Complete working


The JobTracker is the service within Hadoop that farms out MapReduce tasks to specific nodes in the cluster, ideally the nodes that have the data, or at least are in the same rack.The overall processing of JobTracker and TaskTracker can be explained in the following steps.

  1. Client applications submit jobs to the Job tracker. 2.The JobTracker talks to the NameNode to determine the location of the data
  2. The JobTracker locates TaskTracker nodes with available slots at or near the data
  3. The JobTracker submits the work to the chosen TaskTracker nodes.
  4. The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.
  5. A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may may even blacklist the TaskTracker as unreliable.
  6. When the work is completed, the JobTracker updates its status.
  7. Client applications can poll the JobTracker for information. The JobTracker is a point of failure for the Hadoop MapReduce service. If it goes down, all running jobs are halted.

MapReduce: Introduction and Need

Data is everywhere. There is no way to measure the exact total volume of data stored electronically but it should be in zettabytes (A zettabyte is 1 billion terabytes). So we have a lot of data and we are struggling to store and analyze it. Now let's try to figure out what is wrong with dealing with such an enormous volume of data. The answer is same what's wrong in data from PC to portable drives: Speed. Over the years the storage capacities of hard drives have increased like anything but the rate at which data can be read from drives have not kept up and writing is even slower.

Solution: To read from multiple disks at a time. For instance if we have to store 100 GB of data and we have 100 drives with 100 GB storage space , then it would be faster to read data from 100 drives, each holding 1 GB of data than a single drive holding 100 GB of data.

Problem with solution: First, when using a large number of hardware pieces (hard disks) there is high possibility that one or two might fail.Second, correctly combining the data from different hard disks. MapReduce provides a programming model that abstracts the problem from disk reads and writes, transforming it into computation over sets of keys and values. So , MapReduce is a programming model for data processing.

What Hadoop provides and why is MapReduce needed? Hadoop provides a reliable shared storage and analysis system. The storage is provided by HDFS and analysis by MapReduce. But Why can’t we use databases with lots of disks to do large-scale batch analysis? This is due to the fact that seek time is improving more slowly than transfer rate. Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.If the data access pattern is dominated by seeks, it will take longer to read or write large portions of the dataset than streaming through it, which operates at the transfer rate. We can define MapReduce as a complement to a Rational Database Management(RDBMS).

Comparison with RDBMS:
  1. MapReduce works well on unstructured or semistructured data because it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not intrinsic properties of the data, but they are chosen by the person analyzing the data.
  2. MapReduce works well for the applications where data is written once and read many times,whereas a relational database is good for datasets that are continually updated.
  3. MapReduce works on petabytes of data. On the other hand tradiional RDBMS works with gigabytes of data.
In MapReduce the programmer writes two functions: a map function and a reduce function, each of which defines a mapping from one set of key-value pairs to another. These functions are unaffected to the size of the data or the cluster that they are operating on, so they can be used unchanged for a small dataset and for a massive one. One more important thing to remember is, if you double the size of the input data, a job will run twice as slow. But if you also double the size of the cluster, a job will run as fast as the original one. This is not generally true of SQL queries. It is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster.The data used in MapReduce is semi structured and record oriented. MapReduce works by breaking the processing into two phases: Map phase and Reduce phase. Each phase has a key value pair as input and output,types of which are choosen by programmer. The programmer also specifies two functions: Map function and Reduce function.

To take advantage of the parallel processing that Hadoop provides we need to express our query as a MapReduce job. After some local,small scale testing we can run it on a cluster of machines.Broadly MapReduce working can be broken down into three components:
  • Map Method
  • Reduce Method
  • Code to run
The map function is represented by the mapper class , which declares an abstract map() method.Mapper class is agenric type,with four formal type parameters that specify the input key,output key,input value and output value.

Map function


It's just a data preparation phase,setting up data in such a way that the reducer function can do its work on it.The map function is also a good place to drop bad records.The output from the map function is processed by the MapReduce framework before being sent to the reduce function.

The map() function is passed a key and value.We convert the Text value containing the line of input into a Java String,then use its substring() method to extract the columns we are interested in.The map() function also provides an instance of Context(Context object is used to store the data which is used by reduce method) to write the output to.

Sample code for mapper class and map method.
public static class WordCountMap extends Mapper <LongWritable,Text,Text,IntWritable> { @Override public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { // ********some code*********** context.write(new Text(Count), new IntWritable(1)); } } }

Point to remember: Rather than using bult-in Java types,Hadoop provides its own set of basic types that are optimized for network serialization.Few are given below:
  • LongWritable-Corresponds to Java Long
  • Text -Corresponds to Java String
  • IntWritable -Corresponds to Java Integer

Reduce function


Just like map() function, four formal parameters are used to specify the input and output types for reduce() function.The input types of reduce function must match the output types of the map function. Reduces a set of intermediate values which share a key to a smaller set of values. The number of Reducers for the job is set by the user via JobConf.setNumReduceTasks(int). Reducer implementations can access the JobConf for the job via the JobConfigurable.configure(JobConf) method and initialize themselves. Similarly they can use the Closeable.close() method for de-initialization.

Sample code for reducer class and reduce method:
public static class WordCountReduce extends Reducer <Text,IntWritable,Text,IntWritable> { public void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException { // ******some code********* context.write(key, new IntWritable(sum)); } }

Reducer has 3 primary phases:Shuffle, Sort and Reduce.

Shuffle: Reducer is input the grouped output of a Mapper. In the phase the framework, for each Reducer, fetches the relevant partition of the output of all the Mappers, via HTTP.
Sort:The framework groups Reducer inputs by keys (since different Mappers may have output the same key) in this stage. The shuffle and sort phases occur simultaneously i.e. while outputs are being fetched they are merged.
Reduce: In this phase the reduce(Object, Iterator, OutputCollector, Reporter) method is called for each ltkey, (list of values)> pair in the grouped inputs. The output of the reduce task is typically written to the FileSystem via OutputCollector.collect(Object, Object). The output of the Reducer is not re-sorted.