MapReduce當(dāng)中尋找用戶間的共同好友

馬克- to-win:馬克 java社區(qū):防盜版實名手機尾號: 73203。
尋找用戶間的共同好友


馬克-to-win @ 馬克java社區(qū):下面我們給出一個經(jīng)典的案例:尋找用戶間的共同好友。(有意思的是:網(wǎng)上討論這個案例的雖多,但都有這那的錯誤,不是數(shù)據(jù)錯就是程序錯, 總有同學(xué)和我比對,實際和我的是不一樣的)馬克-to-win @ 馬克java社區(qū):下面給出用戶的好友關(guān)系列表(注意是單向的, 騰訊QQ單向好友的算法經(jīng)常改動,曾經(jīng)有一段時間,是這樣規(guī)定的。我可以讓你加我為單向好友,這樣的話我的好友列表當(dāng)中沒有你,但你的好友列表當(dāng)中卻有我,對我的好處就是可以節(jié)省我的資源,那一段兒時間規(guī)定單向好友可以5000甚至更多,但是雙向好友最多只能加500個。于是單向好友就非常流行 ),每一行代表一個用戶和他的好友列表。



A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J




馬克-to-win @ 馬克java社區(qū):防盜版實名手機尾號:73203?,F(xiàn)在需要找出用戶間的共同好友。目測:A:B,C,D,F,E,O。 A的好友有這些人。 B:A,C,E,F, B的好友有這些人。A和B的共同好友是C,E,F。其實如果不用大數(shù)據(jù)的思維,很簡單,知道:A:B,C,D,F,E,O。而且B:A,C,E,F得出A -B:  C,E,F, 這算法并不難。之后再窮盡一下。(A-B,A-C,A-D。。。。之后再B-C,B-D。。。。二維數(shù)組就可以搞定)。但如果數(shù)據(jù)量大的話,內(nèi)存就崩了,為什么? 因為假設(shè)字母有非常多,整個走一遍, 才能知道A到底和誰有關(guān)系,A-B,A-C,A-D, 所以走一遍只能處理一個字母,想走第2遍,需要把第一輪的數(shù)據(jù)整個存在一個hashmap中,崩潰怎么辦? 那B怎么辦?所以這種方法不行.

馬克-to-win @ 馬克java社區(qū):A-B:  C,E,F  這結(jié)論用大數(shù)據(jù)來做是這樣:第一步:要知道C在A的好友列表,C也在B的好友列表。這就要知道C到底在多少人的好友列表?答案是:C    H-K-B-A-G-E-F(即C在這許多人的好友列表),怎么做,見下?第二步,有了如上的數(shù)據(jù),就好做窮盡,AB好友列表都有C。AE好友列表頁都有 C。以AB為鍵,馬上會發(fā)現(xiàn) AB好友列表也都有E,而且也都有F。這樣,最終的結(jié)論就有了:A -B:  C,E,F。

馬克-to-win @ 馬克java社區(qū):這件事用一個MapReduce不行,得用兩個,才能搞定。思路是這樣:

馬克-to-win @ 馬克java社區(qū):在第一個mapper中:讀一行A:B,C,D,F,E,O(A的好友列表有這些,反過來拆開)    輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>,(前者在后者的好友列表)再讀一行B:A,C,E,F ,輸出<A,B> <C,B> <E,B> <F,B>,

馬克-to-win @ 馬克java社區(qū):之后在第一個Reducer中:key相同的會分到一組,例如:<C,A><C,B><C,E><C,F><C,G>......(還是前者在后者的好友列表)
Key:C
value: [ A, B, E, F, G ]

馬克-to-win @ 馬克java社區(qū):輸出為:C    H-K-B-A-G-E-F。什么意思呢? C在H-K-B-A-G-E-F的好友列表。
有一個疑問:原始數(shù)據(jù):C:F,A,D,I意味著c的單向好友列表有這四個人。二者有區(qū)別, 注意單向問題。


最后整體輸出為 :

A    F-H-D-G-B-K-C-I-O
B    E-A-F-J
C    H-K-B-A-G-E-F
D    E-C-L-K-A-H-G-F
E    G-F-H-L-M-D-B-A
F    B-M-L-A-G-D-C
G    M
H    O
I    C-O
J    O
L    D-E
M    E-F
O    A-H-I-J-F

馬克-to-win @ 馬克java社區(qū):之后在第二個mapper中,拿一行舉例:輸入是:    A    F-H-D-G-B-K-C-I-O 意味著:A在F-H-D-G-B-K-C-I-O好友列表。
經(jīng)過遍歷,窮盡,希望輸出結(jié)果是:
B-C     A      (意味著,B和C都有A這個好友。藍色)
B-D     A
B-F     A
B-G     A
B-H     A
B-I     A
B-K     A
B-O     A
C-D     A
C-F     A
C-G     A
C-H     A
C-I     A
C-K     A
C-O     A
D-F     A
D-G     A
D-H     A
D-I     A
D-K     A
D-O     A
F-G     A
F-H     A
F-I     A
F-K     A
F-O     A
G-H     A
G-I     A
G-K     A
G-O     A
H-I     A
H-K     A
H-O     A
I-K     A
I-O     A
K-O     A
A-E     B
A-F     B
A-J     B
E-F     B
E-J     B
。。。。。

G-L     F
G-M     F
L-M     F
C-O     I
D-E     L
E-F     M
A-F     O
A-H     O
A-I     O
A-J     O
F-H     O
F-I     O
F-J     O
H-I     O
H-J     O
I-J     O



在第二個mapper中:
讀入數(shù)據(jù),key相同的在一組  <A-B,C><A-B,E><A-B,F>
最后輸出的結(jié)果是:A-B   C,E,F,  代表C,E,F(xiàn)都在A和B的好友列表里,A和B 都有好友C,E,F(xiàn)




