MapReduce當中的reduce當中的cleanup的用法

reduce當中的cleanup的用法:
馬克- to-win:馬克 java社區(qū):防盜版實名手機尾號: 73203。
馬克-to-win @ 馬克java社區(qū):上面的topN是解決每個組里的topN,比如每個訂單中的最小的。但如果需要橫向的比較所有的key(初學者忽略:cleanup方法慎用, 如果所有的key的數(shù)據(jù)巨大量怎么辦?Map map = new HashMap();內(nèi)存都不夠了,所以考慮多步mapreduce或聰明的算法做法,每處理完一個key,就刪除一些沒用的空間,比如把不是topN的給刪除了,這樣可以釋放一部分空間),選出topN,得用cleanup。

馬克-to-win @ 馬克java社區(qū):從現(xiàn)在開始,我們講一些特殊用法,我們知道,map是讀一行執(zhí)行一次,reduce是對每一key,或一組,執(zhí)行一次。但是如果需求是當我們得到全部數(shù)據(jù)之后,需要再做一些處理,之后再輸出怎么辦?這時候setUp或cleanUp就登場了,他們像servlet的init和 destroy一樣都只執(zhí)行一次。map和reduce都有setUp或cleanUp,原理一樣。我們只拿reduce做例子。

馬克-to-win @ 馬克java社區(qū):這樣對于最終數(shù)據(jù)的過濾篩選和輸出步驟,要放在cleanUp中。前面我們的例子都是一行一行(對于map),一組一組(對于 reduce)輸出,借助cleanup,我們可以全部拿到數(shù)據(jù),完全按照java過去的算法,最后過濾輸出。下面我們用它解決topN問題。

還以wordcount為例,求出單詞出現(xiàn)數(shù)量前三名。




package com;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSetupTestMark_to_win {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();
        /*
          hello a hello win
          hello a to
          hello mark
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is " + key.toString() + " value is " + value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Map map = new HashMap();
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is " + key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("val is " + val.toString());
                sum += val.get();
            }
            String keyString = key.toString();
            map.put(keyString, sum);
            // System.out.println("val is " + val.toString());
            // result.set(sum);
            // context.write(key, result);
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            /* 放在一個List 當中, 之后排這個list, */
/*1)List<Integer> list = new ArrayList<Integer>();
2)public ArrayList(Collection<? extends E> c)
有兩種類型的構造函數(shù)。
*/  
            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                    return -1 * (o1.getValue().compareTo(o2.getValue()));
                }
            });
            for (int i = 0; i < 3; i++) {
                context.write(new Text(list.get(i).getKey()), new IntWritable(list.get(i).getValue()));
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(ReduceSetupTest.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }

        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/cleanup.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));

        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目錄就不進入for循環(huán)了, 進入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }

}




輸出結果:
reduce key is a
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1


輸出的文件 :

hello    4
a    2
to    1