Step-through debugging of MapReduce group iteration, explained via the source code

The group concept in MapReduce is one of the hardest to understand. This section explains it by dissecting the framework source code and single-stepping through it in the debugger.
Also note: when writing the data file, make sure it does not end with a trailing newline or spaces. By putting a breakpoint inside map and pressing F8 (Resume) round after round, record by record, you eventually discover that the last line of data is broken — the only problem being one extra newline character.
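That pitfall can also be handled defensively in code. The sketch below is plain Java, not part of the original mapper, and the class name SafeParse is invented for illustration; it skips blank lines before splitting, so a stray trailing newline cannot cause an ArrayIndexOutOfBoundsException on fields[2]:

```java
// Hypothetical helper: parse one "orderId,product,amount" line defensively.
public class SafeParse {
    /** Returns {orderId, amount}, or null for a blank line (e.g. a trailing newline). */
    static Object[] parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;                       // skip the empty trailing line instead of crashing
        }
        String[] fields = line.trim().split(",");
        return new Object[]{fields[0], Double.parseDouble(fields[2])};
    }

    public static void main(String[] args) {
        System.out.println(parse("") == null);             // blank line is skipped
        System.out.println(parse("o1,p2,250.0")[0]);       // orderId parses normally
    }
}
```

In the real mapper the equivalent guard would simply be an early `return;` before the `split`.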
The requirement is now:
Source file:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
Output file (the minimum in each group):
o1 200.0
o2 100.0
o3 150.0
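The sort order that makes this result possible can be simulated in plain Java without Hadoop. The sketch below (the class name SortSketch is invented for illustration) applies the same rule that OrderBean.compareTo uses further down — orderId ascending, then amount descending — so the smallest amount of each group ends up last in its group:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation of the shuffle sort order produced by OrderBean.compareTo.
public class SortSketch {
    static List<String[]> sorted() {
        List<String[]> rows = new ArrayList<>(Arrays.asList(
            new String[]{"o1", "250.0"}, new String[]{"o2", "500.0"},
            new String[]{"o2", "100.0"}, new String[]{"o2", "700.0"},
            new String[]{"o3", "150.0"}, new String[]{"o1", "200.0"}));
        rows.sort((a, b) -> {
            int cmp = a[0].compareTo(b[0]);                    // orderId ascending
            if (cmp == 0) {
                cmp = -Double.compare(Double.parseDouble(a[1]),
                                      Double.parseDouble(b[1])); // amount DESCENDING
            }
            return cmp;
        });
        return rows;
    }

    public static void main(String[] args) {
        for (String[] r : sorted()) {
            System.out.println(r[0] + " " + r[1]);
        }
    }
}
```

Running it prints o1 250.0, o1 200.0, o2 700.0, o2 500.0, o2 100.0, o3 150.0 — equal keys adjacent, minimum last in each group.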
package com;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/* In general, OrderBean must be a WritableComparable, because MyGroupComparator's constructor calls super(OrderBean.class, true), which requires a WritableComparable. */
public class OrderBean implements WritableComparable<OrderBean>{
private String orderId;
private Double amount;
public OrderBean() {
}
public OrderBean(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public void set(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
@Override
public int compareTo(OrderBean o) {
/* The comparison rule below (the four lines of code) is required. With it, records enter reduce
in exactly this order: 1) o1,250 2) o1,200 3) o2,700 4) o2,500 5) o2,100 6) o3,150 — and only
then can grouping work. If you instead return an arbitrary value such as 45, the order is a mess:
records enter in source-file order, 1) o1,250 2) o2,500 3) o2,100 4) o2,700 5) o3,150 6) o1,200,
and the grouping falls apart, because the framework uses nextKeyIsSame to test, key by key,
whether the next record belongs to the same group — so identical keys must be adjacent. The
output then looks much like the source file, split into four groups, which is completely wrong:
o1 250.0
o2 700.0
o3 150.0
o1 200.0
See the video for the detailed reason.
Why -Double.compare(this.getAmount(), o.getAmount())? Why compare amount at all? Because the
requirement is to find the smallest amount for each key: reduce then needs to do nothing but
iterate over the group — once the loop finishes, the key naturally holds the last (smallest)
record, and that is what gets written. If reduce did nothing and did not iterate either, the
output would be the largest instead. Without the -Double.compare line, equal keys would still
sit together, but the order of amounts within a group would be undefined.
If the requirement changed to simply outputting every record of a group, the line
return -Double.compare(this.getAmount(), o.getAmount()); could be dropped.
Conclusion: compareTo in OrderBean exists to place records of the same group next to each other;
add a secondary ordering when the requirement needs one, and omit it when it does not.
*/
int cmp = this.getOrderId().compareTo(o.getOrderId());
if (0 == cmp) {
return -Double.compare(this.getAmount(), o.getAmount());
}
return cmp;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(amount);
}
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.amount = in.readDouble();
}
/* Used when reduce finally writes its output. */
public String toString() {
return orderId + "\t" + amount;
}
}
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
String orderId = fields[0];
double amount = Double.parseDouble(fields[2]);
bean.set(orderId, amount);
context.write(bean, NullWritable.get());
}
}
package com;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
System.out.println("reduce key is "+key.toString());
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
To trace the group iteration, set a breakpoint on the line "for (NullWritable val : values) {"
and press F5 (Step Into) a few times; you will land in ReduceContextImpl. For the data above,
the first group is (o1, 250.0) and (o1, 200.0). MapReduce hands the first key, (o1, 250.0),
both to the reduce() parameter "key" and to a field — also named key — of the
ReduceContextImpl instance that "context" refers to. Since both point at the same object,
whenever ReduceContextImpl's key changes, the key seen here changes too. ReduceContextImpl also
holds a flag, nextKeyIsSame: when the first group contains two values its initial value is
true, and when it contains only one value it is false — the framework works this out on its own
(and handles every group the same way, not just the first). Keep pressing F5 while
single-stepping (press it repeatedly, because the for-each loop compiles down to Iterator calls
inside ReduceContextImpl; we will not dig into that). Because nextKeyIsSame is true, the loop
body runs a second time: ReduceContextImpl executes nextKeyValue(), fetches the next key with
input.getKey(), and stores it into its key field (at this moment the key inside our for loop
changes as well). Right afterwards, nextKey = input.getKey(); reads the key after that — which
is now o2, no longer the same key — and nextKeyIsSame = comparator.compare(currentRawKey...,
nextKey...) == 0 invokes our MyGroupComparator's compare method, which returns -1, not 0, so o1
and o2 are judged to be different keys and nextKeyIsSame becomes false. When the for loop next
checks whether to run another round, control re-enters ReduceContextImpl; nextKeyIsSame is
false, so the loop ends. Along the way you have seen both that the key changes while you
iterate and exactly how our MyGroupComparator's compare gets called. */
for (NullWritable val : values) {
System.out.println("val is "+val.toString());
System.out.println("inside for key is "+key.toString());
}
/* The for loop above and the commented-out block below do the same thing. */
// Iterator it= values.iterator();
// while(it.hasNext())
// {
// System.out.println("inside iterator"+it.next());
// System.out.println("inside for key is "+key.toString());
// // it.remove();
// }
context.write(key, NullWritable.get());
}
}
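The mechanism described in the long comment above can be condensed into a toy simulation. The sketch below is NOT Hadoop code — it only borrows the name nextKeyIsSame — and it assumes a key-sorted input; it shows why iterating a group leaves the "current key" pointing at the group's last (with our descending amount sort, smallest) record:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy model of ReduceContextImpl's group iteration: advance while the
// group comparator says the next key is still equal; the key "mutates"
// on every step, exactly as observed in the debugger.
public class GroupIterationSketch {
    public static List<String> run(List<String[]> sorted,
                                   Comparator<String[]> groupCmp) {
        List<String> lastKeyPerGroup = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String[] current;                 // the key handed to "reduce()"
            boolean nextKeyIsSame;
            do {
                current = sorted.get(i);      // key changes during the iteration
                i++;
                nextKeyIsSame = i < sorted.size()
                        && groupCmp.compare(current, sorted.get(i)) == 0;
            } while (nextKeyIsSame);
            // After the loop, "current" is the LAST record of the group —
            // with descending amount sort, the group's minimum.
            lastKeyPerGroup.add(current[0] + "\t" + current[1]);
        }
        return lastKeyPerGroup;
    }

    public static void main(String[] args) {
        List<String[]> sorted = Arrays.asList(
            new String[]{"o1", "250.0"}, new String[]{"o1", "200.0"},
            new String[]{"o2", "700.0"}, new String[]{"o2", "500.0"},
            new String[]{"o2", "100.0"}, new String[]{"o3", "150.0"});
        for (String s : run(sorted, (a, b) -> a[0].compareTo(b[0]))) {
            System.out.println(s);
        }
    }
}
```

Feeding it the six sorted records and grouping on orderId yields o1 200.0, o2 100.0, o3 150.0 — the same minima the real job writes.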
package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GroupTestMark_to_win {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(GroupTestMark_to_win.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
/* The argument to setGroupingComparatorClass must be a RawComparator:
our MyGroupComparator extends WritableComparator,
WritableComparator implements RawComparator,
and interface RawComparator<T> extends Comparator<T>, whose compare method we implement. */
job.setGroupingComparatorClass(MyGroupComparator.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* for an empty directory the for loop body never runs; control falls through to dirFile.delete() below */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
package com;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator{
protected MyGroupComparator() {
/* The call below is required; without it you get a NullPointerException. The boolean true tells the underlying code whether to create instances of the key class — it is normally true, and in our usage false leads to the NPE. The details do not matter here. */
super(OrderBean.class, true);
}
/* The passage below is excerpted from the reducer discussion: MyGroupComparator's compare
method returns -1, not 0, so o1 and o2 are judged not to be the same key and nextKeyIsSame
becomes false. When the for loop next decides whether to run another round, control re-enters
ReduceContextImpl; since nextKeyIsSame is false, the loop is not entered again. Along the way
you can see that the key changes during iteration and how our MyGroupComparator's compare is
called.
The rule to take away: whichever keys you want to enter reduce together as one group, return 0
for them.
If this method is changed to "return 0;", the output becomes the single line o3 150.0 — see the
video for why.
*/
public int compare(WritableComparable a, WritableComparable b) {
OrderBean bean1 = (OrderBean) a;
OrderBean bean2 = (OrderBean) b;
return bean1.getOrderId().compareTo(bean2.getOrderId());
// return 0;
}
}
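The "return 0;" case mentioned in the comment can be checked with a tiny simulation as well (hypothetical plain Java, not Hadoop): if the group comparator reports every pair of keys as equal, nextKeyIsSame never turns false, the whole sorted stream becomes ONE group, reduce() runs once, and the key left after the iteration is simply the last record of the sorted file — o3 150.0:

```java
import java.util.Arrays;
import java.util.List;

// Toy model of MyGroupComparator returning 0 for every pair: the group
// boundary never triggers, so one reduce() call consumes all records and
// the final key is the last record of the whole sorted input.
public class ReturnZeroSketch {
    public static String lastKeyOfSingleGroup(List<String[]> sorted) {
        String[] current = null;
        for (String[] r : sorted) {
            current = r;   // comparator always 0 => the group never ends
        }
        return current[0] + "\t" + current[1];
    }

    public static void main(String[] args) {
        List<String[]> sorted = Arrays.asList(
            new String[]{"o1", "250.0"}, new String[]{"o1", "200.0"},
            new String[]{"o2", "700.0"}, new String[]{"o2", "500.0"},
            new String[]{"o2", "100.0"}, new String[]{"o3", "150.0"});
        System.out.println(lastKeyOfSingleGroup(sorted));
    }
}
```

This matches the single output line the comment predicts for the return 0 variant.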
Console output:
reduce key is o1 250.0
val is (null)
inside for key is o1 250.0
val is (null)
inside for key is o1 200.0
reduce key is o2 700.0
val is (null)
inside for key is o2 700.0
val is (null)
inside for key is o2 500.0
val is (null)
inside for key is o2 100.0
INFO - Job job_local1313255223_0001 running in uber mode : false
reduce key is o3 150.0
val is (null)
inside for key is o3 150.0
Output file (the minimum in each group):
o1 200.0
o2 100.0
o3 150.0