Showing posts with label hadoop tutorial for experts. Show all posts
Showing posts with label hadoop tutorial for experts. Show all posts

Friday, 10 January 2014

HDFS:“Moving Computation is Cheaper than Moving Data”

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems.HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS is designed to store very large data sets reliably, and to stream those data sets at high bandwidth to user applications. By distributing storage and computation across many servers, the resource can grow with demand while remaining economical at every size.

Key HDFS Features:
  • Scale-Out Architecture - Add servers to increase capacity.
  • High Availability - Serve mission-critical workflows and applications.
  • Load Balancing - Place data intelligently for maximum efficiency and utilization.
  • Security: POSIX-based file permissions for users and groups with optional LDAP integration
  • Fault Tolerance - Automatically and seamlessly recover from failures. Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system’s data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.
  • Flexible Access - Multiple and open frameworks for serialization and file system mounts Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files. It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.
  • Tunable Replication -
    Multiple copies of each file provide data protection and computational performance HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time.
  • Simple Coherency Model - HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A MapReduce application or a web crawler application fits perfectly with this model. There is a plan to support appending-writes to files in the future.

Working of HDFS


Every HDFS cluster is comprised of a NameNode and DataNodes. NameNode is the node which manages the cluster metadata and DataNodes are the nodes that store the data. Files and directories are represented on the NameNode by inodes. Inodes record attributes like permissions, modification and access times, or namespace and disk space quotas.

The file content is split into large blocks (generally 128 megabytes), and each block of the file is replicated at multiple DataNodes. The blocks are stored on the local file system on the datanodes. The Namenode actively monitors the number of replicas of a block. When a replica of a block is lost due to a DataNode failure or disk failure, the NameNode creates another replica of the block. The NameNode maintains the namespace tree and the mapping of blocks to DataNodes, holding the entire namespace image in RAM.

The NameNode does not directly send requests to DataNodes. It sends instructions to the DataNodes by replying to heartbeats sent by those DataNodes. The instructions include commands to: replicate blocks to other nodes, remove local block replicas, re-register and send an immediate block report, or shut down the node.

Here is litle bit more about NameNodes and DataNodes.

  • HDFS has a master/slave architecture. HDFS is comprised of interconnected clusters of nodes where files and directories reside. An HDFS cluster consists of a single node, known as a NameNode, that manages the file system namespace and regulates client access to files,a master server that manages the file system namespace and regulates access to files by clients. In addition, data nodes (DataNodes) store data as blocks within files.
  • Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. Within HDFS, The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes, which handle read and write requests from HDFS clients. Data nodes also create, delete, and replicate data blocks according to instructions from the governing name node.
  • The namenode maintains two in-memory tables, one which maps the blocks to datanodes (one block maps to 3 datanodes for a replication value of 3) and a datanode to block number mapping. Whenever a datanode reports a disk corruption of a particular block, the first table gets updated and whenever a datanode is detected to be dead (because of a node/network failure) both the tables get updated.
  • Data nodes continuously loop, asking the name node for instructions. A name node can't connect directly to a data node; it simply returns values from functions invoked by a data node. Each data node maintains an open server socket so that client code or other data nodes can read or write data. The host or port for this server socket is known by the name node, which provides the information to interested clients or other data nodes.

  • Some interesting facts about DataNode


    • All datanodes send a heartbeat message to the namenode every 3 seconds to say that they are alive. If the namenode does not receive a heartbeat from a particular data node for 10 minutes, then it considers that data node to be dead/out of service and initiates replication of blocks which were hosted on that data node to be hosted on some other data node.
    • The data nodes can talk to each other to rebalance data, move and copy data around and keep the replication high.
    • When the datanode stores a block of information, it maintains a checksum for it as well. The data nodes update the namenode with the block information periodically and before updating verify the checksums. If the checksum is incorrect for a particular block i.e. there is a disk level corruption for that block, it skips that block while reporting the block information to the namenode. In this way, namenode is aware of the disk level corruption on that datanode and takes steps accordingly.

    Communications protocols


  • All HDFS communication protocols build on the TCP/IP protocol. HDFS clients connect to a Transmission Control Protocol (TCP) port opened on the name node, and then communicate with the name node using a proprietary Remote Procedure Call (RPC)-based protocol. Data nodes talk to the name node using a proprietary block-based protocol.


  • The File System Namespace


  • HDFS supports a traditional hierarchical file organization. A user or an application can create directories and store files inside these directories. The file system namespace hierarchy is similar to most other existing file systems; one can create and remove files, move a file from one directory to another, or rename a file. HDFS does not yet implement user quotas. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features.
  • The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the NameNode.

  • Data replication


  • HDFS uses an intelligent replica placement model for reliability and performance. Optimizing replica placement makes HDFS unique from most other distributed file systems, and is facilitated by a rack-aware replica placement policy that uses network bandwidth efficiently. HDFS replicates file blocks for fault tolerance. An application can specify the number of replicas of a file at the time it is created, and this number can be changed any time after that. The name node makes all decisions concerning block replication.
  • Large HDFS environments typically operate across multiple installations of computers. Communication between two data nodes in different installations is typically slower than data nodes within the same installation. Therefore, the name node attempts to optimize communications between data nodes. The name node identifies the location of data nodes by their rack IDs.

Wednesday, 8 January 2014

MapReduce Types


The first thing that comes into mind while writing a MapReduce program is the types we you are going to use in the code for Mapper and Reducer class.There are few points that should be followed for writing and understanding Mapreduce program.Here is a recap for the data types used in MapReduce (in case you have missed the MapReduce Introduction post).

Broadly the data types used in MapRduce are as follows.
  • LongWritable-Corresponds to Java Long
  • Text -Corresponds to Java String
  • IntWritable -Corresponds to Java Integer
  • NullWritable - Corrresponds to Null Values

Having a quick overview, we can jump over to the key thing that is data type in MapReduce. Now MapReduce has a simple model of data processing: inputs and outputs for the map and reduce functions are key-value pairs
  • The map and reduce functions in MapReduce have the following general form:
    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)
    • K1-Input Key
    • V1-Input value
    • K2-Output Key
    • V2-Output value
  • In general,the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3).
  • As said in above pont even though the map output types and the reduce input types must match, this is not enforced by the Java compiler. If the reduce output types may be different from the map output types (K2 and V2) then we have to specify in the code the types of both the map and reduce function else error will be thrown.So if k2 and k3 are the same, we don't need to call setMapoutputKeyClass().Similarly, if v2 and v3 are the same, we only need to use setOutputValueClass()
  • NullWritable is used when the user want to pass either key or value (generally key) of map/reduce method as null.
  • If a combine function is used, then it is the same form as the reduce function (and is an implementation of Reducer), except its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function: map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3) Often the combine and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2.
  • The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index. In practice, the partition is determined solely by the key (the value is ignored): partition: (K2, V2) → integer

Default MapReduce Job:No Mapper, No Reducer


Ever tried to run MapReduce program without setting a mapper or a reducer? Here is the minimal MapReduce program.



Run it over a small data and check the output. Here is little data which I used and the final result.You can take a larger data set.







Notice the result file we get after running the above code on the given data. It added an extra column with some numbers as data.What happened is the that the the newly added column contains the key for every line. The number is the offset of the line from the first line i.e. how far the beginning of the first line is placed from the first line(0 of course)similarly how many characters away is the second line from first. Count the characters, it will be 16 and so on.

This offset is taken as a key and emitted in the result.