通過剖析源碼單步調(diào)試詳解MapReduce分組group遍歷
通過剖析源碼單步調(diào)試詳解MapReduce分組group遍歷:
馬克- to-win:馬克 java社區(qū):防盜版實名手機尾號: 73203。
馬克-to-win @ 馬克java社區(qū):mapreduce的group知識點是最難理解的,本小節(jié)將通過仔細剖析源碼,單步調(diào)試,來詳解之。
另外注意:數(shù)據(jù)文件寫時一定注意:結(jié)尾不能有回車和空格,通過在map里面加斷點,F(xiàn)8(resume),一輪一輪,調(diào)試一行一行的數(shù)據(jù),才發(fā)現(xiàn)最后一行數(shù)據(jù)出毛病了,只有是多了一個換行符的毛病。
現(xiàn)在需求變成:
源文件:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
文件輸出結(jié)果:(每組中最小的)
o1 200.0
o2 100.0
o3 150.0
package com;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*一般情況下,OrderBean必須是一個WritableComparable,因為MyGroupComparato的構造函數(shù):super(OrderBean.class, true);要求是個 WritableComparable*/
public class OrderBean implements WritableComparable<OrderBean>{
private String orderId;
private Double amount;
public OrderBean() {
}
public OrderBean(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public void set(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
@Override
public int compareTo(OrderBean o) {
/*這里必須加下面這段比較規(guī)則(下面的4 句話),這樣就成為1)o1,250,2)o1, 200, 3)o2, 700,4) o2,500,5)o2,100, 6) o3,150,
準確的這6 個順序的進入reduce, 才可以分組。如果像下面一樣return 45;則順序是亂的。則進入的順序和源文件一樣,是1)o1,250,2)o2,500, 3)o2, 100,4)o2,700, 5) o3,150, 6)o1, 200,這時就錯了,
分組那里全都亂了。因為有一個nextKeyIsSame來一個一個key測試是否為一個組的, 所以同樣的key必須排在一起。輸出和源文件很像,成了4組, 完全是不對的。
o1 250.0
o2 700.0
o3 150.0
o1 200.0
原因見視頻。
那為什么要-Double.compare(this.getAmount(), o.getAmount());呢? 為什么也要比較一下amount呢?因為 你的需求是找出每個key最小的amount,
所以在reduce中什么都不干, 只要遍歷一下就行, 自然輪一圈, 輸出的就是最小的。比如如果reduce中什么都不干, 也不遍歷, 輸出的就是最大的。
如果沒有-Double.compare(this.getAmount(), o.getAmount());key能挨一塊, 但后邊的amount的順序就是不定的了。
但是如果需求改了,只要把一組的全部輸出就行, 那我們就可以不用加 return -Double.compare(this.getAmount(), o.getAmount());這句話。
結(jié)論就是:OrderBean里面compareTo就是把是一組的對象都排在一起。需要按順序就按,沒必要就沒必要。
*/
int cmp = this.getOrderId().compareTo(o.getOrderId());
if (0 == cmp) {
return -Double.compare(this.getAmount(), o.getAmount());
}
return cmp;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(amount);
}
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.amount = in.readDouble();
}
/*最后reduce輸出時用這里。*/
public String toString() {
return orderId + "\t" + amount;
}
}
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
String orderId = fields[0];
double amount = Double.parseDouble(fields[2]);
bean.set(orderId, amount);
context.write(bean, NullWritable.get());
}
}
package com;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
System.out.println("reduce key is 馬克-to-win @ 馬克java社區(qū):"+key.toString());
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
馬克-to-win @ 馬克java社區(qū): 用如下的方法作group的遍歷,斷點加在這for (NullWritable val : values) {,按F5step into幾次就會發(fā)現(xiàn)ReduceContextImpl,對于以上這組數(shù)據(jù),第一group數(shù)據(jù)是:o1,250.0和o1,200.0, Mapreduce會把第一個key:o1,250.0,傳給輸入?yún)?shù)key指針,同時也會傳給context指向的ReduceContextImpl 的實例的一個屬性指針, 也叫key。馬克-to-win @ 馬克java社區(qū):這樣當ReduceContextImpl的Key改變了,這邊的key也會變。另外,ReduceContextImpl里面有個屬性nextKeyIsSame, 對于第一組(實際上任何一組處理方式都相同)有兩個值這種情況,nextKeyIsSame的初值會變成真。而對于第一組只有一個值這種情況,nextKeyIsSame的初值會變成假。這是由mapreduce智能識別的。馬克-to-win @ 馬克java社區(qū):當F5,單步調(diào)試時,(注意要反復按F5,因為for迭代會在ReduceContextImpl中變成Iteraor語句,具體不深入研究),由于nextKeyIsSame是真,所以可以第二次進入For循環(huán),在ReduceContextImpl中執(zhí)行nextKeyValue(),用 input.getKey()得到下一個key,之后把值給了ReduceContextImpl的key屬性,(這時我們for中的key值也變了)之后nextKey = input.getKey();還會緊接著
取出下一個key,注意這時可是o2了,已經(jīng)不是同一個key了,之后 nextKeyIsSame = comparator.compare(currentRawKey..., nextKey....)==0, 會調(diào)用我們的MyGroupComparator的compare方法,返回-1,不是0 ,這樣就會斷定o1和o2不是一個key。nextKeyIsSame就變成false了。馬克-to-win @ 馬克java社區(qū):循環(huán)到for的下一輪,判斷進不進去時,又一次進入ReduceContextImpl,由于nextKeyIsSame是 false,所以就不進入for循環(huán)了,在這個過程中,大家也看到,在遍歷迭代過程中,key的值也改變了。而且也看到了我們的MyGroupComparator的 compare是怎么被調(diào)用的。 */
(購買完整教程)
System.out.println("val is "+val.toString());
System.out.println("inside for key is "+key.toString());
}
/*上面一段和下面一段代碼的作用相同*/
// Iterator it= values.iterator();
// while(it.hasNext())
// {
// System.out.println("inside iterator"+it.next());
// System.out.println("inside for key is "+key.toString());
// // it.remove();
// }
context.write(key, NullWritable.get());
}
}
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GroupTestMark_to_win {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(GroupTest.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
/*setGroupingComparatorClass的參數(shù)必須是個RawComparator,
而我們的MyGroupComparator extends WritableComparator
而WritableComparator implements RawComparator
而interface RawComparator<T> extends Comparator<T>,實現(xiàn)compare方法*/
job.setGroupingComparatorClass(MyGroupComparator.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
package com;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator{
protected MyGroupComparator() {
/*下面的話必須要有, 否則報空指針異常, true指底層代碼中指是否需要要創(chuàng)建key的實例,一般為true,在我們這種用法的情況下,為false就會空指針。沒必要關心細節(jié) */
super(OrderBean.class, true);
}
/* 下面這段話是從reducer摘錄出來的: MyGroupComparator的compare方法,返回-1,不是0 ,這樣就會斷定o1和o2不是一個key。nextKeyIsSame就變成
false了。循環(huán)到for的下一輪,判斷進不進去時,又一次進入ReduceContextImpl,由于
nextKeyIsSame是false, 所以就不進入for循環(huán)了,在這個過程中,大家也看到,在遍歷迭代過
程中,key的值也改變了。而且也看到了我們的MyGroupComparator的compare是怎么被調(diào)用的。
可以判斷出:想讓誰和誰一組一起進入reduce,就返回0。
如果我們的程序變成:return 0;則輸出 結(jié)果是:o3 150.0, 為什么? 聽我視頻
*/
public int compare(WritableComparable a, WritableComparable b) {
OrderBean bean1 = (OrderBean) a;
OrderBean bean2 = (OrderBean) b;
return bean1.getOrderId().compareTo(bean2.getOrderId());
// return 0;
}
}
輸出結(jié)果:
reduce key is 馬克-to-win @ 馬克java社區(qū):o1 250.0
val is (null)
inside for key is o1 250.0
val is (null)
inside for key is o1 200.0
reduce key is 馬克-to-win @ 馬克java社區(qū):o2 700.0
val is (null)
inside for key is o2 700.0
val is (null)
inside for key is o2 500.0
val is (null)
inside for key is o2 100.0
INFO - Job job_local1313255223_0001 running in uber mode : false
reduce key is 馬克-to-win @ 馬克java社區(qū):o3 150.0
val is (null)
inside for key is o3 150.0
文件輸出結(jié)果:(每組中最小的)
o1 200.0
o2 100.0
o3 150.09