一文帶你了解 Spark 架構設計與原理思想

引子

        MapReduce 主要使用磁盤存儲計算過程中的數(shù)據(jù),雖然可靠性比較高,但是性能卻較差 。 此外,MapReduce 只能使用 map 和 reduce 函數(shù)進行編程,雖然能夠完成各種大數(shù)據(jù)計算,但是編程比較復雜 。 而且受 map 和 reduce 編程模型相對簡單的影響,復雜的計算必須組合多個 MapReduce job 才能完成,編程難度進一步增加!

        于是,在2009年,美國加州大學伯克利分校的AMP實驗室,一個可應用于大規(guī)模數(shù)據(jù)處理的統(tǒng)一分析引擎——Spark 應運而生 !

Spark 初識

        Spark 在 MapReduce 的基礎上進行了改進,它主要使用內(nèi)存進行中間計算數(shù)據(jù)存儲,加快了計算執(zhí)行時間,在某些情況下性能可以提升百倍 。

        
在這里插入圖片描述
      





 除了速度更快,Spark 和 MapReduce 相比,還有更簡單易用的編程模型 。

        Spark 的主要編程模型是 RDD,即彈性數(shù)據(jù)集 。在 RDD 上定義了許多常見的大數(shù)據(jù)計算函數(shù),利用這些函數(shù)可以用極少的代碼完成較為復雜的大數(shù)據(jù)計算 。

        例如我們在介紹 Hive 架構設計時談到的 WordCount 示例 。 使用 Scala 語言在 Spark 上編寫 ,代碼只需三行 。

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

        這個 demo 的代碼含義就不展開詳細介紹 。首選,從 HDFS 讀取數(shù)據(jù),構建一個 RDD textFile,然后在這個 RDD 上執(zhí)行三個操作:一是將輸入數(shù)據(jù)的每一行文本用空格拆分單詞;二是將單詞進行轉換,比如:word ——> (word,1),生成 < Key , Value > 的結構;三是針對相同的 Key 進行統(tǒng)計,統(tǒng)計方式是對 Value 求和 。最后,將 RDD counts 寫入 HDFS ,完成結果輸出 。

Spark 編程模型

        RDD 是 Spark 的核心概念,是彈性數(shù)據(jù)集(Resilient Distributed Datasets)的縮寫。RDD 既是 Spark 面向開發(fā)者的編程模型,又是 Spark 自身架構的核心元素。

        我們先來認識一下作為 Spark 編程模型的RDD 。我們知道,大數(shù)據(jù)計算就是在大規(guī)模的數(shù)據(jù)集上進行一系列的數(shù)據(jù)計算處理。MapReduce 針對輸入數(shù)據(jù),將計算過程分為兩個階段,一個 Map 階段,一個 Reduce 階段,可以理解成是面向過程的大數(shù)據(jù)計算。我們在用 MapReduce 編程的時候,思考的是,如何將計算邏輯用 Map 和 Reduce 兩個階段實現(xiàn),map 和 reduce 函數(shù)的輸入和輸出是什么,這也是我們在學習 MapReduce 編程的時候一再強調(diào)的。

        而 Spark 則直接針對數(shù)據(jù)進行編程,將大規(guī)模數(shù)據(jù)集合抽象成一個 RDD 對象,然后在這個 RDD 上進行各種計算處理,得到一個新的 RDD,繼續(xù)計算處理,直到得到最后的結果數(shù)據(jù)。所以 Spark 可以理解成是面向對象的大數(shù)據(jù)計算。我們在進行 Spark 編程的時候,思考的是一個 RDD 對象需要經(jīng)過什么樣的操作,轉換成另一個 RDD 對象,思考的重心和落腳點都在 RDD 上。

        所以在上面 WordCount 的代碼示例里,第 2 行代碼實際上進行了 3 次 RDD 轉換,每次轉換都得到一個新的 RDD,因為新的 RDD 可以繼續(xù)調(diào)用 RDD 的轉換函數(shù),所以連續(xù)寫成一行代碼。事實上,可以分成 3 行

val rdd1 = textFile.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(word => (word, 1))
val rdd3 = rdd2.reduceByKey(_ + _)

