Saturday 18 January 2014

Serialization and Deserialization in Hadoop

Serialization is the process of converting structured objects into a byte stream. Deserialization is the reverse process of turning a byte stream back into structured objects. Serialization serves two main purposes: transmission over a network (interprocess communication) and writing to persistent storage. In Hadoop, interprocess communication between nodes is implemented with remote procedure calls (RPCs). The RPC protocol uses serialization to turn a message into a binary stream that is sent to the remote node, which then deserializes the binary stream back into the original message.

An RPC serialization format is expected to be:
  • Compact: to use network bandwidth efficiently.
  • Fast: serialization and deserialization should add very little performance overhead.
  • Extensible: to adapt to new changes and requirements.
  • Interoperable: to support clients written in languages different from the server's.
A data format for persistent storage may seem to have different requirements from an RPC serialization framework, but the same four properties matter there as well:
  • Compact: to use storage space efficiently.
  • Fast: to keep the overhead of reading or writing terabytes of data minimal.
  • Extensible: to transparently read data written in an older format.
  • Interoperable: to read and write persistent data using different languages.

Hadoop uses its own serialization format, Writables. It is compact and fast, but not easy to extend or to use from languages other than Java.

The Writable Interface


The Writable interface has two methods, one for writing and one for reading. The method for writing writes its state to a DataOutput binary stream and the method for reading reads its state from a DataInput binary stream.

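package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    // Serialize this object's fields to the binary stream.
    void write(DataOutput out) throws IOException;
    // Populate this object's fields from the binary stream.
    void readFields(DataInput in) throws IOException;
}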
Let us understand serialization with an example. The helper method below writes a Writable object to a byte array:

public static byte[] serialize(Writable writable) throws IOException {
    // Capture the serialized form in an in-memory buffer.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}
Let’s try deserialization. Again, we create a helper method to read a Writable object from a byte array:

public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
    // Read the object's fields back from the byte array.
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}
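To see the two helpers work together, here is a minimal round-trip sketch. It is written so that it runs without Hadoop on the classpath, so SimpleWritable is a local stand-in with the same shape as Writable, and IntBox is a hypothetical wrapper loosely modeled on IntWritable. Neither name comes from Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Assumption: local stand-in mirroring the two methods of Hadoop's Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical int wrapper, loosely modeled on IntWritable.
class IntBox implements SimpleWritable {
    private int value;

    IntBox() { }
    IntBox(int value) { this.value = value; }
    int get() { return value; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value); // four bytes, most significant byte first
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }
}

class RoundTripDemo {
    static byte[] serialize(SimpleWritable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

    static void deserialize(SimpleWritable writable, byte[] bytes) throws IOException {
        DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
        writable.readFields(dataIn);
        dataIn.close();
    }

    // Render each byte as two lowercase hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new IntBox(163));
        System.out.println(bytes.length + " bytes: " + toHex(bytes)); // 4 bytes: 000000a3

        IntBox copy = new IntBox();
        deserialize(copy, bytes);
        System.out.println(copy.get()); // 163
    }
}
```

The integer occupies exactly four bytes in big-endian order, which illustrates the compactness claimed for the format: no type names or field labels are written to the stream.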

WritableComparable and comparators


IntWritable implements the WritableComparable interface, which is a subinterface of the Writable and java.lang.Comparable interfaces:

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
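As an illustration of what a type combining both interfaces might look like, the sketch below defines a sortable key. KeyWritable and KeyWritableComparable are local stand-ins for the Hadoop interfaces so the code compiles on its own, and YearKey is a hypothetical key type invented for this example.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

// Assumption: local stand-ins mirroring Writable and WritableComparable<T>.
interface KeyWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface KeyWritableComparable<T> extends KeyWritable, Comparable<T> {
}

// Hypothetical key type: serializable and ordered by its year.
class YearKey implements KeyWritableComparable<YearKey> {
    private int year;

    YearKey() { }
    YearKey(int year) { this.year = year; }
    int get() { return year; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
    }

    @Override
    public int compareTo(YearKey other) {
        return Integer.compare(year, other.year);
    }
}

class SortKeysDemo {
    // Wrap the years in keys, sort them via compareTo, and unwrap the result.
    static int[] sortedYears(int... years) {
        YearKey[] keys = new YearKey[years.length];
        for (int i = 0; i < years.length; i++) {
            keys[i] = new YearKey(years[i]);
        }
        Arrays.sort(keys);
        int[] result = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            result[i] = keys[i].get();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortedYears(1950, 1901, 1949))); // [1901, 1949, 1950]
    }
}
```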
Comparison of types is important for MapReduce because it has a sorting phase during which keys are compared with one another. Hadoop provides RawComparator, an extension of Java's Comparator:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
    // Compare two serialized records: b1[s1 .. s1+l1) against b2[s2 .. s2+l2).
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
This interface permits implementors to compare records read from a stream without deserializing them into objects, hence avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly from the given start positions (s1 and s2) and lengths (l1 and l2). WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions:

First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method. Second, it acts as a factory for RawComparator instances (that Writable implementations have registered).
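The raw comparison described above for integers can be sketched in plain Java. This is an illustration under stated assumptions, not Hadoop's actual comparator code: RawIntCompareDemo and its methods are hypothetical names, and the byte layout assumed is the four-byte big-endian form produced by DataOutput.writeInt.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class RawIntCompareDemo {
    // Decode a four-byte big-endian int starting at the given offset.
    static int readInt(byte[] b, int start) {
        return ((b[start] & 0xff) << 24)
             | ((b[start + 1] & 0xff) << 16)
             | ((b[start + 2] & 0xff) << 8)
             | (b[start + 3] & 0xff);
    }

    // Same parameter shape as the raw compare() method. The lengths are not
    // needed here because every serialized int has a fixed width of four bytes.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    // Helper for the demo: serialize an int the way DataOutput.writeInt does.
    static byte[] intBytes(int value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeInt(value);
        dataOut.close();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] b1 = intBytes(163);
        byte[] b2 = intBytes(67);
        System.out.println(compare(b1, 0, b1.length, b2, 0, b2.length) > 0); // true

        byte[] neg = intBytes(-5);
        System.out.println(compare(neg, 0, neg.length, b2, 0, b2.length) < 0); // true
    }
}
```

No wrapper object is allocated during the comparison. The values are read straight out of the byte arrays, which is the saving that a raw comparator is meant to provide.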

For example, to obtain a comparator for IntWritable, we just use:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

The comparator can be used to compare two IntWritable objects:

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

or their serialized representations:

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));