Implementing a Join Algorithm in MapReduce
Requirements:
Order table (order):
order id (id), product id (pid), quantity (number)
1,p2,2
2,p3,3
2,p4,1
2,p5,4
3,p1,5
1,p1,3
Product table (product):
product id (id), product name (pname), price (price)
p1,java,11
p2,c,22
p3,c#,33
p4,python,44
p5,js,66
The data volume is now huge, and all of it sits in plain text files, so the SQL of the past no longer applies. We implement the equivalent of the following query with MapReduce:
select o.id order_id, o.number, p.id, p.pname, p.price, o.number*p.price sum from order o join product p on o.pid = p.id
The result must contain exactly as many rows as the order table, namely 6, because product is the "one" side of a one-to-many relationship; the join simply fills the product information into each order row.
The expected result is:
order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0
The idea: in the mapper, the file name of the current split tells us whether a record is an order or a product. But because the mapper's context.write and the reducer's input must share a single value format (fixed up front in the generic declaration Mapper<LongWritable,Text,Text,UnionBean>), both orders and products have to travel in one union of their combined attributes, UnionBean. For an order record, we fill in the order fields of the UnionBean and leave the remaining fields at default values; products are handled the same way. Everything placed in the context is therefore a partially filled UnionBean, but every one of them carries the product id.

One option would be to materialize the order-product relationship (it would have to be stored in a HashMap) and look up products from orders, or orders from products, but either direction is cumbersome (this is the idea behind the map join method covered later). A better option: since every UnionBean has a product id, use the product id as the key sent to the reducer. In the reducer, take the UnionBeans whose order fields are complete as the base records, collect them in an ArrayList, and then fill in their missing product fields. Note that an order_id of 0 marks a product record; any other value marks an order. See the comments in the code for how to work around not being able to iterate over the values twice.
package com;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/* Union of the order and product attributes; implements Writable so it can
   travel between map and reduce as a value. */
public class UnionBean implements Writable {
    private int order_id;
    private String p_id;
    private int number;
    private String pname;
    private float price;

    public UnionBean() {
    }

    public UnionBean(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;
    }

    public void set(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;
    }

    public int getOrder_id() {
        return order_id;
    }
    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }
    public String getP_id() {
        return p_id;
    }
    public void setP_id(String p_id) {
        this.p_id = p_id;
    }
    public int getNumber() {
        return number;
    }
    public void setNumber(int number) {
        this.number = number;
    }
    public String getPname() {
        return pname;
    }
    public void setPname(String pname) {
        this.pname = pname;
    }
    public float getPrice() {
        return price;
    }
    public void setPrice(float price) {
        this.price = price;
    }

    /* Serialization contract: readFields must read the fields in exactly the
       order that write writes them. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(p_id);
        out.writeInt(number);
        out.writeUTF(pname);
        out.writeFloat(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.p_id = in.readUTF();
        this.number = in.readInt();
        this.pname = in.readUTF();
        this.price = in.readFloat();
    }

    @Override
    public String toString() {
        return "order_id=" + order_id +
                ", p_id=" + p_id +
                ", number=" + number +
                ", pname=" + pname +
                ", price=" + price +
                ", sum=" + price * number;
    }
}
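Since readFields must consume the fields in exactly the order write emits them, a quick round trip outside Hadoop is a useful check. A minimal sketch, assuming only the UnionBean class above (the class name UnionBeanRoundTrip is made up for illustration):

package com;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class UnionBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        UnionBean before = new UnionBean(1, "p2", 2, "c", 22);
        // Serialize with write(), then deserialize into a fresh bean with readFields().
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));
        UnionBean after = new UnionBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        // Prints: order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
        System.out.println(after);
    }
}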
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class JoinMapper extends Mapper<LongWritable, Text, Text, UnionBean> {
    UnionBean unionBean = new UnionBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String p_id;
        // Read the split information from the context to get the file name.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();
        System.out.println(fileName + " is fileName");
        if (fileName.startsWith("order")) {
            String[] fields = line.split(",");
            p_id = fields[1];
            // To avoid null pointers, give the fields that do not exist here default values.
            /* For example: given p1,java,11 and 3,p1,5 and 1,p1,3, this branch produces 3,p1,5,"",0 and 1,p1,3,"",0 */
            unionBean.set(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]), "", 0);
        } else {
            String[] fields = line.split(",");
            p_id = fields[0];
            // To avoid null pointers, give the fields that do not exist here default values.
            /* For example: given p1,java,11 and 3,p1,5 and 1,p1,3, this branch produces 0,p1,0,java,11 */
            unionBean.set(0, p_id, 0, fields[1], Integer.parseInt(fields[2]));
        }
        context.write(new Text(p_id), unionBean);
    }
}
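To see what the mapper emits without starting a job, the same branch logic can be exercised on a few sample lines in plain Java. A minimal sketch (MapLogicDemo is a hypothetical class, not part of the job):

package com;
public class MapLogicDemo {
    public static void main(String[] args) {
        UnionBean unionBean = new UnionBean();
        // Two lines from order.txt: the key is fields[1], the pid.
        for (String line : new String[]{"3,p1,5", "1,p1,3"}) {
            String[] fields = line.split(",");
            unionBean.set(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]), "", 0);
            System.out.println(fields[1] + " -> " + unionBean);
        }
        // One line from product.txt: the key is fields[0], the pid.
        String[] fields = "p1,java,11".split(",");
        unionBean.set(0, fields[0], 0, fields[1], Integer.parseInt(fields[2]));
        System.out.println(fields[0] + " -> " + unionBean);
    }
}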
package com;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class JoinReducer extends Reducer<Text, UnionBean, UnionBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<UnionBean> values, Context context) throws IOException, InterruptedException {
        UnionBean pBeanMiddle = new UnionBean();
        List<UnionBean> resultList = new ArrayList<>();
        /* Note that the key here is a pid, a single value. The values of a reducer
           cannot be iterated twice, so during the one pass we copy every order bean
           (the "many" side of one-to-many) into an ArrayList, and the single product
           bean (the "one" side) into pBeanMiddle. The second pass then runs over the
           ArrayList, copying the product's attributes into each order bean.
           For example: p1,java,11 and 3,p1,5 and 1,p1,3 produce three UnionBeans
           (3,p1,5,"",0 and 1,p1,3,"",0 and 0,p1,0,java,11) that arrive in the same
           reduce call; p1,java,11 goes into pBeanMiddle, while 3,p1,5 and 1,p1,3 go
           into two oBeanMiddle copies, i.e. into resultList. */
        for (UnionBean value : values) {
            if (0 == value.getOrder_id()) {
                try {
                    BeanUtils.copyProperties(pBeanMiddle, value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                UnionBean oBeanMiddle = new UnionBean();
                try {
                    BeanUtils.copyProperties(oBeanMiddle, value);
                    /* This is the "many" side of one-to-many, so a List is needed. */
                    resultList.add(oBeanMiddle);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        for (UnionBean resultBean : resultList) {
            resultBean.setPname(pBeanMiddle.getPname());
            resultBean.setPrice(pBeanMiddle.getPrice());
            context.write(resultBean, NullWritable.get());
        }
    }
}
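One point worth spelling out: Hadoop reuses a single value object while iterating over values, refilling it for each element, which is why the beans must be copied (here with BeanUtils.copyProperties, which requires commons-beanutils on the classpath) before being stored. A minimal sketch that simulates the reuse and shows what a plain resultList.add(value) would do (ReuseDemo is a hypothetical class):

package com;
import java.util.ArrayList;
import java.util.List;
public class ReuseDemo {
    public static void main(String[] args) {
        UnionBean shared = new UnionBean();   // the one instance Hadoop keeps refilling
        List<UnionBean> noCopy = new ArrayList<>();
        int[][] orders = {{3, 5}, {1, 3}};    // (order_id, number) pairs for key p1
        for (int[] o : orders) {
            shared.set(o[0], "p1", o[1], "", 0);
            noCopy.add(shared);               // adds the SAME object twice -- no copy
        }
        // Both entries now print order_id=1, number=3: the data of order 3 is lost.
        for (UnionBean b : noCopy) {
            System.out.println(b);
        }
    }
}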
package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JoinTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinTestMark_to_win.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UnionBean.class);
        job.setOutputKeyClass(UnionBean.class);
        job.setOutputValueClass(NullWritable.class);
        // The output directory must not exist when the job starts, so delete it first.
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        /* inputJoin contains two files: product.txt and order.txt */
        FileInputFormat.setInputPaths(job, new Path("e:/temp/inputJoin"));
        /* Alternatively, the method shown earlier in this chapter also works: */
        // FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/order.txt"));
        // FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/product.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean success = job.waitForCompletion(true);
        System.out.println(success ? "successful" : "failed");
        System.exit(success ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* An empty directory skips the for loop and falls through to dirFile.delete() below. */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
The file output is:
order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0