Monday 30 December 2013

JobTracker and TaskTracker

Two types of nodes that control the job of job execution ptrocess: A Jobtracker and TaskTracker These two terms are very important and you will see them countless times.So let us understand them one by one.

JobTracker


It is a node in cluster which client applications submit MapReduce jobs. It coordinates all the jobs run on the system by scheduling tasks to run on TaskTrackers.If a task fails, the JobTracker can reschedule it on a different Taskracker. In other words JobTracker pushes work out to available TaskTracker nodes in the cluster, trying to keep the work as close to the data as possible. With a rack-aware file system( which is the essence of HDFS. For more info Click Here ), the JobTracker knows which node contains the data, and which other machines are nearby.

If the work cannot be hosted on the actual node where the data resides, priority is given to nodes in the same rack.The TaskTracker on each node creates a separate Java Virtual Machine process to prevent the TaskTracker itself from failing if the running job crashes the JVM. A heartbeat is sent from the TaskTracker to the JobTracker every few minutes to check its status.

TaskTracker


A TaskTracker is a node in the cluster that accepts tasks - Map, Reduce and Shuffle operations - from a JobTracker. It runs tasks and send progress reports to the JobTracker,which keeps a record the overall progress of each job. Every TaskTracker is configured with a set of slots, these indicate the number of tasks that it can accept. As explained in JobTracker above, when the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if not, it looks for an empty slot on a machine in the same rack.

The TaskTracker creates a separate JVM processes to do the actual work; this is to ensure that process failure does not take down the task tracker. The TaskTracker monitors these created processes, capturing the output and exit codes. When the process finishes, successfully or not, the tracker notifies the JobTracker. The TaskTrackers also send out heartbeat messages to the JobTracker, usually every few minutes, to reassure the JobTracker that it is still alive. These message also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.

JobTracker and TaskTracker: Complete working


The JobTracker is the service within Hadoop that farms out MapReduce tasks to specific nodes in the cluster, ideally the nodes that have the data, or at least are in the same rack.The overall processing of JobTracker and TaskTracker can be explained in the following steps.

  1. Client applications submit jobs to the Job tracker. 2.The JobTracker talks to the NameNode to determine the location of the data
  2. The JobTracker locates TaskTracker nodes with available slots at or near the data
  3. The JobTracker submits the work to the chosen TaskTracker nodes.
  4. The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.
  5. A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may may even blacklist the TaskTracker as unreliable.
  6. When the work is completed, the JobTracker updates its status.
  7. Client applications can poll the JobTracker for information. The JobTracker is a point of failure for the Hadoop MapReduce service. If it goes down, all running jobs are halted.

MapReduce: Introduction and Need

Data is everywhere. There is no way to measure the exact total volume of data stored electronically but it should be in zettabytes (A zettabyte is 1 billion terabytes). So we have a lot of data and we are struggling to store and analyze it. Now let's try to figure out what is wrong with dealing with such an enormous volume of data. The answer is same what's wrong in data from PC to portable drives: Speed. Over the years the storage capacities of hard drives have increased like anything but the rate at which data can be read from drives have not kept up and writing is even slower.

Solution: To read from multiple disks at a time. For instance if we have to store 100 GB of data and we have 100 drives with 100 GB storage space , then it would be faster to read data from 100 drives, each holding 1 GB of data than a single drive holding 100 GB of data.

Problem with solution: First, when using a large number of hardware pieces (hard disks) there is high possibility that one or two might fail.Second, correctly combining the data from different hard disks. MapReduce provides a programming model that abstracts the problem from disk reads and writes, transforming it into computation over sets of keys and values. So , MapReduce is a programming model for data processing.

