Wednesday, 27 August 2014

Ranker: Top N values


Ranker finds the top N values in a column for each group.

Mapper:

The mapper emits each group as a key and the entire row as the value. The user supplies the column delimiter, the group column number, the value (ranking) column number, the number of values to keep (N), and the sort order (true for ascending, false for descending).
Suppose our input file is:
1,11,a
2,12,a
3,13,b
4,14,c
5,15,d
6,16,c
7,17,g
8,18,e
9,19,a

A sample command:
hadoop jar /root/Documents/iv3.jar iv3.TopValues  MapReduceInput/xy1.txt  MapReduceOutput/TopVal "," 3 1 1 true
Here "," is column delimiter.
"3" is group key i.e the 3rd column.
"1" is the column on which ranking will be done.
"1" means top 1 value.
"true" means we are expecting result in ascending order.

The mapper will then emit key-value pairs like:
a,(1,11,a)
a,(2,12,a)
b,(3,13,b)
...
...
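The emit step, excerpted from the full mapper class at the end of the post (the configuration fields are filled in by setup()), is just a split on the delimiter followed by a write of the group column and the original row:

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // split "1,11,a" into ["1", "11", "a"] and emit (group column, whole row)
    String column[] = value.toString().split(column_delimiter);
    context.write(new Text(column[rank_key]), value);
}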

Reducer:

The reducer uses a TreeMap, keyed on the ranking column, to hold the rows of each group. For group "a" it would contain:
key: value
1: 1,11,a
2: 2,12,a
9: 9,19,a
When the number of entries exceeds N, one entry is deleted: with ascending order, the entry with the highest key; with descending order, the entry with the lowest key.

if (sortval.size() > rank_limit) {
    if (sort_ascending) {
        // ascending: drop the largest key, keeping the N smallest values
        sortval.remove(sortval.lastKey());
    } else {
        // descending: drop the smallest key, keeping the N largest values
        sortval.remove(sortval.firstKey());
    }
}
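To see the trimming in isolation, here is a small standalone sketch in plain Java (no Hadoop; the class name and hard-coded rows are just the group "a" sample from above, with N = 1 and ascending order):

import java.util.TreeMap;

// Standalone illustration of the reducer's TreeMap trimming for group "a".
public class TopNSketch {
    public static void main(String[] args) {
        String[] rowsForGroupA = {"1,11,a", "2,12,a", "9,19,a"};
        int rankLimit = 1;              // N
        boolean sortAscending = true;   // keep the smallest ranking keys

        TreeMap<Integer, String> sortval = new TreeMap<Integer, String>();
        for (String row : rowsForGroupA) {
            int rankingKey = Integer.parseInt(row.split(",")[0]);  // column 1 is the ranking column
            sortval.put(rankingKey, row);
            if (sortval.size() > rankLimit) {
                if (sortAscending) {
                    sortval.remove(sortval.lastKey());   // drop the largest key
                } else {
                    sortval.remove(sortval.firstKey());  // drop the smallest key
                }
            }
        }
        System.out.println(sortval.values());  // prints [1,11,a]
    }
}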

So for key "a" the reducer keeps just one entry, deleting the entries with keys 2 and 9. Every group key is processed the same way, so the output will be:
1,11,a
3,13,b
4,14,c
5,15,d
8,18,e
7,17,g
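With the default single reducer, everything lands in one part file, so the result can be checked with something like:
hadoop fs -cat MapReduceOutput/TopVal/part-r-00000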
Here is the entire code.
package iv3;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TopValues extends Configured implements Tool {

    public static class TopValuesMapper extends Mapper<LongWritable, Text, Text, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        Boolean sort_ascending;

        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = conf.get("column.delimiter");
            column_to_rank = conf.getInt("column.to.rank", Integer.MIN_VALUE) - 1;
            rank_limit = conf.getInt("rank.limit", Integer.MIN_VALUE);
            rank_key = conf.getInt("rank.keys", Integer.MIN_VALUE) - 1;
            sort_ascending = conf.getBoolean("sort.ascending", true);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the group column as the key and the whole row as the value.
            String column[] = value.toString().split(column_delimiter);
            context.write(new Text(column[rank_key]), value);
        }
    }

    public static class TopValuesReducer extends Reducer<Text, Text, NullWritable, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        Boolean sort_ascending;

        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = conf.get("column.delimiter");
            column_to_rank = conf.getInt("column.to.rank", Integer.MIN_VALUE) - 1;
            rank_limit = conf.getInt("rank.limit", Integer.MIN_VALUE);
            sort_ascending = conf.getBoolean("sort.ascending", true);
            rank_key = conf.getInt("rank.keys", Integer.MIN_VALUE) - 1;
        }

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Keep at most rank_limit rows per group, ordered by the ranking column.
            TreeMap<Integer, String> sortval = new TreeMap<Integer, String>();
            for (Text val : value) {
                String column[] = val.toString().split(column_delimiter);
                sortval.put(Integer.parseInt(column[column_to_rank]), val.toString());
                if (sortval.size() > rank_limit) {
                    if (sort_ascending) {
                        sortval.remove(sortval.lastKey());   // drop the largest key
                    } else {
                        sortval.remove(sortval.firstKey());  // drop the smallest key
                    }
                }
            }
            // Emit the surviving rows for this group.
            for (Map.Entry<Integer, String> entry : sortval.entrySet()) {
                context.write(NullWritable.get(), new Text(entry.getValue()));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] remaining_args = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Expected arguments: <input> <output> <delimiter> <group column> <rank column> <N> <ascending>
        if (remaining_args.length == 7) {
            conf.set("column.delimiter", remaining_args[2]);
            conf.set("rank.keys", remaining_args[3]);
            conf.set("column.to.rank", remaining_args[4]);
            conf.set("rank.limit", remaining_args[5]);
            conf.setBoolean("sort.ascending", Boolean.parseBoolean(remaining_args[6]));
        }
        conf.set("hadoop.job.history.user.location", "none");
        Job job = new Job(conf, getClass().getSimpleName());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(remaining_args[0]));
        FileOutputFormat.setOutputPath(job, new Path(remaining_args[1]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(TopValuesMapper.class);
        job.setReducerClass(TopValuesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exit_code = ToolRunner.run(new TopValues(), args);
        System.exit(exit_code);
    }
}
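For the descending case, running the same jar with "false" as the last argument (and a fresh output directory, here called MapReduceOutput/TopValDesc just for illustration) should keep the largest value of column 1 per group instead:
hadoop jar /root/Documents/iv3.jar iv3.TopValues  MapReduceInput/xy1.txt  MapReduceOutput/TopValDesc "," 3 1 1 false
9,19,a
3,13,b
6,16,c
5,15,d
8,18,e
7,17,g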