Spark 架構核心

        RDD 上定義的函數(shù)分兩種,一種是轉換(transformation) 函數(shù),這種函數(shù)的返回值還是 RDD;另一種是 執(zhí)行(action) 函數(shù),這種函數(shù)不再返回 RDD。

        RDD 定義了很多轉換操作函數(shù),比如有計算 map(func)、過濾 filter(func)、合并數(shù)據(jù)集 union(otherDataset)、根據(jù) Key 聚合 reduceByKey(func, [numPartitions])、連接數(shù)據(jù)集 join(otherDataset, [numPartitions])、分組 groupByKey([numPartitions]) 等十幾個函數(shù)。

        作為 Spark 架構核心元素的 RDD。跟 MapReduce 一樣,Spark 也是對大數(shù)據(jù)進行分片計算,Spark 分布式計算的數(shù)據(jù)分片、任務調(diào)度都是以 RDD 為單位展開的,每個 RDD 分片都會分配到一個執(zhí)行進程去處理。

        RDD 上的轉換操作又分成兩種,一種轉換操作產(chǎn)生的 RDD 不會出現(xiàn)新的分片,比如 map、filter 等,也就是說一個 RDD 數(shù)據(jù)分片,經(jīng)過 map 或者 filter 轉換操作后,結果還在當前分片。就像你用 map 函數(shù)對每個數(shù)據(jù)加 1,得到的還是這樣一組數(shù)據(jù),只是值不同。實際上,Spark 并不是按照代碼寫的操作順序去生成 RDD,比如 rdd2 = rdd1.map(func) 這樣的代碼并不會在物理上生成一個新的 RDD。物理上,Spark 只有在產(chǎn)生新的 RDD 分片時候,才會真的生成一個 RDD,Spark 的這種特性也被稱作 惰性計算。

        另一種轉換操作產(chǎn)生的 RDD 則會產(chǎn)生新的分片,比如 reduceByKey,來自不同分片的相同 Key 必須聚合在一起進行操作,這樣就會產(chǎn)生新的 RDD 分片。

        所以,大家只需要記住,Spark 應用程序代碼中的 RDD 和 Spark 執(zhí)行過程中生成的物理 RDD 不是一一對應的,RDD 在 Spark 里面是一個非常靈活的概念,同時又非常重要,需要認真理解。

Spark 的計算階段

        和 MapReduce 一樣,Spark 也遵循移動計算比移動數(shù)據(jù)更劃算 這一大數(shù)據(jù)計算基本原則。但是和 MapReduce 僵化的 Map 與 Reduce 分階段計算相比,Spark 的計算框架更加富有彈性和靈活性,進而有更好的運行性能 。

        Spark 會根據(jù)程序中的轉換函數(shù)生成計算任務執(zhí)行計劃,這個執(zhí)行計劃就是一個 DAG 。Spark 可以在一個作業(yè)中完成非常復雜的大數(shù)據(jù)計算 。

        所謂 DAG 也就是 有向無環(huán)圖,就是說不同階段的依賴關系是有向的,計算過程只能沿著依賴關系方向執(zhí)行,被依賴的階段執(zhí)行完成之前,依賴的階段不能開始執(zhí)行,同時,這個依賴關系不能有環(huán)形依賴,否則就成為死循環(huán)了。下面這張圖描述了一個典型的 Spark 運行 DAG 的不同階段。

在這里插入圖片描述
        在上面的圖中, A、C、E 是從 HDFS 上加載的 RDD,A 經(jīng)過 groupBy 分組統(tǒng)計轉換函數(shù)計算后得到的 RDD B,C 經(jīng)過 map 轉換函數(shù)計算后得到 RDD D,D 和 E 經(jīng)過 union 合并轉換函數(shù)計算后得到 RDD F ,B 和 F 經(jīng)過 join 連接函數(shù)計算后得到最終的合并結果 RDD G 。

        所以可以看到 Spark 作業(yè)調(diào)度執(zhí)行的核心是 DAG,有了 DAG,整個應用就被切分成哪些階段,每個階段的依賴關系也就清楚了。之后再根據(jù)每個階段要處理的數(shù)據(jù)量生成相應的任務集合(TaskSet),每個任務都分配一個任務進程去處理,Spark 就實現(xiàn)了大數(shù)據(jù)的分布式計算。

        具體來看的話,負責 Spark 應用 DAG 生成和管理的組件是 DAGScheduler,DAGScheduler 根據(jù)程序代碼生成 DAG,然后將程序分發(fā)到分布式計算集群,按計算階段的先后關系調(diào)度執(zhí)行。

        大家注意到了么,上面的例子有 4 個轉換函數(shù),但是只有 3 個階段 。那么 Spark 劃分計算階段的依據(jù)具體是什么呢?顯然并不是 RDD 上的每個轉換函數(shù)都會生成一個計算階段 。

        通過觀察一下上面的 DAG 圖,關于計算階段的劃分從圖上就能看出規(guī)律,當 RDD 之間的轉換連接線呈現(xiàn)多對多交叉連接的時候,就會產(chǎn)生新的階段。一個 RDD 代表一個數(shù)據(jù)集,圖中每個 RDD 里面都包含多個小塊,每個小塊代表 RDD 的一個分片。

        一個數(shù)據(jù)集中的多個數(shù)據(jù)分片需要進行分區(qū)傳輸,寫入到另一個數(shù)據(jù)集的不同分片中,這種數(shù)據(jù)分區(qū)交叉?zhèn)鬏數(shù)牟僮?,我們?MapReduce 的運行過程中也看到過。

