Implementing a join algorithm with MapReduce

Requirement:



Order table (order):
order ID (id), product ID (pid), quantity (number)
1,p2,2
2,p3,3
2,p4,1
2,p5,4
3,p1,5
1,p1,3

Product table (product):
product ID (id), product name (pname), price (price)
p1,java,11
p2,c,22
p3,c#,33
p4,python,44
p5,js,66

The data volume is now huge and all of it sits in text files, so the SQL we used in the past no longer applies. Instead we use a big-data approach to implement the equivalent of:
select o.id order_id, o.number, p.id, p.pname, p.price, number*price sum from order o join product p on o.pid = p.id

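The result must have the same number of rows as the order table, namely 6. The product table is only the "one" side of a one-to-many relationship, so it merely fills in extra information for each order (this assumes every pid in the order table also exists in the product table).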

The desired final result is:

order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0

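Approach: in the map phase, the file name tells us whether the current record is an order or a product. However, the value written by context.write in the mapper and the value received by the reducer must share a single type (fixed in advance by the generics in Mapper<LongWritable,Text,Text,UnionBean>), so orders and products both have to be carried in one class, UnionBean, that holds the union of their attributes. For an order, fill in the order attributes of UnionBean and give the remaining attributes default values; products are handled the same way. The UnionBeans written to the context are therefore all incomplete, but every one of them carries the product ID.

One option is to record the relationship between orders and products (which has to be stored in a HashMap) and then look up the product for each order, or the orders for each product. This is more cumbersome (it is the idea behind the map join approach covered later).

A better option: since every UnionBean has a product ID, use the product ID as the key sent to the reducer. In the reducer, take the UnionBeans whose order attributes are already filled in as the base, put them in an ArrayList, and fill in the product attributes they are missing.

Note that an order_id of 0 marks a product record; any other value marks an order. How to deal with the need to go over the values twice is shown in the code.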
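Before the Hadoop code, the approach above can be tried out as a small in-memory simulation. This is only a sketch in plain Java with no Hadoop classes: ReduceSideJoinSketch, Rec, fromOrder, fromProduct and join are hypothetical names invented for illustration, and the TreeMap stands in for the shuffle that groups records by product ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the reduce-side join; no Hadoop classes are involved.
class ReduceSideJoinSketch {

    // Hypothetical stand-in for UnionBean: orderId == 0 marks a product record.
    static class Rec {
        final int orderId;
        final String pid;
        final int number;
        final String pname;
        final float price;

        Rec(int orderId, String pid, int number, String pname, float price) {
            this.orderId = orderId;
            this.pid = pid;
            this.number = number;
            this.pname = pname;
            this.price = price;
        }
    }

    // "Map" for an order line such as 3,p1,5: product fields get default values.
    static Rec fromOrder(String line) {
        String[] f = line.split(",");
        return new Rec(Integer.parseInt(f[0]), f[1], Integer.parseInt(f[2]), "", 0f);
    }

    // "Map" for a product line such as p1,java,11: order fields get default values.
    static Rec fromProduct(String line) {
        String[] f = line.split(",");
        return new Rec(0, f[0], 0, f[1], Float.parseFloat(f[2]));
    }

    static List<String> join(List<String> orderLines, List<String> productLines) {
        // "Shuffle": group every record under its product ID (TreeMap sorts the keys).
        Map<String, List<Rec>> groups = new TreeMap<>();
        for (String line : orderLines) {
            Rec r = fromOrder(line);
            groups.computeIfAbsent(r.pid, k -> new ArrayList<>()).add(r);
        }
        for (String line : productLines) {
            Rec r = fromProduct(line);
            groups.computeIfAbsent(r.pid, k -> new ArrayList<>()).add(r);
        }

        // "Reduce": per product ID, separate the product from its orders, then fill in.
        List<String> joined = new ArrayList<>();
        for (List<Rec> group : groups.values()) {
            Rec product = null;
            List<Rec> orders = new ArrayList<>();
            for (Rec r : group) {
                if (r.orderId == 0) {
                    product = r;
                } else {
                    orders.add(r);
                }
            }
            if (product == null) {
                continue; // inner join: orders without a matching product are dropped
            }
            for (Rec o : orders) {
                joined.add(o.orderId + "," + o.pid + "," + o.number + ","
                        + product.pname + "," + product.price + "," + (o.number * product.price));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList("1,p2,2", "2,p3,3", "2,p4,1", "2,p5,4", "3,p1,5", "1,p1,3");
        List<String> products = Arrays.asList("p1,java,11", "p2,c,22", "p3,c#,33", "p4,python,44", "p5,js,66");
        join(orders, products).forEach(System.out::println);
    }
}
```

With the sample tables, join returns six rows, one per order, which matches the row count argued for above.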




package com;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UnionBean implements Writable{
    private int order_id;
    private String p_id;
    private int number;
    private String pname;
    private float price;

    public UnionBean() {
    }

    public UnionBean(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;

    }
    public void set(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;

    }
    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(p_id);
        out.writeInt(number);
        out.writeUTF(pname);
        out.writeFloat(price);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.p_id = in.readUTF();
        this.number = in.readInt();
        this.pname = in.readUTF();
        this.price = in.readFloat();

    }

    @Override
    public String toString() {
        return  "order_id=" + order_id  +
                ", p_id=" + p_id +
                ", number=" + number +
                ", pname=" + pname +
                ", price=" + price +
                ", sum=" + price*number;
    }
}
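The write and readFields methods above must handle the fields in exactly the same order, and the String fields must never be null when write is called. The following round-trip sketch illustrates both points in plain Java without Hadoop: WritableRoundTripSketch and its nested Bean are hypothetical stand-ins that mirror UnionBean's serialization using only java.io streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableRoundTripSketch {

    // Hypothetical stand-in for UnionBean with the same field order.
    static class Bean {
        int orderId;
        String pId;
        int number;
        String pname;
        float price;

        Bean() {
        }

        Bean(int orderId, String pId, int number, String pname, float price) {
            this.orderId = orderId;
            this.pId = pId;
            this.number = number;
            this.pname = pname;
            this.price = price;
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeUTF(pId);      // throws NullPointerException if pId is null
            out.writeInt(number);
            out.writeUTF(pname);
            out.writeFloat(price);
        }

        void readFields(DataInput in) throws IOException {
            // Must read in exactly the order used by write.
            orderId = in.readInt();
            pId = in.readUTF();
            number = in.readInt();
            pname = in.readUTF();
            price = in.readFloat();
        }
    }

    // Serializes the bean to bytes and reads it back into a fresh instance.
    static Bean roundTrip(Bean src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            Bean copy = new Bean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if serializing a bean with null String fields fails.
    static boolean failsOnNullStrings() {
        try {
            roundTrip(new Bean()); // pId and pname are null here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Bean copy = roundTrip(new Bean(3, "p1", 5, "", 0f));
        System.out.println(copy.orderId + "," + copy.pId + "," + copy.number + "," + copy.pname + "," + copy.price);
        System.out.println("null strings fail: " + failsOnNullStrings());
    }
}
```

This is why the mapper below passes "" rather than null for the attributes that a record does not have.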



package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class JoinMapper extends Mapper<LongWritable,Text,Text,UnionBean>{
    UnionBean unionBean = new UnionBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String p_id;
        // Get the input split from the context, then read the file name from it
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();
        System.out.println(fileName + " is fileName");
        if (fileName.startsWith("order")) {
            String[] fields = line.split(",");
            p_id = fields[1];
            // To avoid null pointers, give the attributes an order does not have default values.
            // Example: the order lines 3,p1,5 and 1,p1,3 become 3,p1,5,"",0 and 1,p1,3,"",0
            unionBean.set(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]), "", 0);
        } else {
            String[] fields = line.split(",");
            p_id = fields[0];
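            // To avoid null pointers, give the attributes a product does not have default values.
            // Example: the product line p1,java,11 becomes 0,p1,0,java,11
            // price is a float field, so parse it as a float (Integer.parseInt would fail on 11.5)
            unionBean.set(0, p_id, 0, fields[1], Float.parseFloat(fields[2]));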
        }
        context.write(new Text(p_id), unionBean);
    }
}



package com;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinReducer extends Reducer<Text, UnionBean, UnionBean, NullWritable>{
    @Override
    protected void reduce(Text key, Iterable<UnionBean> values, Context context) throws IOException, InterruptedException {
        UnionBean pBeanMiddle = new UnionBean();
        List<UnionBean> resultList = new ArrayList<>();

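        /* Note that the key here is a single pid. The values of a reducer cannot be iterated
           twice, and Hadoop reuses the same value object on every iteration, so during the one
           pass over the values each order (the "many" side) is copied into an ArrayList, and
           the product bean, of which there is only one (the "one" side), is copied into a
           single instance. The second pass then runs over the ArrayList and copies the product
           attributes into each order bean. Example: p1,java,11 and 3,p1,5 and 1,p1,3 produce
           three UnionBeans (3,p1,5,"",0 and 1,p1,3,"",0 and 0,p1,0,java,11) that arrive in the
           same reduce call; p1,java,11 goes into pBeanMiddle, while 3,p1,5 and 1,p1,3 go into
           two oBeanMiddle instances, that is, into resultList. */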
        for (UnionBean value : values) {
            if (0==value.getOrder_id()) {
                try {            
                    BeanUtils.copyProperties(pBeanMiddle, value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                UnionBean oBeanMiddle = new UnionBean();
                try {
                    BeanUtils.copyProperties(oBeanMiddle, value);
                    /* This is the "many" side of the one-to-many relationship, so a List is used */
                    resultList.add(oBeanMiddle);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        for (UnionBean resultBean : resultList) {
            resultBean.setPname(pBeanMiddle.getPname());
            resultBean.setPrice(pBeanMiddle.getPrice());
            context.write(resultBean, NullWritable.get());
        }
    }
}
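The copying in the reducer above matters because Hadoop hands the reducer the same value object on every iteration and only changes its contents. The sketch below imitates that behaviour in plain Java; reusingValues, Rec and ValueReuseSketch are hypothetical names for illustration and are not part of any Hadoop API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ValueReuseSketch {

    // Hypothetical mutable record, reduced to a single field.
    static class Rec {
        int orderId;

        Rec copy() {
            Rec c = new Rec();
            c.orderId = orderId;
            return c;
        }
    }

    // Imitates the reducer's values: every next() returns the SAME instance,
    // overwritten with the next record's contents.
    static Iterable<Rec> reusingValues(int... orderIds) {
        Rec shared = new Rec();
        return () -> new Iterator<Rec>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < orderIds.length;
            }

            @Override
            public Rec next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.orderId = orderIds[i++];
                return shared;
            }
        };
    }

    // Stores the references directly: every list entry points at the shared object.
    static List<Integer> collectWithoutCopy(int... orderIds) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : reusingValues(orderIds)) {
            kept.add(r);
        }
        return toIds(kept);
    }

    // Stores a copy of each value, as the reducer does with BeanUtils.copyProperties.
    static List<Integer> collectWithCopy(int... orderIds) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : reusingValues(orderIds)) {
            kept.add(r.copy());
        }
        return toIds(kept);
    }

    private static List<Integer> toIds(List<Rec> recs) {
        List<Integer> ids = new ArrayList<>();
        for (Rec r : recs) {
            ids.add(r.orderId);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(collectWithoutCopy(3, 1)); // prints [1, 1]
        System.out.println(collectWithCopy(3, 1));    // prints [3, 1]
    }
}
```

Without the copy, every entry in the list ends up showing the last record seen, which is the failure the reducer avoids by copying each bean before adding it to resultList.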


package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinTestMark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinTestMark_to_win.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UnionBean.class);
        job.setOutputKeyClass(UnionBean.class);
        job.setOutputValueClass(NullWritable.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        /* inputJoin contains two files, product.txt and order.txt */
        FileInputFormat.setInputPaths(job, new Path("e:/temp/inputJoin"));

        /* Alternatively, add each file separately, as shown earlier in this chapter. */
  //      FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/order.txt"));
  //      FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/product.txt"));

        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
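        // Report the outcome only after the job has actually finished
        boolean succeeded = job.waitForCompletion(true);
        System.out.println(succeeded ? "job successful" : "job failed");
        System.exit(succeeded ? 0 : 1);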
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the for loop and goes straight to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}

The output file contains:

order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0