What Hadoop provides and why is MapReduce needed? Hadoop provides a reliable shared storage and analysis system. The storage is provided by HDFS and analysis by MapReduce. But Why can’t we use databases with lots of disks to do large-scale batch analysis? This is due to the fact that seek time is improving more slowly than transfer rate. Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.If the data access pattern is dominated by seeks, it will take longer to read or write large portions of the dataset than streaming through it, which operates at the transfer rate. We can define MapReduce as a complement to a Rational Database Management(RDBMS).

Comparison with RDBMS:
  1. MapReduce works well on unstructured or semistructured data because it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not intrinsic properties of the data, but they are chosen by the person analyzing the data.
  2. MapReduce works well for the applications where data is written once and read many times,whereas a relational database is good for datasets that are continually updated.
  3. MapReduce works on petabytes of data. On the other hand tradiional RDBMS works with gigabytes of data.
In MapReduce the programmer writes two functions: a map function and a reduce function, each of which defines a mapping from one set of key-value pairs to another. These functions are unaffected to the size of the data or the cluster that they are operating on, so they can be used unchanged for a small dataset and for a massive one. One more important thing to remember is, if you double the size of the input data, a job will run twice as slow. But if you also double the size of the cluster, a job will run as fast as the original one. This is not generally true of SQL queries. It is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster.The data used in MapReduce is semi structured and record oriented. MapReduce works by breaking the processing into two phases: Map phase and Reduce phase. Each phase has a key value pair as input and output,types of which are choosen by programmer. The programmer also specifies two functions: Map function and Reduce function.

To take advantage of the parallel processing that Hadoop provides we need to express our query as a MapReduce job. After some local,small scale testing we can run it on a cluster of machines.Broadly MapReduce working can be broken down into three components:
  • Map Method
  • Reduce Method
  • Code to run
The map function is represented by the mapper class , which declares an abstract map() method.Mapper class is agenric type,with four formal type parameters that specify the input key,output key,input value and output value.

Map function


It's just a data preparation phase,setting up data in such a way that the reducer function can do its work on it.The map function is also a good place to drop bad records.The output from the map function is processed by the MapReduce framework before being sent to the reduce function.

The map() function is passed a key and value.We convert the Text value containing the line of input into a Java String,then use its substring() method to extract the columns we are interested in.The map() function also provides an instance of Context(Context object is used to store the data which is used by reduce method) to write the output to.

Sample code for mapper class and map method.
public static class WordCountMap extends Mapper <LongWritable,Text,Text,IntWritable> { @Override public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { // ********some code*********** context.write(new Text(Count), new IntWritable(1)); } } }

Point to remember: Rather than using bult-in Java types,Hadoop provides its own set of basic types that are optimized for network serialization.Few are given below:
  • LongWritable-Corresponds to Java Long
  • Text -Corresponds to Java String
  • IntWritable -Corresponds to Java Integer

Reduce function


Just like map() function, four formal parameters are used to specify the input and output types for reduce() function.The input types of reduce function must match the output types of the map function. Reduces a set of intermediate values which share a key to a smaller set of values. The number of Reducers for the job is set by the user via JobConf.setNumReduceTasks(int). Reducer implementations can access the JobConf for the job via the JobConfigurable.configure(JobConf) method and initialize themselves. Similarly they can use the Closeable.close() method for de-initialization.

Sample code for reducer class and reduce method:
public static class WordCountReduce extends Reducer <Text,IntWritable,Text,IntWritable> { public void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException { // ******some code********* context.write(key, new IntWritable(sum)); } }

Reducer has 3 primary phases:Shuffle, Sort and Reduce.

Shuffle: Reducer is input the grouped output of a Mapper. In the phase the framework, for each Reducer, fetches the relevant partition of the output of all the Mappers, via HTTP.
Sort:The framework groups Reducer inputs by keys (since different Mappers may have output the same key) in this stage. The shuffle and sort phases occur simultaneously i.e. while outputs are being fetched they are merged.
Reduce: In this phase the reduce(Object, Iterator, OutputCollector, Reporter) method is called for each ltkey, (list of values)> pair in the grouped inputs. The output of the reduce task is typically written to the FileSystem via OutputCollector.collect(Object, Object). The output of the Reducer is not re-sorted.

Friday 27 December 2013

YARN: MapReduce 2

How does YARN overcomes shortcomings of “classic” MapReduce


– By splitting the responsibilities of the jobtracker into separate entities. Now a jobtracker takes care of both job scheduling and task progress monitoring. YARN separates these two roles into two daemons: A resource manager and application master. A resource manager manages the use of resources across the cluster and application manager manage the lifecycle of applications running on the cluster. Application master negotiates with the resource manager for cluster resources (number of containers) and then runs application specific processes in these containers. The containers are monitored by node managers running on cluster nodes, which make sure that only allocated resources re used not more than that.

Yarn is more general than MapReduce in fact MapReduce is just one type YARN application. The best thing about YARN design is that different YARN Applications can coexist on the same cluster. It is also possible for users to run different versions of MapReduce on the same YARN cluster, which makes the process of upgrading MapReduce more manageable. MapReduce on YARN involves more entities than classic MapReduce(MapReduce 1).They are:

