Custom Output in MapReduce: Multi-File Output with MultipleOutputs
Back in the per-order topN problem, what if we need to put each order id's records into a file of their own, named after the order id? MultipleOutputs solves this for us. Note that unlike the multi-file output covered at the start of this chapter, the multi-file output here can be tied to the program's business logic: for example, the file name is derived from the order id, and the file holds that order's maximum amount.
The usage of MultipleOutputs is shown in the program below.
package com;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MultipleOutputsTest {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        /*
        o1abc,p2,250.0
        o2kkk,p3,500.0
        o2kkk,p4,100.0
        o2kkk,p5,700.0
        o3mmm,p1,150.0
        o1abc,p1,200.0
        */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is " + key.toString() + " value is " + value.toString());
            String line = value.toString();
            String[] fields = line.split(",");
            String orderId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            DoubleWritable amountDouble = new DoubleWritable(amount);
            context.write(new Text(orderId), amountDouble);
        }
    }

    public static class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable resultDouble = new DoubleWritable(0.0);
        private MultipleOutputs<Text, DoubleWritable> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<Text, DoubleWritable>(context);
        }
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is " + key.toString());
            double max = Double.MIN_VALUE;
            for (DoubleWritable v2 : values) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            resultDouble.set(max);
            // the file base name is the first 4 characters of the order id, e.g. "o1ab"
            String fileName = key.toString().substring(0, 4);
            System.out.println("fileName is " + fileName);
            /* When used for map output, MultipleOutputs produces file names of the form
               name-m-nnnnn; for reduce output, name-r-nnnnn, where name is an arbitrary
               name set by the program and nnnnn is an integer part number (starting at 0).
               The part number guarantees that files written under the same name never collide.
               void MultipleOutputs.write(Text key, DoubleWritable value, String baseOutputPath)
               writes key and value to an output file.
               Parameters: key: the key. value: the value.
               baseOutputPath: base-output path to write the record to. */
            multipleOutputs.write(key, resultDouble, fileName);
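            // Aside (not in the original program): per the MultipleOutputs javadoc,
            // baseOutputPath may also contain "/" separators, e.g. fileName + "/part",
            // which would put each order's file into its own subdirectory under the
            // job's output path.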
            // context.write(key, resultDouble);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multiple outputs test");
        job.setJarByClass(MultipleOutputsTest.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // delete the local output directory if it already exists;
        // FileOutputFormat refuses to start when the output path is there
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/multipleOutput.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the for loop and falls straight through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
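One side effect of the program above: since the reducer writes only through multipleOutputs.write() and the commented-out context.write() is never used, the job still creates the default part-r-00000 file, which stays empty. If you want to suppress it, Hadoop ships LazyOutputFormat, which creates the base output file only on the first actual context.write(). A minimal sketch of the extra driver code (one added import block plus one line in main(); TextOutputFormat is assumed here because it is the default format):

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

        // inside main(), after the setOutputKeyClass/setOutputValueClass calls:
        // wrap the real output format lazily, so part-r-00000 is only created
        // if the reducer ever calls context.write() directly.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);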
The input file is:
o1abc,p2,250.0
o2kkk,p3,500.0
o2kkk,p4,100.0
o2kkk,p5,700.0
o3mmm,p1,150.0
o1abc,p1,200.0
The output files are o1ab-r-00000, o2kk-r-00000 and o3mm-r-00000 (plus an empty default part-r-00000 and the _SUCCESS marker).
The console output is:
reduce key is o1abc
fileName is o1ab
INFO - File Output Committer Algorithm version is 1
reduce key is o2kkk
fileName is o2kk
INFO - File Output Committer Algorithm version is 1
reduce key is o3mmm
fileName is o3mm
The output of the file o1ab-r-00000 is:
o1abc 250.0
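As an aside, MultipleOutputs also supports pre-declared named outputs, where the file base name is fixed in the driver instead of computed per record. A minimal sketch of that variant (the name "maxout" is an illustrative assumption, not part of the program above; named outputs must be alphanumeric):

        // in main(), declare the named output once:
        MultipleOutputs.addNamedOutput(job, "maxout",
                TextOutputFormat.class, Text.class, DoubleWritable.class);

        // in reduce(), write to it by name; files appear as maxout-r-nnnnn:
        multipleOutputs.write("maxout", key, resultDouble);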