Sorting (sort) in MapReduce
馬克-to-win @ 馬克java社區(qū): Note: writing your own sort is rarely worth the effort, but if you want the Hadoop MapReduce framework to do the sorting for you, the key must implement the WritableComparable interface. The concrete approach is shown below. The requirement here is to compare by id first, then by amount.
package com;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/* For reference: public interface WritableComparable<T> extends Writable, Comparable<T>
and public interface Comparable<T>
{
public int compareTo(T o);
}
*/
public class OrderBeanSort implements WritableComparable<OrderBeanSort>{
private String orderId;
private Double amount;
private Double average = 0.0; // serialized and printed, but not used in the comparison
public Double getAverage() {
return average;
}
public void setAverage(Double average) {
this.average = average;
}
public OrderBeanSort() {
}
public OrderBeanSort(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public void set(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(amount);
out.writeDouble(average);
}
@Override
public void readFields(DataInput in) throws IOException {
/* read the fields in exactly the order write() wrote them */
this.orderId = in.readUTF();
this.amount = in.readDouble();
this.average = in.readDouble();
}
/* Used when reduce writes the final output. */
@Override
public String toString() {
return orderId + "\t" + amount+ "\t" +average;
}
@Override
public int compareTo(OrderBeanSort o) {
/* 馬克-to-win: if the orderId values are equal, compare the amount; otherwise comparing the id is enough, which keeps records with the same id next to each other. Because id is compared first and amount second (descending), the records end up ordered like this:
o1 250.0 0.0
o1 200.0 0.0
o2 700.0 0.0
o2 500.0 0.0
o2 100.0 0.0
o3 150.0 0.0
*/
int cmp = this.getOrderId().compareTo(o.getOrderId());
if (cmp == 0) {
/* same orderId: compare amounts with the arguments swapped so the sort is descending; safer than negating the result of Double.compare */
return Double.compare(o.getAmount(), this.getAmount());
}
return cmp;
}
}
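To sanity-check compareTo without running a full job, here is a minimal local sketch; the class name CompareToLocalCheck is made up for illustration, and it only assumes the OrderBeanSort class above plus the Hadoop jars on the classpath. Collections.sort delegates to the same compareTo the framework calls during the shuffle.

package com;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class CompareToLocalCheck {
    public static void main(String[] args) {
        List<OrderBeanSort> beans = new ArrayList<>();
        beans.add(new OrderBeanSort("o1", 200.0));
        beans.add(new OrderBeanSort("o2", 500.0));
        beans.add(new OrderBeanSort("o1", 250.0));
        beans.add(new OrderBeanSort("o2", 700.0));
        Collections.sort(beans); // uses OrderBeanSort.compareTo
        for (OrderBeanSort b : beans) {
            System.out.println(b); // expected order: o1 250.0, o1 200.0, o2 700.0, o2 500.0
        }
    }
}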
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text,OrderBeanSort, NullWritable>{
OrderBeanSort bean = new OrderBeanSort();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
/* input line format: orderId,productId,amount, e.g. "o1,p2,250.0" */
String[] fields = line.split(",");
String orderId = fields[0];
double amount = Double.parseDouble(fields[2]);
bean.set(orderId, amount);
context.write(bean,NullWritable.get());
}
}
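The mapper above assumes every line has three comma-separated fields with a numeric third field. As an illustrative sketch that is not part of the original code, a defensive variant (the class name DefensiveSortMapper is hypothetical) could skip malformed lines instead of letting one bad record fail the task:

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DefensiveSortMapper extends Mapper<LongWritable, Text, OrderBeanSort, NullWritable> {
    private final OrderBeanSort bean = new OrderBeanSort();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 3) {
            return; // line is missing orderId,productId,amount
        }
        try {
            bean.set(fields[0], Double.parseDouble(fields[2]));
        } catch (NumberFormatException e) {
            return; // amount field is not a valid number
        }
        context.write(bean, NullWritable.get());
    }
}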
package com;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<OrderBeanSort, NullWritable, OrderBeanSort, NullWritable>{
@Override
protected void reduce(OrderBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
System.out.println("reduce key is 馬克-to-win @ 馬克java社區(qū):"+key.toString());
/* the input file, for reference:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
*/
for (NullWritable val : values) {
System.out.println("val is "+val.toString());
System.out.println("inside for key is "+key.toString());
}
/* The loop above and the commented-out block below do the same thing. */
// Iterator it= values.iterator();
// while(it.hasNext())
// {
// System.out.println("inside iterator"+it.next());
// System.out.println("inside for key is "+key.toString());
// // it.remove();
// }
context.write(key, NullWritable.get());
}
}
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortTestMark_to_win {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SortTestMark_to_win.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setMapOutputKeyClass(OrderBeanSort.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBeanSort.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
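As a side note, and purely as a hedged sketch rather than part of the original job: Hadoop can also sort by a comparator registered with job.setSortComparatorClass, which lets you change the shuffle's sort order without touching the bean's compareTo. The class name OrderSortComparator below is made up for illustration; it reproduces the same id-ascending, amount-descending order:

package com;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderSortComparator extends WritableComparator {
    public OrderSortComparator() {
        super(OrderBeanSort.class, true); // true: deserialize keys so compare() below gets real beans
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBeanSort x = (OrderBeanSort) a;
        OrderBeanSort y = (OrderBeanSort) b;
        int cmp = x.getOrderId().compareTo(y.getOrderId());
        if (cmp != 0) return cmp;
        return Double.compare(y.getAmount(), x.getAmount()); // descending amount
    }
}

To use it, the driver would add one line: job.setSortComparatorClass(OrderSortComparator.class);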
Console output:
reduce key is 馬克-to-win @ 馬克java社區(qū):o1 250.0 0.0
val is (null)
inside for key is o1 250.0 0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o1 200.0 0.0
val is (null)
inside for key is o1 200.0 0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2 700.0 0.0
val is (null)
inside for key is o2 700.0 0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2 500.0 0.0
val is (null)
inside for key is o2 500.0 0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2 100.0 0.0
val is (null)
inside for key is o2 100.0 0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o3 150.0 0.0
val is (null)
inside for key is o3 150.0 0.0
Output file:
o1 250.0 0.0
o1 200.0 0.0
o2 700.0 0.0
o2 500.0 0.0
o2 100.0 0.0
o3 150.0 0.0
Input file:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
Analysis:
The six records are first sorted by the MapReduce framework and then enter reduce one at a time, so reduce is called six times. One more point: if the input contained the two records o1,p2,250.0 and o1,p1,250.0, their keys would be completely identical (same orderId and same amount), so both would enter a single reduce call together, just like the hello[1,1,1,1] case in the earlier example. The actual records o1,p2,250.0 and o1,p1,200.0, however, give different compareTo results, so each one enters the reduce method separately.
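If you instead wanted every record with the same orderId to enter one reduce call, with the amounts already sorted descending inside the group (the classic secondary-sort pattern), a grouping comparator could be registered with job.setGroupingComparatorClass. This is a minimal sketch, not part of the original example; the class name OrderIdGroupingComparator is hypothetical:

package com;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderIdGroupingComparator extends WritableComparator {
    public OrderIdGroupingComparator() {
        super(OrderBeanSort.class, true); // true: deserialize keys before comparing
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        // group solely by orderId; amount is ignored here, so all records
        // of one order land in the same reduce call
        return ((OrderBeanSort) a).getOrderId().compareTo(((OrderBeanSort) b).getOrderId());
    }
}

The driver would then add: job.setGroupingComparatorClass(OrderIdGroupingComparator.class); — with this in place, reduce would be called three times (o1, o2, o3) instead of six, and because Hadoop reuses the key object while you iterate the values, each step of the values loop exposes the next record's fields through the key.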