Using a Global Variable in MapReduce
To pass a variable to the tasks, the program can use the Configuration class's set method in the main function to store simple values in the Configuration object; when a map or reduce task starts up (for example, in its setup method), it can read them back with the Configuration class's get method.
All of the code is the same as in the previous example; the only addition is the use of a global variable.
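A minimal sketch of the two halves of the pattern (the class name MyMapper and the placeholder values are illustrative, not part of the full example below):

// Driver side: put the value into the job's Configuration before submitting.
Configuration conf = new Configuration();
conf.set("name", "some value");

// Task side: each map/reduce task receives a copy of that Configuration
// and can read the value back, for example when the task starts up.
public static class MyMapper extends Mapper<Object, Text, Text, DoubleWritable> {
    String name;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        name = context.getConfiguration().get("name");
        // get(key, defaultValue) returns the fallback if the key was never set
        String fallback = context.getConfiguration().get("missing", "default");
    }
}

The full example: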
package com;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GlobalTestMark_to_win {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        String name;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the parameter back from the job-wide Configuration
            Configuration conf = context.getConfiguration();
            name = conf.get("name"); // this retrieves the value; setting it here would not propagate anywhere else, which requires other mechanisms we will not go into here
        }
        /*
        o1,p2,250.0
        o2,p3,500.0
        o2,p4,100.0
        o2,p5,700.0
        o3,p1,150.0
        o1,p1,200.0
        */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("name is " + name + " key is " + key.toString() + " value is " + value.toString());
            String line = value.toString();
            String[] fields = line.split(",");
            String orderId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            DoubleWritable amountDouble = new DoubleWritable(amount);
            context.write(new Text(orderId), amountDouble);
        }
    }
    // note: despite its name, this reducer emits the maximum amount per order, not a sum
    public static class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        String name1;
        DoubleWritable resultDouble = new DoubleWritable(0.0);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the parameter back from the job-wide Configuration
            Configuration conf = context.getConfiguration();
            name1 = conf.get("name"); // the value set in main is visible here as well
        }
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("name1 is " + name1 + " reduce key is " + key.toString());
            // Double.MIN_VALUE is the smallest positive double, so NEGATIVE_INFINITY is the safe initial value
            double max = Double.NEGATIVE_INFINITY;
            for (DoubleWritable v2 : values) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            resultDouble.set(max);
            context.write(key, resultDouble);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("name", "馬克-to-win"); // the "global variable" that every task will be able to read
        Job job = Job.getInstance(conf, "global variable test");
        job.setJarByClass(GlobalTestMark_to_win.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // remove a leftover output directory so the job does not fail on startup
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the for loop and falls straight through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
The map task prints:
name is 馬克-to-win key is 0 value is o1,p2,250.0
name is 馬克-to-win key is 13 value is o2,p3,500.0
name is 馬克-to-win key is 26 value is o2,p4,100.0
name is 馬克-to-win key is 39 value is o2,p5,700.0
name is 馬克-to-win key is 52 value is o3,p1,150.0
name is 馬克-to-win key is 65 value is o1,p1,200.0
The reduce task prints:
name1 is 馬克-to-win reduce key is o1
name1 is 馬克-to-win reduce key is o2
name1 is 馬克-to-win reduce key is o3
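For completeness: since the reducer keeps the maximum amount per order, the result file (part-r-00000 under e:/temp/output) should contain:

o1	250.0
o2	700.0
o3	150.0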