MapReduce當(dāng)中排序sort的方法


馬克-to-win @ 馬克java社區(qū):防盜版實(shí)名手機(jī)尾號(hào):73203。注意:想自己實(shí)現(xiàn)Sort得不償失,但如想借助Hadoop MapReduce技術(shù)框架排序,key必須實(shí)現(xiàn)WritableComparable接口。具體做法見下。需求是先按id比,再按amount比。






package com;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*public  interface  WritableComparable<T> extends Writable, Comparable<T>
 而且 public interface Comparable<T>
{
public int compareTo(T o);
}
*/
public class OrderBeanSort implements WritableComparable<OrderBeanSort>{
    private String orderId;
    private Double amount;
    private Double average=0.0;

    public Double getAverage() {
        return average;
    }

    public void setAverage(Double average) {
        this.average = average;
    }

    public OrderBeanSort() {
    }

    public OrderBeanSort(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }

    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
/*最后reduce輸出時(shí)用這里。*/
    public String toString() {
        return orderId + "\t" + amount+ "\t" +average;
    }
  
    @Override
    public int compareTo(OrderBeanSort o) {
        /*馬克-to-win: 如orderId相等,則比較amount,否則比較id即可,這樣id相同的連在一起了。因?yàn)閕d先比,amount后比,就是如下排的。
o1    250.0    0.0
o1    200.0    0.0
o2    700.0    0.0
o2    500.0    0.0
o2    100.0    0.0
o3    150.0    0.0
*/
        int cmp = this.getOrderId().compareTo(o.getOrderId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount());
        }
        return cmp;
    }
}



package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text,OrderBeanSort, NullWritable>{
    OrderBeanSort bean = new OrderBeanSort();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(orderId, amount);
        context.write(bean,NullWritable.get());
    }
}



package com;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SortReducer extends Reducer<OrderBeanSort, NullWritable, OrderBeanSort, NullWritable>{

    protected void reduce(OrderBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 馬克-to-win @ 馬克java社區(qū):"+key.toString());
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
 
 */      
        for (NullWritable val : values) {
            System.out.println("val is "+val.toString());
            System.out.println("inside for key is "+key.toString());
        }
/*上面一段和下面一段代碼的作用相同*/
//        Iterator it= values.iterator();
//        while(it.hasNext())
//        {
//            System.out.println("inside iterator"+it.next());
//            System.out.println("inside for key is "+key.toString());
//     //         it.remove();
//        }
        context.write(key, NullWritable.get());
    }
}





package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortTest.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(OrderBeanSort.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBeanSort.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}


輸出結(jié)果:
reduce key is 馬克-to-win @ 馬克java社區(qū):o1    250.0    0.0
val is (null)
inside for key is o1    250.0    0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o1    200.0    0.0
val is (null)
inside for key is o1    200.0    0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2    700.0    0.0
val is (null)
inside for key is o2    700.0    0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2    500.0    0.0
val is (null)
inside for key is o2    500.0    0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2    100.0    0.0
val is (null)
inside for key is o2    100.0    0.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o3    150.0    0.0
val is (null)
inside for key is o3    150.0    0.0


輸出文件:
o1    250.0    0.0
o1    200.0    0.0
o2    700.0    0.0
o2    500.0    0.0
o2    100.0    0.0
o3    150.0    0.0

輸入文件:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0


分析:

6條數(shù)據(jù)先在MapReduce系統(tǒng)當(dāng)中排好序,然后依次進(jìn)入reduce,reduce被調(diào)用了6次。補(bǔ)充講一句:如果是o1,p2,250.0和 o1,p1,250.0這樣兩條數(shù)據(jù),它們的key(orderId與amount)完全相同,將會(huì)同時(shí)進(jìn)入同一次reduce方法調(diào)用,和咱們當(dāng)時(shí)的 hello[1,1,1,1]是一樣的。而現(xiàn)在的o1,p2,250.0和o1,p1,200.0由于compareTo結(jié)果不同,所以是分別進(jìn)入reduce方法的。