MapReduce當(dāng)中尋找用戶間的共同好友
馬克- to-win:馬克 java社區(qū):防盜版實名手機尾號: 73203。
尋找用戶間的共同好友
馬克-to-win @ 馬克java社區(qū):下面我們給出一個經(jīng)典的案例:尋找用戶間的共同好友。(有意思的是:網(wǎng)上討論這個案例的雖多,但都有這那的錯誤,不是數(shù)據(jù)錯就是程序錯, 總有同學(xué)和我比對,實際和我的是不一樣的)馬克-to-win @ 馬克java社區(qū):下面給出用戶的好友關(guān)系列表(注意是單向的, 騰訊QQ單向好友的算法經(jīng)常改動,曾經(jīng)有一段時間,是這樣規(guī)定的。我可以讓你加我為單向好友,這樣的話我的好友列表當(dāng)中沒有你,但你的好友列表當(dāng)中卻有我,對我的好處就是可以節(jié)省我的資源,那一段兒時間規(guī)定單向好友可以5000甚至更多,但是雙向好友最多只能加500個。于是單向好友就非常流行 ),每一行代表一個用戶和他的好友列表。
A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
馬克-to-win @ 馬克java社區(qū):防盜版實名手機尾號:73203?,F(xiàn)在需要找出用戶間的共同好友。目測:A:B,C,D,F,E,O。 A的好友有這些人。 B:A,C,E,F, B的好友有這些人。A和B的共同好友是C,E,F。其實如果不用大數(shù)據(jù)的思維,很簡單,知道:A:B,C,D,F,E,O。而且B:A,C,E,F得出A -B: C,E,F, 這算法并不難。之后再窮盡一下。(A-B,A-C,A-D。。。。之后再B-C,B-D。。。。二維數(shù)組就可以搞定)。但如果數(shù)據(jù)量大的話,內(nèi)存就崩了,為什么? 因為假設(shè)字母有非常多,整個走一遍, 才能知道A到底和誰有關(guān)系,A-B,A-C,A-D, 所以走一遍只能處理一個字母,想走第2遍,需要把第一輪的數(shù)據(jù)整個存在一個hashmap中,崩潰怎么辦? 那B怎么辦?所以這種方法不行.
馬克-to-win @ 馬克java社區(qū):A-B: C,E,F 這結(jié)論用大數(shù)據(jù)來做是這樣:第一步:要知道C在A的好友列表,C也在B的好友列表。這就要知道C到底在多少人的好友列表?答案是:C H-K-B-A-G-E-F(即C在這許多人的好友列表),怎么做,見下?第二步,有了如上的數(shù)據(jù),就好做窮盡,AB好友列表都有C。AE好友列表頁都有 C。以AB為鍵,馬上會發(fā)現(xiàn) AB好友列表也都有E,而且也都有F。這樣,最終的結(jié)論就有了:A -B: C,E,F。
馬克-to-win @ 馬克java社區(qū):這件事用一個MapReduce不行,得用兩個,才能搞定。思路是這樣:
馬克-to-win @ 馬克java社區(qū):在第一個mapper中:讀一行A:B,C,D,F,E,O(A的好友列表有這些,反過來拆開) 輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>,(前者在后者的好友列表)再讀一行B:A,C,E,F ,輸出<A,B> <C,B> <E,B> <F,B>,
馬克-to-win @ 馬克java社區(qū):之后在第一個Reducer中:key相同的會分到一組,例如:<C,A><C,B><C,E><C,F><C,G>......(還是前者在后者的好友列表)
Key:C
value: [ A, B, E, F, G ]
馬克-to-win @ 馬克java社區(qū):輸出為:C H-K-B-A-G-E-F。什么意思呢? C在H-K-B-A-G-E-F的好友列表。
有一個疑問:原始數(shù)據(jù):C:F,A,D,I意味著c的單向好友列表有這四個人。二者有區(qū)別, 注意單向問題。
最后整體輸出為 :
A F-H-D-G-B-K-C-I-O
B E-A-F-J
C H-K-B-A-G-E-F
D E-C-L-K-A-H-G-F
E G-F-H-L-M-D-B-A
F B-M-L-A-G-D-C
G M
H O
I C-O
J O
L D-E
M E-F
O A-H-I-J-F
馬克-to-win @ 馬克java社區(qū):之后在第二個mapper中,拿一行舉例:輸入是: A F-H-D-G-B-K-C-I-O 意味著:A在F-H-D-G-B-K-C-I-O好友列表。
經(jīng)過遍歷,窮盡,希望輸出結(jié)果是:
B-C A (意味著,B和C都有A這個好友。藍色)
B-D A
B-F A
B-G A
B-H A
B-I A
B-K A
B-O A
C-D A
C-F A
C-G A
C-H A
C-I A
C-K A
C-O A
D-F A
D-G A
D-H A
D-I A
D-K A
D-O A
F-G A
F-H A
F-I A
F-K A
F-O A
G-H A
G-I A
G-K A
G-O A
H-I A
H-K A
H-O A
I-K A
I-O A
K-O A
A-E B
A-F B
A-J B
E-F B
E-J B
。。。。。
G-L F
G-M F
L-M F
C-O I
D-E L
E-F M
A-F O
A-H O
A-I O
A-J O
F-H O
F-I O
F-J O
H-I O
H-J O
I-J O
在第二個mapper中:
讀入數(shù)據(jù),key相同的在一組 <A-B,C><A-B,E><A-B,F>
最后輸出的結(jié)果是:A-B C,E,F, 代表C,E,F(xiàn)都在A和B的好友列表里,A和B 都有好友C,E,F(xiàn)
總結(jié)一下:第一步,找出都有誰有A這個好友,第二步,找出都有誰和誰同時有A這個好友,第三步,找出都有誰和誰,同時有誰誰誰這些好友。這也正是需求。
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(":");
String user = fields[0];
String[] friends = fields[1].split(",");
(購買完整教程)
k.set(friend);
v.set(user);
/*讀一行A:B,C,D,F,E,O(A的好友有這些,反過來拆開,這些人中的每一個都是A的好友)
輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>*/
context.write(k, v);
}
}
}
package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {
Text v = new Text();
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*輸入為比如:<C,A><C,B><C,E><C,F><C,G>*/
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value).append("-");
}
// 去掉最后一個字符(-)
sb.deleteCharAt(sb.length() - 1);
v.set(sb.toString());
/*輸出為 C H-K-B-A-G-E-F*/
context.write(key, v);
}
}
package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTestMark_to_win {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(CommonFriendsTest.class);
job.setMapperClass(CommonFriendsMapper.class);
job.setReducerClass(CommonFriendsReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
System.out.println("馬克-to-win @馬克java社區(qū) successful");
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 :1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 馬克-to-win @ 馬克java社區(qū):防盜版實名手機尾號:73203 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
輸入文件:
A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
輸出:
A F-H-D-G-B-K-C-I-O
B E-A-F-J
C H-K-B-A-G-E-F
D E-C-L-K-A-H-G-F
E G-F-H-L-M-D-B-A
F B-M-L-A-G-D-C
G M
H O
I C-O
J O
L D-E
M E-F
O A-H-I-J-F
把第一個Mapreduce的輸出作為輸入。
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
public class CommonFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/* 馬克-to-win @ 馬克java社區(qū):輸入是: A F-H-D-G-B-K-C-I-O(默認分隔符\t)意味著:F-H-D-G-B-K-C-I-O都有A這個好友。
或者最后輸入是: O A-H-I-J-F
*/
String line = value.toString();
String[] fields = line.split("\t");
String user = fields[0];
String[] friends = fields[1].split("-");
/*比如前面的F-H-D-G-B-K-C-I-O*/
Arrays.sort(friends);
/*變成B-C-D-F-G-H-I-K-O*/
/*最后一行變成:A-F-H-I-J*/
/*遍歷*/
for (int i = 0; i < friends.length - 1; i++) {
for (int j = i + 1; j < friends.length; j++) {
k.set(friends[i] + "-" + friends[j]);
v.set(user);
System.out.println(friends[i] + "-" + friends[j]+" \t"+user);
/*馬克-to-win @ 馬克java社區(qū):輸出結(jié)果是:意味著,B和C都有A這個好友。
B-C A
B-D A
B-F A
B-G A
B-H A
B-I A
B-K A
B-O A
C-D A
C-F A
C-G A
C-H A
C-I A
C-K A
C-O A
D-F A
D-G A
D-H A
D-I A
D-K A
D-O A
F-G A
F-H A
F-I A
F-K A
F-O A
G-H A
G-I A
G-K A
G-O A
H-I A
H-K A
H-O A
I-K A
I-O A
K-O A
A-E B
A-F B
A-J B
E-F B
E-J B
。。。。。
G-L F
G-M F
L-M F
C-O I
D-E L
E-F M
A-F O
A-H O
A-I O
A-J O
F-H O
F-I O
F-J O
H-I O
H-J O
I-J O
*/
context.write(k, v);
}
}
}
}
package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
Text v = new Text();
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*讀入數(shù)據(jù),key相同的在一組
<A-B,C><A-B,E><A-B,F>
*/
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value).append(",");
}
/*刪除最后一個,*/
sb.deleteCharAt(sb.length() - 1);
v.set(sb.toString());
/*最后輸出的結(jié)果是:A-B C,E,F, 代表A和B 都有好友C,E,F(xiàn)*/
context.write(key, v);
}
}
package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTest2Mark_to_win {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(CommonFriendsTest2.class);
job.setMapperClass(CommonFriendsMapper2.class);
job.setReducerClass(CommonFriendsReducer2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend1.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
System.out.println("馬克-to-win @馬克java社區(qū) successful");
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 :1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
輸出結(jié)果是:
A-B C,E,F
A-C D,F
A-D E,F
A-E C,D,B
A-F C,O,D,B,E
A-G F,C,D,E
A-H C,D,E,O
A-I O
A-J O,B
A-K D,C
A-L E,D,F
A-M F,E
B-C F,A
B-D F,E,A
B-E C
B-F C,A,E
B-G E,C,F,A
B-H C,A,E
B-I A
B-K C,A
B-L F,E
B-M E,F
B-O A
C-D F,A
C-E D
C-F D,A
C-G F,D,A
C-H D,A
C-I A
C-K A,D
C-L F,D
C-M F
C-O A,I
D-E L
D-F A,E
D-G F,A,E
D-H A,E
D-I A
D-K A
D-L E,F
D-M F,E
D-O A
E-F D,M,C,B
E-G C,D
E-H C,D
E-J B
E-K C,D
E-L D
F-G D,C,E,A
F-H A,C,O,D,E
F-I O,A
F-J B,O
F-K C,D,A
F-L E,D
F-M E
F-O A
G-H C,D,E,A
G-I A
G-K C,A,D
G-L D,E,F
G-M E,F
G-O A
H-I A,O
H-J O
H-K A,C,D
H-L E,D
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M F,E