在這里插入圖片描述
        這就是 shuffle 過程,Spark 也需要通過 shuffle 將數(shù)據(jù)進行重新組合,相同 Key 的數(shù)據(jù)放在一起,進行聚合、關聯(lián)等操作,因而每次 shuffle 都產(chǎn)生新的計算階段。這也是為什么計算階段會有依賴關系,它需要的數(shù)據(jù)來源于前面一個或多個計算階段產(chǎn)生的數(shù)據(jù),必須等待前面的階段執(zhí)行完畢才能進行 shuffle,并得到數(shù)據(jù)。

        所以大家需要記住,計算階段劃分的依據(jù)是 shuffle,不是轉換函數(shù)的類型 。

思考

        大家可能會想,為什么同樣經(jīng)過 shuffle ,Spark 可以更高效 ?

        從本質上看,Spark 可以算作是一種 MapReduce 計算模型的不同實現(xiàn)。Hadoop MapReduce 簡單粗暴地根據(jù) shuffle 將大數(shù)據(jù)計算分成 Map 和 Reduce 兩個階段,然后就算完事了。而 Spark 更細膩一點,將前一個的 Reduce 和后一個的 Map 連接起來,當作一個階段持續(xù)計算,形成一個更加優(yōu)雅、高效的計算模型,雖然其本質依然是 Map 和 Reduce。但是這種多個計算階段依賴執(zhí)行的方案可以有效減少對 HDFS 的訪問,減少作業(yè)的調(diào)度執(zhí)行次數(shù),因此執(zhí)行速度也更快。

        并且和 Hadoop MapReduce 主要使用磁盤存儲 shuffle 過程中的數(shù)據(jù)不同,Spark 優(yōu)先使用內(nèi)存進行數(shù)據(jù)存儲,包括 RDD 數(shù)據(jù)。除非是內(nèi)存不夠用了,否則是盡可能使用內(nèi)存, 這也是 Spark 性能比 Hadoop 高的另一個原因。

Spark 執(zhí)行流程

        Spark 支持 Standalone、Yarn、Mesos、Kubernetes 等多種部署方案,幾種部署方案原理也都一樣,只是不同組件角色命名不同,但是核心功能和運行流程都差不多。

在這里插入圖片描述
        上面這張圖就是 Spark 的運行流程 。

        首先,Spark 應用程序啟動在自己的 JVM 進程里,即 Driver 進程,啟動后調(diào)用 SparkContext 初始化執(zhí)行配置和輸入數(shù)據(jù)。SparkContext 啟動 DAGScheduler 構造執(zhí)行的 DAG 圖,切分成最小的執(zhí)行單位也就是計算任務。

        然后 Driver 向 Cluster Manager 請求計算資源,用于 DAG 的分布式計算。Cluster Manager 收到請求以后,將 Driver 的主機地址等信息通知給集群的所有計算節(jié)點 Worker。

        Worker 收到信息以后,根據(jù) Driver 的主機地址,跟 Driver 通信并注冊,然后根據(jù)自己的空閑資源向 Driver 通報自己可以領用的任務數(shù)。Driver 根據(jù) DAG 圖開始向注冊的 Worker 分配任務。

        Worker 收到任務后,啟動 Executor 進程開始執(zhí)行任務。Executor 先檢查自己是否有 Driver 的執(zhí)行代碼,如果沒有,從 Driver 下載執(zhí)行代碼,通過 Java 反射加載后開始執(zhí)行。
        