  • The client
  • The YARN Resource Manager
  • The YARN Node Manager
  • The MapReduce Application Master
  • The Distributed Filesystem

The process of running a job is shown below.

Job Submission


  • Step 1 in figure:The submit() method on Job creates an internal Jobsubmitter instance and calls submitJobInternal() on it.The job submission process implemented by Jobsubmitter does the following.
  • Step 2:Asks the resource manager a new job Id.
  • Step 3:Checks the output specification of the job,Computes input splits,Copies job resources (job JAR ,configuration,and split information)to HDFS.
  • Step 4:Finally, the job is submitted by calling submitApplication() on the resource manager.

Job Intitialization


  • Step 5a and 5b: When the resource manager receives a call to its submitApplication(), it hands off the request to the scheduler.The scheduler allocates a container,and the resource manager then launches the application master's process there , under the node manager's management.
  • Step 6:The application master initializes job by creating a number of book keeping objects to keep track of the job's progress,as it will receive progress and completion reports from the tasks.
  • Step 7:Then it receives the input splits computed in the client from the shared filesystem.


Task Assignment


  • Step 8: If the job does not qualify for running as uber task, then the application master requestes containers for all the map and reduce rasks in the job from resource manager.
Note: All requests includes inforamation about each map task's data locality, in particular the hosts and coressponding racks that the input split resides on.The scheduler uses this info to make scheduling decesions.How? It attempts to place tasks on data-local nodes(in the ideal case), but if this is not possible, it prefers rack-local placement to non=local placement.

Task Execution


  • Step 9a and 9b: After a container has been assigned to the task by resource manager's scheduler, the application master starts the container by contacting the node manager.
  • Step 10:The task is executed by a Java application whose main class is YarnChild. Before running the task it localizes the resources that task needs, which includes the job configuration and JAR file and any files from distributed cache.
  • Step 11: Finally, it runs the map or reduce task
Note: Unlike MapReduce 1 YARN does not suppoert JVM reuse,so each task runs in a new JVM.Streaming and Pipes programs work in the same way as MapReduce 1.The YarnChild launches the Streaming or Pipes process and communicates with it using standard input/output or a socket(respectively).

Progress and status updates


  • When running under YARN , the task reports its progress and status back to its application master,which has an aggregate view of the job,every three seconds over the umbilical interface.The clients polls the appkiaction master every second to receive progress updates,which are usually displayed to the user.


Job Completion


  • Every five seconds the client checks whether the job has completed by calling the waitForCompletion() method on Job.The polling interval can be set via the mapreduce.client.completon.pollinterval configuration property. On job completion,the applcation master and the task containers clean up their working stste , and the OutputCommitter's job cleanup method is called.Job information is archived by the job history server to enable later interrogation b users if desired.