總結(jié)一下:第一步,找出都有誰有A這個好友,第二步,找出都有誰和誰同時有A這個好友,第三步,找出都有誰和誰,同時有誰誰誰這些好友。這也正是需求。

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(":");
        String user = fields[0];
        String[] friends = fields[1].split(",");
(購買完整教程)
            k.set(friend);
            v.set(user);
/*讀一行A:B,C,D,F,E,O(A的好友有這些,反過來拆開,這些人中的每一個都是A的好友)
輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>*/
            context.write(k, v);
        }
    }
}




package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*輸入為比如:<C,A><C,B><C,E><C,F><C,G>*/      
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append("-");
        }
        // 去掉最后一個字符(-)
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
/*輸出為 C    H-K-B-A-G-E-F*/      
        context.write(key, v);
    }
}




package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTestMark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommonFriendsTest.class);
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("馬克-to-win @馬克java社區(qū) successful");
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
  
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 馬克-to-win @ 馬克java社區(qū):防盜版實名手機尾號:73203 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}



輸入文件:

A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J




輸出:

A    F-H-D-G-B-K-C-I-O
B    E-A-F-J
C    H-K-B-A-G-E-F
D    E-C-L-K-A-H-G-F
E    G-F-H-L-M-D-B-A
F    B-M-L-A-G-D-C
G    M
H    O
I    C-O
J    O
L    D-E
M    E-F
O    A-H-I-J-F





把第一個Mapreduce的輸出作為輸入。

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
public class CommonFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       /* 馬克-to-win @ 馬克java社區(qū):輸入是:    A    F-H-D-G-B-K-C-I-O(默認分隔符\t)意味著:F-H-D-G-B-K-C-I-O都有A這個好友。
        或者最后輸入是:     O    A-H-I-J-F
        */
        String line = value.toString();
        String[] fields = line.split("\t");
        String user = fields[0];
        String[] friends = fields[1].split("-");
/*比如前面的F-H-D-G-B-K-C-I-O*/      
        Arrays.sort(friends);
/*變成B-C-D-F-G-H-I-K-O*/   
/*最后一行變成:A-F-H-I-J*/       
/*遍歷*/      
        for (int i = 0; i < friends.length - 1; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                k.set(friends[i] + "-" + friends[j]);
                v.set(user);
                System.out.println(friends[i] + "-" + friends[j]+" \t"+user);
/*馬克-to-win @ 馬克java社區(qū):輸出結(jié)果是:意味著,B和C都有A這個好友。
B-C     A
B-D     A
B-F     A
B-G     A
B-H     A
B-I     A
B-K     A
B-O     A
C-D     A
C-F     A
C-G     A
C-H     A
C-I     A
C-K     A
C-O     A
D-F     A
D-G     A
D-H     A
D-I     A
D-K     A
D-O     A
F-G     A
F-H     A
F-I     A
F-K     A
F-O     A
G-H     A
G-I     A
G-K     A
G-O     A
H-I     A
H-K     A
H-O     A
I-K     A
I-O     A
K-O     A
A-E     B
A-F     B
A-J     B
E-F     B
E-J     B
。。。。。

G-L     F
G-M     F
L-M     F
C-O     I
D-E     L
E-F     M
A-F     O
A-H     O
A-I     O
A-J     O
F-H     O
F-I     O
F-J     O
H-I     O
H-J     O
I-J     O
*/
              
                context.write(k, v);
            }
        }
    }
}




package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*讀入數(shù)據(jù),key相同的在一組
<A-B,C><A-B,E><A-B,F>
*/      
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append(",");
        }
        /*刪除最后一個,*/
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
/*最后輸出的結(jié)果是:A-B   C,E,F,  代表A和B 都有好友C,E,F(xiàn)*/      
        context.write(key, v);
    }
}






package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTest2Mark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommonFriendsTest2.class);
        job.setMapperClass(CommonFriendsMapper2.class);
        job.setReducerClass(CommonFriendsReducer2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("馬克-to-win @馬克java社區(qū) successful");
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
  
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}



輸出結(jié)果是:

A-B    C,E,F
A-C    D,F
A-D    E,F
A-E    C,D,B
A-F    C,O,D,B,E
A-G    F,C,D,E
A-H    C,D,E,O
A-I    O
A-J    O,B
A-K    D,C
A-L    E,D,F
A-M    F,E
B-C    F,A
B-D    F,E,A
B-E    C
B-F    C,A,E
B-G    E,C,F,A
B-H    C,A,E
B-I    A
B-K    C,A
B-L    F,E
B-M    E,F
B-O    A
C-D    F,A
C-E    D
C-F    D,A
C-G    F,D,A
C-H    D,A
C-I    A
C-K    A,D
C-L    F,D
C-M    F
C-O    A,I
D-E    L
D-F    A,E
D-G    F,A,E
D-H    A,E
D-I    A
D-K    A
D-L    E,F
D-M    F,E
D-O    A
E-F    D,M,C,B
E-G    C,D
E-H    C,D
E-J    B
E-K    C,D
E-L    D
F-G    D,C,E,A
F-H    A,C,O,D,E
F-I    O,A
F-J    B,O
F-K    C,D,A
F-L    E,D
F-M    E
F-O    A
G-H    C,D,E,A
G-I    A
G-K    C,A,D
G-L    D,E,F
G-M    E,F
G-O    A
H-I    A,O
H-J    O
H-K    A,C,D
H-L    E,D
H-M    E
H-O    A
I-J    O
I-K    A
I-O    A
K-L    D
K-O    A
L-M    F,E