Spark性能調(diào)優(yōu)與故障處理

        關于 Spark 的性能調(diào)優(yōu),就有很多可以值得探討的地方。 我們一般能快速想到的是常規(guī)的性能調(diào)優(yōu),包括最優(yōu)的資源配置RDD優(yōu)化,并行度調(diào)節(jié)等等,除此之外,還有算子調(diào)優(yōu)Shuffle 調(diào)優(yōu)JVM 調(diào)優(yōu) 。而關于故障處理,我們一般討論的是解決 Spark 數(shù)據(jù)傾斜 的問題,我們一般會通過聚合原數(shù)據(jù),過濾導致傾斜的 key,提升shuffle 操作過程中的 reduce 并行度等方式 。因為本篇文章主要介紹架構設計和原理思想,基于篇幅限制,詳細步驟就不展示詳細描述。正好最近收集了一本 Spark性能調(diào)優(yōu)與故障處理 的 pdf ,里面對于詳解的步驟均做了詳細的說明 。
在這里插入圖片描述

關注大數(shù)據(jù)領域優(yōu)質公眾號:大數(shù)據(jù)夢想家,后臺回復 “spark” 即可免費下載 Spark性能調(diào)優(yōu)與故障處理.pdf

Spark 生態(tài)

        最后,我們來看看 Spark 的生態(tài)!

        跟我們之前介紹的 Hadoop 一樣,Spark 也有他自己的生態(tài)體系 。以 Spark 為基礎,有支持 SQL 語句的 Spark SQL,有支持流計算的 Spark Streaming,有支持機器學習的 MLlib,還有支持圖計算的 GraphX。利用這些產(chǎn)品,Spark 技術棧支撐起大數(shù)據(jù)分析、大數(shù)據(jù)機器學習等各種大數(shù)據(jù)應用場景。

在這里插入圖片描述
        為了方便大家了解,下面對這些組件進行一一介紹:

        Spark SQL:用來操作結構化數(shù)據(jù)的核心組件,通過Spark SQL可以直接查詢Hive、 HBase等多種外部數(shù)據(jù)源中的數(shù)據(jù)。Spark SQL的重要特點是能夠統(tǒng)一處理關系表和RDD在處理結構化數(shù)據(jù)時,開發(fā)人員無須編寫 MapReduce程序,直接使用SQL命令就能完成更加復雜的數(shù)據(jù)查詢操作。

        Spark Streaming:Spark提供的流式計算框架,支持高吞吐量、可容錯處理的實時流式數(shù)據(jù)處理,其核心原理是將流數(shù)據(jù)分解成一系列短小的批處理作業(yè),每個短小的批處理作業(yè)都可以使用 Spark Core進行快速處理。Spark Streaming支持多種數(shù)據(jù)源,如 Kafka以及TCP套接字等。

        MLlib:Spark提供的關于機器學習功能的算法程序庫,包括分類、回歸、聚類、協(xié)同過濾算法等,還提供了模型評估、數(shù)據(jù)導入等額外的功能,開發(fā)人員只需了解一定的機器學習算法知識就能進行機器學習方面的開發(fā),降低了學習成本。

        GraphX: Spark提供的分布式圖處理框架,擁有圖計算和圖挖掘算法的API接口以及豐富的功能和運算符,極大地方便了對分布式圖的處理需求,能在海量數(shù)據(jù)上運行復雜的圖算法。

        Spark生態(tài)系統(tǒng)各個組件關系密切,并且可以相互調(diào)用,這樣設計具有以下顯著優(yōu)勢。

        (1)Spark生態(tài)系統(tǒng)包含的所有程序庫和高級組件都可以從 Spark核心引擎的改進中獲益。

        (2)不需要運行多套獨立的軟件系統(tǒng),能夠大大減少運行整個系統(tǒng)的資源代價。

        (3)能夠無縫整合各個系統(tǒng),構建不同處理模型的應用。

總結

        Spark 有三個主要特性:RDD 的編程模型更簡單,DAG 切分的多階段計算過程更快速,使用內(nèi)存存儲中間計算結果更高效。這三個特性使得 Spark 相對 Hadoop MapReduce 可以有更快的執(zhí)行速度,以及更簡單的編程實現(xiàn)。

        另外,從 Spark 的生態(tài)我們可以看出,Spark 框架對大數(shù)據(jù)的支持從內(nèi)存計算、實時處理到交互式查詢,進而發(fā)展到圖計算和機器學習模塊。Spark 生態(tài)系統(tǒng)廣泛的技術面,一方面挑戰(zhàn)占據(jù)大數(shù)據(jù)市場份額最大的 Hadoop,另一方面又隨時準備迎接后起之秀 Flink 、Kafka 等計算框架的挑戰(zhàn),從而使Spark 在大數(shù)據(jù)領域更好地發(fā)展 !


歡迎關注:大數(shù)據(jù)夢想家