Using custom objects in MapReduce

Custom objects:
Mark-to-win: Mark Java Community: anti-piracy real-name phone number ending in 73203.
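So far, every type we have sent across the Hadoop network has been a predefined one, such as Text or IntWritable. Sometimes, though, we need to build our own class that wraps these predefined simple types and can still travel across the Hadoop network just like them, which makes the data easier to manage and work with. To do that, the class implements Writable, as shown below, and provides the write and readFields methods.

The idea: suppose we want averages. Following the hello world approach from the previous chapter, all values for the same key go into the same reduce call. So we could pass the two strings o1,p2,250.0 and o1,p1,200.0 as plain text into the same reduce, parse them there, and compute the sum and the average. That works, but it is somewhat clumsy (although it can handle almost any problem), so this section passes the values as a custom object instead, which is cleaner.

When teaching this section, go through it once; once it is understood, go through it a second time and explain why the program is written this way.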
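For comparison, here is a minimal plain-Java sketch of the string-based approach described above, with no Hadoop involved. The class and method names (`StringAverageSketch`, `averageFromLines`) are hypothetical, and the list simply stands in for the values that would reach one reduce call for key o1:

```java
import java.util.Arrays;
import java.util.List;

class StringAverageSketch {
    // Hypothetical helper: takes the raw "orderId,item,amount" strings that share
    // one key, parses the amount out of each one, and returns their average.
    static double averageFromLines(List<String> lines) {
        double sum = 0.0;
        int num = 0;
        for (String line : lines) {
            String[] fields = line.split(",");
            sum += Double.parseDouble(fields[2]); // the third field is the amount
            num++;
        }
        return sum / num;
    }

    public static void main(String[] args) {
        // The two records for order o1 from the sample data
        List<String> o1 = Arrays.asList("o1,p2,250.0", "o1,p1,200.0");
        System.out.println(averageFromLines(o1)); // prints 225.0
    }
}
```

Every field has to be joined into, and parsed back out of, a string by hand; a Writable class does this once, in write and readFields.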
 



Requirement: order records, one per line (order ID, item, amount):
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0

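Compute the average amount of each order. Expected output (columns: the key, then the value's orderId, amount and average):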

o1    o1    0.0    225.0
o2    o2    0.0    433.3333333333333
o3    o3    0.0    150.0
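The expected averages can be checked by hand from the input records, writing \bar{x} for the average amount of an order:

```latex
\begin{aligned}
\bar{x}_{o1} &= \frac{250.0 + 200.0}{2} = 225.0 \\
\bar{x}_{o2} &= \frac{500.0 + 100.0 + 700.0}{3} = \frac{1300.0}{3} \approx 433.333 \\
\bar{x}_{o3} &= \frac{150.0}{1} = 150.0
\end{aligned}
```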



package com;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBeanSerial implements Writable {
    private String orderId;
    private Double amount;
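/* average is not set by the constructors or set(), so give it a default value;
   otherwise write() would throw a NullPointerException when unboxing it. */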
    private Double average=0.0;

    public Double getAverage() {
        return average;
    }

    public void setAverage(Double average) {
        this.average = average;
    }

    public OrderBeanSerial() {
    }

    public OrderBeanSerial(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }



    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in exactly the order write() wrote them
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
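/* Used when reduce writes its final output. Since the output is produced here and must include
the average, the class needs an average field; writing sum/num here instead would not work,
because sum and num cannot easily be passed into this method. */
    @Override
    public String toString() {
        return orderId + "\t" + amount + "\t" + average;
    }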
}
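To see why write and readFields must handle the fields in the same order, the round trip can be tried without a cluster. This is a sketch under stated assumptions, not the tutorial's code: `OrderBeanRoundTrip` is a hypothetical copy of OrderBeanSerial that does not implement org.apache.hadoop.io.Writable (so it compiles without Hadoop), uses primitive doubles, and uses java.io byte-array streams in place of Hadoop's own buffers. Like Hadoop, it builds the receiving object with the no-arg constructor and then calls readFields, which is why OrderBeanSerial keeps its empty constructor:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for OrderBeanSerial: same fields and same write/readFields
// logic, but it does not implement Hadoop's Writable, so it compiles on its own.
class OrderBeanRoundTrip {
    String orderId;
    double amount;
    double average = 0.0;

    void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }

    // Reads the fields back in exactly the order write() produced them
    void readFields(DataInput in) throws IOException {
        orderId = in.readUTF();
        amount = in.readDouble();
        average = in.readDouble();
    }

    @Override
    public String toString() {
        return orderId + "\t" + amount + "\t" + average;
    }

    // Serializes src to bytes, then fills a fresh object (made with the no-arg
    // constructor) from those bytes via readFields.
    static OrderBeanRoundTrip roundTrip(OrderBeanRoundTrip src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            src.write(out);
            out.flush();
            OrderBeanRoundTrip copy = new OrderBeanRoundTrip();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OrderBeanRoundTrip bean = new OrderBeanRoundTrip();
        bean.orderId = "o1";
        bean.amount = 250.0;
        OrderBeanRoundTrip copy = roundTrip(bean);
        System.out.println(copy.orderId + " " + copy.amount + " " + copy.average); // prints o1 250.0 0.0
    }
}
```

If readFields read the two doubles before the string, or skipped average, the copy would come out corrupted, because the byte stream carries no field names, only the values in write order.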





package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SerialMapper extends Mapper<LongWritable, Text, Text,OrderBeanSerial>{
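    // Reused for every record: context.write() serializes the bean right away,
    // so overwriting it on the next map() call is safe
    OrderBeanSerial bean = new OrderBeanSerial();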

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(orderId, amount);
        context.write(new Text(orderId),bean);
    }
}
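What the mapper and the shuffle do together can be imitated with ordinary collections. In this hypothetical sketch (`MapShuffleSketch` and `mapAndGroup` are illustrative names, not Hadoop APIs), each line is parsed exactly as SerialMapper parses it, and a TreeMap stands in for the framework's sort-and-group step, so each order ID ends up with the list of amounts that would reach one reduce call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapShuffleSketch {
    // Parses each line like SerialMapper, then groups amounts by order ID.
    // TreeMap keeps the keys sorted, as the shuffle sorts map output keys.
    static Map<String, List<Double>> mapAndGroup(List<String> lines) {
        Map<String, List<Double>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String orderId = fields[0];                    // the map output key
            double amount = Double.parseDouble(fields[2]); // the amount carried in the bean
            groups.computeIfAbsent(orderId, k -> new ArrayList<>()).add(amount);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "o1,p2,250.0", "o2,p3,500.0", "o2,p4,100.0",
                "o2,p5,700.0", "o3,p1,150.0", "o1,p1,200.0");
        System.out.println(mapAndGroup(input));
        // prints {o1=[250.0, 200.0], o2=[500.0, 100.0, 700.0], o3=[150.0]}
    }
}
```

In a real job the keys arrive sorted, but the order of the amounts within a key is not guaranteed, as the console output at the end of this section shows.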



package com;

import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import org.apache.hadoop.io.Text;

public class SerialReducer extends Reducer<Text, OrderBeanSerial, Text, OrderBeanSerial> {

    @Override
    protected void reduce(Text key, Iterable<OrderBeanSerial> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is Mark-to-win @ Mark Java Community:" + key.toString());
/*  For the data below: just as in hello world, where hello[1,1,1,1] enters reduce together, here o1[o1,250.0, o1,200.0] enters reduce together, as the for loop below shows. Inside the for loop the average is easy to compute.
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0

*/      
        double sum = 0.0;
        int num = 0;
        for (OrderBeanSerial val : values) {
            // Read the amount now: Hadoop may reuse the same val object on the next iteration
            sum += val.getAmount();
            num++;

            System.out.println("val is " + val.toString());
            System.out.println("inside for key is " + key.toString());
        }

        OrderBeanSerial vals=new OrderBeanSerial();
        vals.set(key.toString(), 0.0);
        vals.setAverage(sum/num);
/* This is exactly what ends up in the output file, which matches the requirement. */
        context.write(key, vals);
    }
}
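One detail the reducer relies on: Hadoop typically reuses a single value object while iterating over the values, deserializing each record into it. That is why the loop reads getAmount() immediately instead of keeping the beans. The sketch below imitates that behaviour with a hypothetical `reusingIterable` that hands out the same `Holder` every time; it is an illustration of the effect, not Hadoop's implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ReuseSketch {
    // Mutable value holder standing in for OrderBeanSerial
    static class Holder {
        double amount;
    }

    // Hypothetical Iterable that, like the reduce-side values, returns the
    // SAME Holder on every call to next() and only overwrites its field.
    static Iterable<Holder> reusingIterable(double... amounts) {
        Holder shared = new Holder();
        return () -> new Iterator<Holder>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < amounts.length;
            }

            @Override
            public Holder next() {
                if (!hasNext()) throw new NoSuchElementException();
                shared.amount = amounts[i++];
                return shared;
            }
        };
    }

    // Safe pattern (what SerialReducer does): read the value during its own iteration
    static double average(Iterable<Holder> values) {
        double sum = 0.0;
        int num = 0;
        for (Holder h : values) {
            sum += h.amount;
            num++;
        }
        return sum / num;
    }

    // Unsafe pattern: keeping references leaves every entry pointing at one object
    static List<Holder> keepReferences(Iterable<Holder> values) {
        List<Holder> kept = new ArrayList<>();
        for (Holder h : values) {
            kept.add(h);
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(average(reusingIterable(500.0, 100.0, 700.0))); // prints 433.3333333333333
        List<Holder> kept = keepReferences(reusingIterable(500.0, 100.0, 700.0));
        System.out.println(kept.get(0).amount); // prints 700.0, not 500.0
    }
}
```

If a reducer needs to keep the values themselves, it has to copy each one (for example into a new OrderBeanSerial) inside the loop.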






package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SerialTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SerialTestMark_to_win.class);
        job.setMapperClass(SerialMapper.class);
        job.setReducerClass(SerialReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBeanSerial.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrderBeanSerial.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
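/* Make sure the directory below does not exist: FileOutputFormat fails the job if the output directory already exists */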
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Output in the file: (analysis: both the key and the value are written out, which is why o1 appears twice)
o1    o1    0.0    225.0
o2    o2    0.0    433.3333333333333
o3    o3    0.0    150.0
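The doubled o1 follows from how the default text output writes each record: the key's text, a tab, then value.toString(). The hypothetical helper below rebuilds one line that way, using the same format as OrderBeanSerial.toString():

```java
class OutputLineSketch {
    // Hypothetical helper: builds one output line the way the default text output
    // does, i.e. the key, a tab, then the value's toString().
    static String outputLine(String key, String orderId, double amount, double average) {
        String valueText = orderId + "\t" + amount + "\t" + average; // same format as OrderBeanSerial.toString()
        return key + "\t" + valueText;
    }

    public static void main(String[] args) {
        // Reducer output for o1: key "o1", value with amount 0.0 and average 225.0
        System.out.println(outputLine("o1", "o1", 0.0, 225.0)); // prints o1, o1, 0.0, 225.0 separated by tabs
    }
}
```

If only one o1 column is wanted, one option is for the reducer to emit NullWritable.get() as the key (with job.setOutputKeyClass(NullWritable.class) and the Reducer's output key type changed to match); TextOutputFormat then writes only the value.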



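Console output: (analysis: Hadoop sorts the keys but not the values within a key, so the amounts arrive in no particular order; for o1, 200.0 comes before 250.0 even though 250.0 is first in the input)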

reduce key is Mark-to-win @ Mark Java Community: o1
 INFO - reduce > reduce
 INFO -  map 100% reduce 78%
val is o1    200.0    0.0
inside for key is o1
 INFO - reduce > reduce
 INFO -  map 100% reduce 83%
val is o1    250.0    0.0
inside for key is o1
reduce key is Mark-to-win @ Mark Java Community:o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 89%
val is o2    700.0    0.0
inside for key is o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 94%
val is o2    100.0    0.0
inside for key is o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 100%
val is o2    500.0    0.0
inside for key is o2
reduce key is Mark-to-win @ Mark Java Community:o3
 INFO - reduce > reduce
val is o3    150.0    0.0
inside for key is o3