Until now we have used only text files as input. Can we use any other file format? Can we use a binary format, or XML? Let us find out.
First, we need to get familiar with a few terms.
Input split: An input split is the chunk of the input that is processed by a single map. Each split is processed by a single map task; in other words, an InputSplit represents the data to be processed by an individual Mapper.
Each split is divided into records, and the map processes each record, which is a key-value pair.
Put simply, for a text file a split is a batch of rows and a record is one of those rows.
The length of an InputSplit is measured in bytes. Every InputSplit has storage locations (hostname strings). The storage locations are used by the MapReduce system to place map tasks as close to the split's data as possible. The tasks are processed in order of split size, largest first (a greedy approximation algorithm), in order to minimize the job runtime. One important thing to remember is that an InputSplit doesn't contain the input data itself, only a reference to the data.
// Simplified view of org.apache.hadoop.mapreduce.InputSplit
import java.io.IOException;

public abstract class InputSplit {
    // Size of the split in bytes, used to order splits (largest first)
    public abstract long getLength() throws IOException, InterruptedException;
    // Hostnames of the nodes holding the split's data, used for data locality
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
As users, we don't have to deal with InputSplits directly; the InputFormat takes care of that.
An InputFormat is a class that provides the following functionality:
- Selects the files or other objects that should be used for input.
- Defines the InputSplits that break a file into tasks.
- Provides a factory for RecordReader objects that read the file.
- The client that runs the job calculates the splits for the job by calling getSplits().
- The client then sends the splits to the jobtracker, which uses their storage locations to schedule map tasks that will process them on the tasktrackers.
- On a tasktracker, the map task passes the split to the createRecordReader() method on the InputFormat to obtain a RecordReader for that split.
- The map task uses the RecordReader to generate record key-value pairs, which it passes to the map function. We can see this by looking at the Mapper's run() method:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);                    // called once, before any records are processed
    while (context.nextKeyValue()) {   // advance the RecordReader to the next record
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);                  // called once, after the last record
}
First, the setup() method is called, then nextKeyValue() is called repeatedly on the Context to populate the key and value objects for the mapper. Each key-value pair is retrieved from the RecordReader and passed to the map() method. The nextKeyValue() method returns false when there are no more key-value pairs left to read, at which point the map task runs its cleanup() method.
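For reference, the responsibilities described above correspond to the two abstract methods of the new-API InputFormat class itself. Roughly, a simplified view of org.apache.hadoop.mapreduce.InputFormat:
import java.io.IOException;
import java.util.List;

public abstract class InputFormat<K, V> {
    // Compute the logical splits for the job; called by the client that submits the job
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;
    // Create a RecordReader that turns one split into key-value records; called by the map task
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;
}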
FileInputFormat
FileInputFormat is the base class for all implementations of InputFormat that use files as their data source. It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files.
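For example, the input files for a job are usually declared through FileInputFormat's static helpers. A minimal sketch, where the job name and the path /user/data/logs are only placeholders:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "input path demo"); // on very old releases: new Job(conf, name)
        // addInputPath() may be called repeatedly, and the path may be a file, a directory, or a glob
        FileInputFormat.addInputPath(job, new Path("/user/data/logs"));
        // ...set the mapper, reducer, output path, and so on, then submit the job
    }
}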
How to split input files?
FileInputFormat splits only those files that are larger than an HDFS block. The split size can be controlled by various Hadoop properties, which are listed in the table below.
Property name          Type   Default value                          Description
mapred.min.split.size  int    1                                      Smallest valid size in bytes for a file split
mapred.max.split.size  long   Long.MAX_VALUE (9223372036854775807)   Largest valid size in bytes for a file split
dfs.block.size         long   64 MB                                  The block size in HDFS
The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. We may impose a minimum split size: by setting it to a value larger than the block size, we can force splits to be larger than a block. This is not a good idea when using HDFS, because doing so increases the number of blocks that are not local to a map task. The maximum split size defaults to the maximum value that can be represented by a Java long type. It has an effect only when it is less than the block size, forcing splits to be smaller than a block. The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):
max(minimumSize, min(maximumSize, blockSize))
and by default:
minimumSize < blockSize < maximumSize
so the split size is blockSize.
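In code, the calculation is just a max/min over those three values, and the bounds can be set per job. A small sketch of the rule, plus the new-API FileInputFormat helpers for setting the bounds (the 128 MB figure is only an example):
// Essentially what FileInputFormat.computeSplitSize() does
static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

// Setting the bounds on a job (new API); a 128 MB minimum is just an illustration
FileInputFormat.setMinInputSplitSize(job, 128 * 1024 * 1024L);
FileInputFormat.setMaxInputSplitSize(job, Long.MAX_VALUE);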
How to prevent splitting?
Some applications don't want files to be split, as this allows a single mapper to process each input file in its entirety. For example, a simple way to check if all the records in a file are sorted is to go through the records in order, checking whether each record is not less than the preceding one. Implemented as a map task, this algorithm will work only if one map processes the whole file. There are a couple of ways to ensure that an existing file is not split. The first (quick and dirty) way is to increase the minimum split size to be larger than the largest file in your system; setting it to its maximum value, Long.MAX_VALUE, has this effect. The second is to subclass the concrete subclass of FileInputFormat that you want to use and override the isSplitable() method to return false. For example, here's a nonsplittable TextInputFormat:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}
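To use it, the job is simply pointed at the subclass (assuming a Job object named job):
// Every input file will now be processed in its entirety by a single mapper
job.setInputFormatClass(NonSplittableTextInputFormat.class);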
Text Input
TextInputFormat: As you may know, text files are the most common input, and TextInputFormat is the default InputFormat. Each record is a line of input; the key (a LongWritable) is the byte offset within the file of the beginning of the line, and the value is the contents of the line (excluding any line terminators). For instance, take a file with the following content:
A king should hunt regularly
A queen should shop daily,
Other people should just try.
The records are interpreted as the following key-value pairs:
(0, A king should hunt regularly)
(29, A queen should shop daily,)
(56, Other people should just try.)
Remember, the keys are not line numbers but byte offsets from the beginning of the file. The offset is sufficient to serve as a unique identifier for each line within its file, and combined with the file name it is unique within the whole filesystem.
NLineInputFormat: With TextInputFormat, the number of lines that each mapper receives depends on the split size and the length of the lines. If we want to fix this number, NLineInputFormat is the InputFormat to use. As with TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves. Here N refers to the number of lines of input that each mapper receives. The default value of N is 1, so each mapper receives exactly one line of input.
mapreduce.input.lineinputformat.linespermap
The property written above controls the value of N.
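It can be set either directly on the configuration or, under the new API, through a static helper on NLineInputFormat. A minimal sketch, where 1000 lines per mapper is only an example value:
// Two equivalent ways of asking for 1000 input lines per mapper
job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 1000);
NLineInputFormat.setNumLinesPerSplit(job, 1000);
job.setInputFormatClass(NLineInputFormat.class);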
XML: Hadoop provides a class called StreamXmlRecordReader for XML input. We can use it by setting our input format to StreamInputFormat and setting the stream.recordreader.class property to org.apache.hadoop.streaming.StreamXmlRecordReader. The reader is configured by setting job configuration properties that tell it the patterns for the start and end tags.
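As a rough sketch, for records wrapped in <page>...</page> elements the configuration might look like the following (the begin/end property names follow the Hadoop Streaming documentation, and the tag values are placeholders):
Configuration conf = job.getConfiguration();
conf.set("stream.recordreader.class",
         "org.apache.hadoop.streaming.StreamXmlRecordReader");
// Patterns that delimit one XML record (placeholder tags)
conf.set("stream.recordreader.begin", "<page>");
conf.set("stream.recordreader.end", "</page>");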
Binary Input
Here is the answer to the question asked at the beginning of this post: yes, we can use data other than text. Hadoop MapReduce also supports binary formats.
SequenceFileInputFormat: Hadoop's sequence file format stores sequences of binary key-value pairs. To use data from sequence files as the input to MapReduce, we use SequenceFileInputFormat. SequenceFileInputFormat can read MapFiles as well as sequence files: when it encounters a MapFile, it simply uses the MapFile's data file. This is why there is no MapFileInputFormat class.
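Using it is just a matter of setting the input format on the job; a minimal sketch (assuming a Job object named job):
job.setInputFormatClass(SequenceFileInputFormat.class);
// The Mapper's input key/value types must match the types stored in the sequence file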
SequenceFileAsTextInputFormat: SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file's keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.
Database Input and Output
DBInputFormat is an input format for reading data from a relational database, using JDBC. We need to be careful not to overwhelm the database we are reading from by running too many mappers, as the format doesn't have any sharding capabilities. For this reason, it is best used for loading relatively small datasets, perhaps for joining with larger datasets from HDFS using MultipleInputs. The corresponding output format is DBOutputFormat, which is useful for dumping job outputs (of modest size) into a database.
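As a rough illustration, the connection and the query are configured through DBConfiguration and DBInputFormat.setInput(). A hedged sketch, in which the driver, URL, credentials, table, and column names are all placeholders and MyRecord is a hypothetical class implementing Writable and DBWritable:
// Placeholder JDBC connection details
DBConfiguration.configureDB(job.getConfiguration(),
        "com.mysql.jdbc.Driver",            // JDBC driver class (MySQL assumed here)
        "jdbc:mysql://dbhost/mydb",         // database URL
        "dbuser", "dbpassword");            // credentials
// MyRecord is a hypothetical Writable/DBWritable describing one row
DBInputFormat.setInput(job, MyRecord.class,
        "employees",                        // table name (placeholder)
        null,                               // optional WHERE conditions
        "id",                               // column(s) to ORDER BY
        "id", "name", "salary");            // columns to read (placeholders)
job.setInputFormatClass(DBInputFormat.class);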