Using a Custom Object in MapReduce
Custom objects:
馬克-to-win @ 馬克java社區(qū): So far, every value we have sent over the Hadoop network has been a predefined type such as Text or IntWritable. Sometimes, however, we need to build our own class that wraps several of these simple predefined types and can still be transmitted over the Hadoop network like a predefined type; this makes the data much easier to manage. To do that, the class must implement Writable and provide the write and readFields methods, as shown below. The idea: to compute an average, recall from the previous chapter's hello-world example that all values for the same key enter the same reduce call. We could therefore pass the raw strings o1,p2,250.0 and o1,p1,200.0 into the same reduce, parse them there, then sum and average. That approach works (and can, in principle, solve almost any problem), but it is clumsy, so in this section we pass a custom object instead, which is much cleaner. When teaching this section, walk through the code once, and after it is understood, walk through it again to explain why it is written this way.
Requirement: order data
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
Compute the average amount for each order:
o1 o1 0.0 225.0
o2 o2 0.0 433.3333333333333
o3 o3 0.0 150.0
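The expected averages above can be verified with a few lines of plain Java (a standalone sketch for checking the arithmetic, not part of the MapReduce job; the class and method names are illustrative):

```java
public class AverageCheck {
    // Compute the arithmetic mean of an order's amounts,
    // mirroring the sum/num logic used later in the reducer.
    public static double average(double... amounts) {
        double sum = 0.0;
        for (double a : amounts) sum += a;
        return sum / amounts.length;
    }

    public static void main(String[] args) {
        System.out.println("o1 " + average(250.0, 200.0));        // 225.0
        System.out.println("o2 " + average(500.0, 100.0, 700.0)); // 433.3333333333333
        System.out.println("o3 " + average(150.0));               // 150.0
    }
}
```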
package com;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBeanSerial implements Writable {
    private String orderId;
    private Double amount;
    /* Give average a default value so that a field that was never set
       does not cause a NullPointerException during serialization. */
    private Double average = 0.0;

    public Double getAverage() {
        return average;
    }
    public void setAverage(Double average) {
        this.average = average;
    }
    public OrderBeanSerial() {
    }
    public OrderBeanSerial(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public Double getAmount() {
        return amount;
    }
    public void setAmount(Double amount) {
        this.amount = amount;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
    /* Used when the reducer writes its final output. Since the required
       output contains the average, the bean needs an average field;
       otherwise this method would have to compute sum/num, and there is
       no clean way to pass sum and num in here. */
    @Override
    public String toString() {
        return orderId + "\t" + amount + "\t" + average;
    }
}
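The write/readFields pair must read fields back in exactly the order they were written. This symmetry can be demonstrated with plain java.io streams, outside Hadoop (a minimal sketch; the Bean class below is an illustrative stand-in for OrderBeanSerial with the same field order):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripDemo {
    // Simplified stand-in for OrderBeanSerial: same fields, same order.
    static class Bean {
        String orderId; double amount; double average;
        void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeDouble(amount);
            out.writeDouble(average);
        }
        void readFields(DataInput in) throws IOException {
            orderId = in.readUTF();      // must match the write order
            amount = in.readDouble();
            average = in.readDouble();
        }
    }

    // Serialize a bean to bytes, then deserialize into a fresh bean.
    public static Bean roundTrip(String id, double amount, double avg) {
        try {
            Bean b = new Bean();
            b.orderId = id; b.amount = amount; b.average = avg;
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            b.write(new DataOutputStream(bos));
            Bean c = new Bean();
            c.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            return c;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Bean b = roundTrip("o1", 250.0, 0.0);
        System.out.println(b.orderId + "\t" + b.amount + "\t" + b.average);
    }
}
```

This is essentially what Hadoop does when it ships the bean from a mapper to a reducer.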
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SerialMapper extends Mapper<LongWritable, Text, Text, OrderBeanSerial> {
    OrderBeanSerial bean = new OrderBeanSerial();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(orderId, amount);
        context.write(new Text(orderId), bean);
    }
}
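The mapper's parsing step can be tried in isolation: split on commas, take field 0 as the order id and field 2 as the amount, and ignore field 1 (the product id). A small standalone sketch (class and method names are illustrative):

```java
public class LineParseDemo {
    // Mirrors the mapper's parsing of one input line such as "o1,p2,250.0":
    // element 0 of the result is the order id, element 1 is the amount.
    public static Object[] parse(String line) {
        String[] fields = line.split(",");
        return new Object[] { fields[0], Double.parseDouble(fields[2]) };
    }

    public static void main(String[] args) {
        Object[] kv = parse("o1,p2,250.0");
        System.out.println(kv[0] + " -> " + kv[1]);  // o1 -> 250.0
    }
}
```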
package com;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import org.apache.hadoop.io.Text;
public class SerialReducer extends Reducer<Text, OrderBeanSerial, Text, OrderBeanSerial> {
    @Override
    protected void reduce(Text key, Iterable<OrderBeanSerial> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 馬克-to-win @ 馬克java社區(qū):" + key.toString());
        /* For the data set below, just as hello[1,1,1,1] entered reduce together
           in the hello-world example, here o1 with [250.0, 200.0] enters reduce
           together, as the for loop below shows. Inside the loop it is then
           easy to compute the average.
           o1,p2,250.0
           o2,p3,500.0
           o2,p4,100.0
           o2,p5,700.0
           o3,p1,150.0
           o1,p1,200.0
        */
        Double d, sum = 0.0;
        int num = 0;
        for (OrderBeanSerial val : values) {
            d = val.getAmount();
            sum = sum + d;
            num++;
            System.out.println("val is " + val.toString());
            System.out.println("inside for key is " + key.toString());
        }
        OrderBeanSerial vals = new OrderBeanSerial();
        vals.set(key.toString(), 0.0);
        vals.setAverage(sum / num);
        /* This write produces the final lines of the output file, exactly
           matching the requirement. */
        context.write(key, vals);
    }
}
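The whole shuffle-then-reduce flow can be simulated without Hadoop: group the amounts by order id (the shuffle), then average each group (the reducer's for loop). A standalone sketch under that assumption (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSimulation {
    // Group amounts by order id, then average each group, mimicking
    // what the shuffle plus SerialReducer do for each key.
    public static Map<String, Double> averages(String[] lines) {
        Map<String, List<Double>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            grouped.computeIfAbsent(f[0], k -> new ArrayList<>())
                   .add(Double.parseDouble(f[2]));
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : grouped.entrySet()) {
            double sum = 0.0;
            for (double v : e.getValue()) sum += v;
            result.put(e.getKey(), sum / e.getValue().size());
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = { "o1,p2,250.0", "o2,p3,500.0", "o2,p4,100.0",
                           "o2,p5,700.0", "o3,p1,150.0", "o1,p1,200.0" };
        System.out.println(averages(lines));  // {o1=225.0, o2=433.33..., o3=150.0}
    }
}
```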
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SerialTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SerialTestMark_to_win.class);
        job.setMapperClass(SerialMapper.class);
        job.setReducerClass(SerialReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBeanSerial.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrderBeanSerial.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        /* Make sure the output directory below does not already exist. */
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
The output in the file is (note: both the key and the value are written, which is why o1 appears twice per line):
o1 o1 0.0 225.0
o2 o2 0.0 433.3333333333333
o3 o3 0.0 150.0
Console output (note: the amount values arrive in no particular order):
reduce key is 馬克-to-win @ 馬克java社區(qū): o1
INFO - reduce > reduce
INFO - map 100% reduce 78%
val is o1 200.0 0.0
inside for key is o1
INFO - reduce > reduce
INFO - map 100% reduce 83%
val is o1 250.0 0.0
inside for key is o1
reduce key is 馬克-to-win @ 馬克java社區(qū):o2
INFO - reduce > reduce
INFO - map 100% reduce 89%
val is o2 700.0 0.0
inside for key is o2
INFO - reduce > reduce
INFO - map 100% reduce 94%
val is o2 100.0 0.0
inside for key is o2
INFO - reduce > reduce
INFO - map 100% reduce 100%
val is o2 500.0 0.0
inside for key is o2
reduce key is 馬克-to-win @ 馬克java社區(qū):o3
INFO - reduce > reduce
val is o3 150.0 0.0
inside for key is o3