MapReduce當中的reduce當中的cleanup的用法
reduce當中的cleanup的用法:
馬克- to-win:馬克 java社區(qū):防盜版實名手機尾號: 73203。
馬克-to-win @ 馬克java社區(qū):上面的topN是解決每個組里的topN,比如每個訂單中的最小的。但如果需要橫向的比較所有的key(初學者忽略:cleanup方法慎用, 如果所有的key的數(shù)據(jù)巨大量怎么辦?Map map = new HashMap();內(nèi)存都不夠了,所以考慮多步mapreduce或聰明的算法做法,每處理完一個key,就刪除一些沒用的空間,比如把不是topN的給刪除了,這樣可以釋放一部分空間),選出topN,得用cleanup。
馬克-to-win @ 馬克java社區(qū):從現(xiàn)在開始,我們講一些特殊用法,我們知道,map是讀一行執(zhí)行一次,reduce是對每一key,或一組,執(zhí)行一次。但是如果需求是當我們得到全部數(shù)據(jù)之后,需要再做一些處理,之后再輸出怎么辦?這時候setUp或cleanUp就登場了,他們像servlet的init和 destroy一樣都只執(zhí)行一次。map和reduce都有setUp或cleanUp,原理一樣。我們只拿reduce做例子。
馬克-to-win @ 馬克java社區(qū):這樣對于最終數(shù)據(jù)的過濾篩選和輸出步驟,要放在cleanUp中。前面我們的例子都是一行一行(對于map),一組一組(對于 reduce)輸出,借助cleanup,我們可以全部拿到數(shù)據(jù),完全按照java過去的算法,最后過濾輸出。下面我們用它解決topN問題。
還以wordcount為例,求出單詞出現(xiàn)數(shù)量前三名。
package com;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceSetupTestMark_to_win {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();
/*
hello a hello win
hello a to
hello mark
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key is " + key.toString() + " value is " + value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
Map map = new HashMap();
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("reduce key is " + key.toString());
int sum = 0;
for (IntWritable val : values) {
System.out.println("val is " + val.toString());
sum += val.get();
}
String keyString = key.toString();
map.put(keyString, sum);
// System.out.println("val is " + val.toString());
// result.set(sum);
// context.write(key, result);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
/* 放在一個List 當中, 之后排這個list, */
/*1)List<Integer> list = new ArrayList<Integer>();
2)public ArrayList(Collection<? extends E> c)
有兩種類型的構造函數(shù)。
*/
List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
return -1 * (o1.getValue().compareTo(o2.getValue()));
}
});
for (int i = 0; i < 3; i++) {
context.write(new Text(list.get(i).getKey()), new IntWritable(list.get(i).getValue()));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(ReduceSetupTest.class);
job.setMapperClass(TokenizerMapper.class);
// job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/cleanup.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
System.out.println("mytest hadoop successful");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
輸出結果:
reduce key is a
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
輸出的文件 :
hello 4
a 2
to 1