Mapper:
The mapper emits each group value as the key and the entire row as the value. The user supplies the column delimiter, the group column number, the value (ranking) column number, the number of top values N, and the sort order (true for ascending, false for descending).
Suppose our input file is:
1,11,a
2,12,a
3,13,b
4,14,c
5,15,d
6,16,c
7,17,g
8,18,e
9,19,a
A sample command:
hadoop jar /root/Documents/iv3.jar iv3.TopValues MapReduceInput/xy1.txt MapReduceOutput/TopVal "," 3 1 1 true
Here "," is the column delimiter.
"3" is group key i.e the 3rd column.
"1" is the column on which ranking will be done.
"1" means top 1 value.
"true" means we are expecting result in ascending order.
The mapper then emits key-value pairs such as:
a,(1,11,a)
a,(2,12,a)
b,(3,13,b)
...
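For illustration, here is a minimal standalone sketch (plain Java, no Hadoop; the class name MapperSketch is hypothetical) of the mapper's split-and-emit step on the sample rows, with delimiter "," and group column 3:

public class MapperSketch {
    public static void main(String[] args) {
        String[] rows = {"1,11,a", "2,12,a", "3,13,b"};
        int groupIndex = 3 - 1; // user-supplied column numbers are 1-based
        for (String row : rows) {
            String[] columns = row.split(",");
            // key = group column value, value = the entire row
            System.out.println(columns[groupIndex] + " -> (" + row + ")");
        }
    }
}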
Reducer:
The reducer uses a TreeMap to store the data. For group "a" the map holds:
key : value
1 : 1,11,a
2 : 2,12,a
9 : 9,19,a
When the number of entries exceeds N, one entry is deleted: for ascending order, the entry with the highest key; for descending order, the entry with the lowest key.
if (sortval.size() > rank_limit) {
    if (sort_ascending) {
        sortval.remove(sortval.lastKey());
    } else {
        sortval.remove(sortval.firstKey());
    }
}
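To see this trimming in isolation, here is a minimal standalone sketch (plain Java; ReducerSketch is a hypothetical name) that replays the three rows of group "a" with N = 1 and ascending order:

import java.util.TreeMap;

public class ReducerSketch {
    public static void main(String[] args) {
        String[] groupA = {"1,11,a", "2,12,a", "9,19,a"};
        int rankLimit = 1;         // keep only the top 1 value
        boolean ascending = true;  // keep the smallest ranking keys
        TreeMap<Integer, String> sortval = new TreeMap<Integer, String>();
        for (String row : groupA) {
            sortval.put(Integer.parseInt(row.split(",")[0]), row);
            if (sortval.size() > rankLimit) {
                // ascending: evict the largest key; descending: evict the smallest
                sortval.remove(ascending ? sortval.lastKey() : sortval.firstKey());
            }
        }
        System.out.println(sortval.values()); // prints [1,11,a]
    }
}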
So for key "a" the reducer keeps just one entry, deleting the entries with keys 2 and 9. Each key (group) is processed the same way, so the output will be:
1,11,a
3,13,b
4,14,c
5,15,d
8,18,e
Here is the entire code.
package iv3;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TopValues extends Configured implements Tool {

    public static class TopValuesMapper extends Mapper<LongWritable, Text, Text, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        boolean sort_ascending;

        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = conf.get("column.delimiter");
            column_to_rank = conf.getInt("column.to.rank", Integer.MIN_VALUE) - 1;
            rank_limit = conf.getInt("rank.limit", Integer.MIN_VALUE);
            rank_key = conf.getInt("rank.keys", Integer.MIN_VALUE) - 1;
            sort_ascending = conf.getBoolean("sort.ascending", true);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the group column as the key and the whole row as the value.
            String[] column = value.toString().split(column_delimiter);
            context.write(new Text(column[rank_key]), value);
        }
    }

    public static class TopValuesReducer extends Reducer<Text, Text, NullWritable, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        boolean sort_ascending;

        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = conf.get("column.delimiter");
            column_to_rank = conf.getInt("column.to.rank", Integer.MIN_VALUE) - 1;
            rank_limit = conf.getInt("rank.limit", Integer.MIN_VALUE);
            rank_key = conf.getInt("rank.keys", Integer.MIN_VALUE) - 1;
            sort_ascending = conf.getBoolean("sort.ascending", true);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The TreeMap keeps entries sorted by the ranking column; trimming it
            // to rank_limit entries keeps only the top N rows for this group.
            TreeMap<Integer, String> sortval = new TreeMap<Integer, String>();
            for (Text val : values) {
                String[] column = val.toString().split(column_delimiter);
                sortval.put(Integer.parseInt(column[column_to_rank]), val.toString());
                if (sortval.size() > rank_limit) {
                    if (sort_ascending) {
                        sortval.remove(sortval.lastKey());
                    } else {
                        sortval.remove(sortval.firstKey());
                    }
                }
            }
            for (Map.Entry<Integer, String> me : sortval.entrySet()) {
                context.write(NullWritable.get(), new Text(me.getValue()));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] remaining_args = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (remaining_args.length == 7) {
            conf.set("column.delimiter", remaining_args[2]);
            conf.set("rank.keys", remaining_args[3]);
            conf.set("column.to.rank", remaining_args[4]);
            conf.set("rank.limit", remaining_args[5]);
            conf.setBoolean("sort.ascending", Boolean.parseBoolean(remaining_args[6]));
        }
        conf.set("hadoop.job.history.user.location", "none");
        Job job = Job.getInstance(conf, getClass().getSimpleName());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(remaining_args[0]));
        FileOutputFormat.setOutputPath(job, new Path(remaining_args[1]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(TopValuesMapper.class);
        job.setReducerClass(TopValuesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exit_code = ToolRunner.run(new TopValues(), args);
        System.exit(exit_code);
    }
}