Setting up a Spark Java 1.8 environment with Eclipse on Win7: a WordCount "hello world" example

In Eclipse Oxygen, create an ordinary Java project and then import the spark-assembly-1.6.1-hadoop2.6.0.jar package into the project; that is all the setup needed. Once start-dfs has been started, the program below can run.
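With the jar on the build path, a quick way to confirm the environment before running the full WordCount is a tiny local job. The following is a minimal sketch of my own (not part of the original listing); the class name SparkSmokeTest is arbitrary, and the spark.testing.memory workaround is the same one used in the WordCount program below:

package com;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkSmokeTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("smoke");
        // same memory workaround as in the WordCount example below
        conf.set("spark.testing.memory", "2000000000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // run one trivial job; if this prints 10, the spark-assembly jar is wired up correctly
        List<Integer> nums = Arrays.asList(1, 2, 3, 4);
        int sum = sc.parallelize(nums).reduce((a, b) -> a + b);
        System.out.println("sum is " + sum);
        sc.close();
    }
}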


package com;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wc");
/* Without the following line, you get the error: java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8 (470M). Please use a larger heap size. There is not enough memory, so the SparkContext cannot be started. */
        conf.set("spark.testing.memory", "2000000000");
        JavaSparkContext sc = new JavaSparkContext(conf);
/* Reading the input the following way also works */
 //       JavaRDD<String> text = sc.textFile("hdfs://localhost:9000/README.txt");
/* The input file is:
o1abc 45
o1abc 77
o1abc o1abc */
        JavaRDD<String> text = sc.textFile("E://temp//input//friend.txt");
        List<String> strList = text.collect();
/* Output:
str:o1abc 45
str:o1abc 77
str:o1abc o1abc */
        for (String str : strList) {
            System.out.println("str:" + str);
        }
/* Interface FlatMapFunction<T,R>, Iterable<R> call(T t) (note: in later Spark versions the return type changed to Iterator<R>) */
        JavaRDD<String> words = text.flatMap(new FlatMapFunction<String, String>() {
/* List's super interface is java.lang.Iterable */
            public Iterable<String> call(String line) throws Exception {
                System.out.println("flatMap once, line is "+line );
                String[] wordsArray=line.split(" ");
                List<String> wordsList=Arrays.asList(wordsArray);
                return wordsList;
            }
        });
        List<String> wordsList = words.collect();
/* Output:
flatMap once, line is o1abc 45
flatMap once, line is o1abc 77
flatMap once, line is o1abc o1abc

word:o1abc
word:45
word:o1abc
word:77
word:o1abc
word:o1abc */
        for (String word : wordsList) {
            System.out.println("word:" + word);
        }  
/* http://spark.apache.org/docs/latest/
 Interface PairFunction<T,K,V>
A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
scala.Tuple2<K,V>     call(T t)
*/  
/* Output:
flatMap once, line is o1abc 45 (this line shows that the flatMap above is executed again)
in tuple2 word: o1abc
t._1 is o1abc t._2 is 1
in tuple2 word: 45
t._1 is 45 t._2 is 1
flatMap once, line is o1abc 77
in tuple2 word: o1abc
t._1 is o1abc t._2 is 1
in tuple2 word: 77
t._1 is 77 t._2 is 1
flatMap once, line is o1abc o1abc
in tuple2 word: o1abc
t._1 is o1abc t._2 is 1
in tuple2 word: o1abc
t._1 is o1abc t._2 is 1
*/
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                System.out.println("in tuple2 word: " + word);
                Tuple2<String, Integer> t=new Tuple2<String, Integer>(word, 1);
                System.out.println("t._1 is 馬克-to-win @ 馬克java社區(qū):" +t._1+" t._2 is " +t._2);
                return t;
            }
        });
/*
Note that a Tuple2 prints with its own parentheses.
listPair is (o1abc,1)
listPair is (45,1)
listPair is (o1abc,1)
listPair is (77,1)
listPair is (o1abc,1)
listPair is (o1abc,1)*/
        List<Tuple2<String,Integer>> listPairs = pairs.collect();
        for (Tuple2<String, Integer> listPair : listPairs) {
            System.out.println("listPair is "+listPair);
        }
/*Interface Function2<T1,T2,R>, R call(T1 v1,T2 v2)*/
        JavaPairRDD<String, Integer> results = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {          
            public Integer call(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });
/*resultsPair is (o1abc,4)
resultsPair is (45,1)
resultsPair is (77,1)*/      
        List<Tuple2<String,Integer>> resultsPairs = results.collect();
        for (Tuple2<String, Integer> resultsPair : resultsPairs) {
            System.out.println("resultsPair is "+resultsPair);
        }
/*Interface VoidFunction<T>  void call(T t)
word:o1abc count:4
word:45 count:1
word:77 count:1
*/      
        results.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("word:" + tuple._1 + " count:" + tuple._2);
            }
        });

        sc.close();
    }
}
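
Since the project targets Java 1.8, the same pipeline can also be written with lambdas instead of anonymous classes. The following is a minimal sketch against the same Spark 1.6 Java API (where the flatMap lambda returns an Iterable; from Spark 2.0 on it has to return an Iterator); the class name WordCountLambda is just illustrative and the input path is the same one used above:

package com;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wcLambda");
        conf.set("spark.testing.memory", "2000000000");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> text = sc.textFile("E://temp//input//friend.txt");
        // split each line into words (Iterable return type, as in Spark 1.x)
        JavaRDD<String> words = text.flatMap(line -> Arrays.asList(line.split(" ")));
        // map every word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        // sum the counts per word
        JavaPairRDD<String, Integer> results = pairs.reduceByKey((v1, v2) -> v1 + v2);

        for (Tuple2<String, Integer> t : results.collect()) {
            System.out.println("word:" + t._1 + " count:" + t._2);
        }
        sc.close();
    }
}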