一文帶你了解 Spark 架構(gòu)設(shè)計(jì)與原理思想
引子
MapReduce 主要使用磁盤存儲計(jì)算過程中的數(shù)據(jù),雖然可靠性比較高,但是性能卻較差 。 此外,MapReduce 只能使用 map 和 reduce 函數(shù)進(jìn)行編程,雖然能夠完成各種大數(shù)據(jù)計(jì)算,但是編程比較復(fù)雜 。 而且受 map 和 reduce 編程模型相對簡單的影響,復(fù)雜的計(jì)算必須組合多個 MapReduce job 才能完成,編程難度進(jìn)一步增加!
于是,在2009年,美國加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室,一個可應(yīng)用于大規(guī)模數(shù)據(jù)處理的統(tǒng)一分析引擎——Spark 應(yīng)運(yùn)而生 !
Spark 初識
Spark 在 MapReduce 的基礎(chǔ)上進(jìn)行了改進(jìn),它主要使用內(nèi)存進(jìn)行中間計(jì)算數(shù)據(jù)存儲,加快了計(jì)算執(zhí)行時間,在某些情況下性能可以提升百倍 。
除了速度更快,Spark 和 MapReduce 相比,還有更簡單易用的編程模型 。
Spark 的主要編程模型是 RDD,即彈性數(shù)據(jù)集 。在 RDD 上定義了許多常見的大數(shù)據(jù)計(jì)算函數(shù),利用這些函數(shù)可以用極少的代碼完成較為復(fù)雜的大數(shù)據(jù)計(jì)算 。
例如我們在介紹 Hive 架構(gòu)設(shè)計(jì)時談到的 WordCount 示例 。 使用 Scala 語言在 Spark 上編寫 ,代碼只需三行 。
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
這個 demo 的代碼含義就不展開詳細(xì)介紹 。首選,從 HDFS 讀取數(shù)據(jù),構(gòu)建一個 RDD textFile,然后在這個 RDD 上執(zhí)行三個操作:一是將輸入數(shù)據(jù)的每一行文本用空格拆分單詞;二是將單詞進(jìn)行轉(zhuǎn)換,比如:word ——> (word,1),生成 < Key , Value > 的結(jié)構(gòu);三是針對相同的 Key 進(jìn)行統(tǒng)計(jì),統(tǒng)計(jì)方式是對 Value 求和 。最后,將 RDD counts 寫入 HDFS ,完成結(jié)果輸出 。
Spark 編程模型
RDD 是 Spark 的核心概念,是彈性數(shù)據(jù)集(Resilient Distributed Datasets)的縮寫。RDD 既是 Spark 面向開發(fā)者的編程模型,又是 Spark 自身架構(gòu)的核心元素。
我們先來認(rèn)識一下作為 Spark 編程模型的RDD 。我們知道,大數(shù)據(jù)計(jì)算就是在大規(guī)模的數(shù)據(jù)集上進(jìn)行一系列的數(shù)據(jù)計(jì)算處理。MapReduce 針對輸入數(shù)據(jù),將計(jì)算過程分為兩個階段,一個 Map 階段,一個 Reduce 階段,可以理解成是面向過程的大數(shù)據(jù)計(jì)算。我們在用 MapReduce 編程的時候,思考的是,如何將計(jì)算邏輯用 Map 和 Reduce 兩個階段實(shí)現(xiàn),map 和 reduce 函數(shù)的輸入和輸出是什么,這也是我們在學(xué)習(xí) MapReduce 編程的時候一再強(qiáng)調(diào)的。
而 Spark 則直接針對數(shù)據(jù)進(jìn)行編程,將大規(guī)模數(shù)據(jù)集合抽象成一個 RDD 對象,然后在這個 RDD 上進(jìn)行各種計(jì)算處理,得到一個新的 RDD,繼續(xù)計(jì)算處理,直到得到最后的結(jié)果數(shù)據(jù)。所以 Spark 可以理解成是面向?qū)ο?/strong>的大數(shù)據(jù)計(jì)算。我們在進(jìn)行 Spark 編程的時候,思考的是一個 RDD 對象需要經(jīng)過什么樣的操作,轉(zhuǎn)換成另一個 RDD 對象,思考的重心和落腳點(diǎn)都在 RDD 上。
所以在上面 WordCount 的代碼示例里,第 2 行代碼實(shí)際上進(jìn)行了 3 次 RDD 轉(zhuǎn)換,每次轉(zhuǎn)換都得到一個新的 RDD,因?yàn)樾碌?RDD 可以繼續(xù)調(diào)用 RDD 的轉(zhuǎn)換函數(shù),所以連續(xù)寫成一行代碼。事實(shí)上,可以分成 3 行
val rdd1 = textFile.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(word => (word, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
Spark 架構(gòu)核心
RDD 上定義的函數(shù)分兩種,一種是轉(zhuǎn)換(transformation) 函數(shù),這種函數(shù)的返回值還是 RDD;另一種是 執(zhí)行(action) 函數(shù),這種函數(shù)不再返回 RDD。
RDD 定義了很多轉(zhuǎn)換操作函數(shù),比如有計(jì)算 map(func)
、過濾 filter(func)
、合并數(shù)據(jù)集 union(otherDataset)
、根據(jù) Key 聚合 reduceByKey(func, [numPartitions])
、連接數(shù)據(jù)集 join(otherDataset, [numPartitions])
、分組 groupByKey([numPartitions])
等十幾個函數(shù)。
作為 Spark 架構(gòu)核心元素的 RDD。跟 MapReduce 一樣,Spark 也是對大數(shù)據(jù)進(jìn)行分片計(jì)算,Spark 分布式計(jì)算的數(shù)據(jù)分片、任務(wù)調(diào)度都是以 RDD 為單位展開的,每個 RDD 分片都會分配到一個執(zhí)行進(jìn)程去處理。
RDD 上的轉(zhuǎn)換操作又分成兩種,一種轉(zhuǎn)換操作產(chǎn)生的 RDD 不會出現(xiàn)新的分片,比如 map、filter 等,也就是說一個 RDD 數(shù)據(jù)分片,經(jīng)過 map 或者 filter 轉(zhuǎn)換操作后,結(jié)果還在當(dāng)前分片。就像你用 map 函數(shù)對每個數(shù)據(jù)加 1,得到的還是這樣一組數(shù)據(jù),只是值不同。實(shí)際上,Spark 并不是按照代碼寫的操作順序去生成 RDD,比如 rdd2 = rdd1.map(func) 這樣的代碼并不會在物理上生成一個新的 RDD。物理上,Spark 只有在產(chǎn)生新的 RDD 分片時候,才會真的生成一個 RDD,Spark 的這種特性也被稱作 惰性計(jì)算。
另一種轉(zhuǎn)換操作產(chǎn)生的 RDD 則會產(chǎn)生新的分片,比如 reduceByKey,來自不同分片的相同 Key 必須聚合在一起進(jìn)行操作,這樣就會產(chǎn)生新的 RDD 分片。
所以,大家只需要記住,Spark 應(yīng)用程序代碼中的 RDD 和 Spark 執(zhí)行過程中生成的物理 RDD 不是一一對應(yīng)的,RDD 在 Spark 里面是一個非常靈活的概念,同時又非常重要,需要認(rèn)真理解。
Spark 的計(jì)算階段
和 MapReduce 一樣,Spark 也遵循移動計(jì)算比移動數(shù)據(jù)更劃算 這一大數(shù)據(jù)計(jì)算基本原則。但是和 MapReduce 僵化的 Map 與 Reduce 分階段計(jì)算相比,Spark 的計(jì)算框架更加富有彈性和靈活性,進(jìn)而有更好的運(yùn)行性能 。
Spark 會根據(jù)程序中的轉(zhuǎn)換函數(shù)生成計(jì)算任務(wù)執(zhí)行計(jì)劃,這個執(zhí)行計(jì)劃就是一個 DAG 。Spark 可以在一個作業(yè)中完成非常復(fù)雜的大數(shù)據(jù)計(jì)算 。
所謂 DAG 也就是 有向無環(huán)圖,就是說不同階段的依賴關(guān)系是有向的,計(jì)算過程只能沿著依賴關(guān)系方向執(zhí)行,被依賴的階段執(zhí)行完成之前,依賴的階段不能開始執(zhí)行,同時,這個依賴關(guān)系不能有環(huán)形依賴,否則就成為死循環(huán)了。下面這張圖描述了一個典型的 Spark 運(yùn)行 DAG 的不同階段。
在上面的圖中, A、C、E 是從 HDFS 上加載的 RDD,A 經(jīng)過 groupBy 分組統(tǒng)計(jì)轉(zhuǎn)換函數(shù)計(jì)算后得到的 RDD B,C 經(jīng)過 map 轉(zhuǎn)換函數(shù)計(jì)算后得到 RDD D,D 和 E 經(jīng)過 union 合并轉(zhuǎn)換函數(shù)計(jì)算后得到 RDD F ,B 和 F 經(jīng)過 join 連接函數(shù)計(jì)算后得到最終的合并結(jié)果 RDD G 。
所以可以看到 Spark 作業(yè)調(diào)度執(zhí)行的核心是 DAG,有了 DAG,整個應(yīng)用就被切分成哪些階段,每個階段的依賴關(guān)系也就清楚了。之后再根據(jù)每個階段要處理的數(shù)據(jù)量生成相應(yīng)的任務(wù)集合(TaskSet),每個任務(wù)都分配一個任務(wù)進(jìn)程去處理,Spark 就實(shí)現(xiàn)了大數(shù)據(jù)的分布式計(jì)算。
具體來看的話,負(fù)責(zé) Spark 應(yīng)用 DAG 生成和管理的組件是 DAGScheduler,DAGScheduler 根據(jù)程序代碼生成 DAG,然后將程序分發(fā)到分布式計(jì)算集群,按計(jì)算階段的先后關(guān)系調(diào)度執(zhí)行。
大家注意到了么,上面的例子有 4 個轉(zhuǎn)換函數(shù),但是只有 3 個階段 。那么 Spark 劃分計(jì)算階段的依據(jù)具體是什么呢?顯然并不是 RDD 上的每個轉(zhuǎn)換函數(shù)都會生成一個計(jì)算階段 。
通過觀察一下上面的 DAG 圖,關(guān)于計(jì)算階段的劃分從圖上就能看出規(guī)律,當(dāng) RDD 之間的轉(zhuǎn)換連接線呈現(xiàn)多對多交叉連接的時候,就會產(chǎn)生新的階段。一個 RDD 代表一個數(shù)據(jù)集,圖中每個 RDD 里面都包含多個小塊,每個小塊代表 RDD 的一個分片。
一個數(shù)據(jù)集中的多個數(shù)據(jù)分片需要進(jìn)行分區(qū)傳輸,寫入到另一個數(shù)據(jù)集的不同分片中,這種數(shù)據(jù)分區(qū)交叉?zhèn)鬏數(shù)牟僮?,我們?MapReduce 的運(yùn)行過程中也看到過。
這就是 shuffle 過程,Spark 也需要通過 shuffle 將數(shù)據(jù)進(jìn)行重新組合,相同 Key 的數(shù)據(jù)放在一起,進(jìn)行聚合、關(guān)聯(lián)等操作,因而每次 shuffle 都產(chǎn)生新的計(jì)算階段。這也是為什么計(jì)算階段會有依賴關(guān)系,它需要的數(shù)據(jù)來源于前面一個或多個計(jì)算階段產(chǎn)生的數(shù)據(jù),必須等待前面的階段執(zhí)行完畢才能進(jìn)行 shuffle,并得到數(shù)據(jù)。
所以大家需要記住,計(jì)算階段劃分的依據(jù)是 shuffle,不是轉(zhuǎn)換函數(shù)的類型 。
思考
大家可能會想,為什么同樣經(jīng)過 shuffle ,Spark 可以更高效 ?
從本質(zhì)上看,Spark 可以算作是一種 MapReduce 計(jì)算模型的不同實(shí)現(xiàn)。Hadoop MapReduce 簡單粗暴地根據(jù) shuffle 將大數(shù)據(jù)計(jì)算分成 Map 和 Reduce 兩個階段,然后就算完事了。而 Spark 更細(xì)膩一點(diǎn),將前一個的 Reduce 和后一個的 Map 連接起來,當(dāng)作一個階段持續(xù)計(jì)算,形成一個更加優(yōu)雅、高效的計(jì)算模型,雖然其本質(zhì)依然是 Map 和 Reduce。但是這種多個計(jì)算階段依賴執(zhí)行的方案可以有效減少對 HDFS 的訪問,減少作業(yè)的調(diào)度執(zhí)行次數(shù),因此執(zhí)行速度也更快。
并且和 Hadoop MapReduce 主要使用磁盤存儲 shuffle 過程中的數(shù)據(jù)不同,Spark 優(yōu)先使用內(nèi)存進(jìn)行數(shù)據(jù)存儲,包括 RDD 數(shù)據(jù)。除非是內(nèi)存不夠用了,否則是盡可能使用內(nèi)存, 這也是 Spark 性能比 Hadoop 高的另一個原因。
Spark 執(zhí)行流程
Spark 支持 Standalone、Yarn、Mesos、Kubernetes 等多種部署方案,幾種部署方案原理也都一樣,只是不同組件角色命名不同,但是核心功能和運(yùn)行流程都差不多。
上面這張圖就是 Spark 的運(yùn)行流程 。
首先,Spark 應(yīng)用程序啟動在自己的 JVM 進(jìn)程里,即 Driver 進(jìn)程,啟動后調(diào)用 SparkContext 初始化執(zhí)行配置和輸入數(shù)據(jù)。SparkContext 啟動 DAGScheduler 構(gòu)造執(zhí)行的 DAG 圖,切分成最小的執(zhí)行單位也就是計(jì)算任務(wù)。
然后 Driver 向 Cluster Manager 請求計(jì)算資源,用于 DAG 的分布式計(jì)算。Cluster Manager 收到請求以后,將 Driver 的主機(jī)地址等信息通知給集群的所有計(jì)算節(jié)點(diǎn) Worker。
Worker 收到信息以后,根據(jù) Driver 的主機(jī)地址,跟 Driver 通信并注冊,然后根據(jù)自己的空閑資源向 Driver 通報(bào)自己可以領(lǐng)用的任務(wù)數(shù)。Driver 根據(jù) DAG 圖開始向注冊的 Worker 分配任務(wù)。
Worker 收到任務(wù)后,啟動 Executor 進(jìn)程開始執(zhí)行任務(wù)。Executor 先檢查自己是否有 Driver 的執(zhí)行代碼,如果沒有,從 Driver 下載執(zhí)行代碼,通過 Java 反射加載后開始執(zhí)行。
Spark性能調(diào)優(yōu)與故障處理
關(guān)于 Spark 的性能調(diào)優(yōu),就有很多可以值得探討的地方。 我們一般能快速想到的是常規(guī)的性能調(diào)優(yōu),包括最優(yōu)的資源配置
,RDD優(yōu)化
,并行度調(diào)節(jié)
等等,除此之外,還有算子調(diào)優(yōu),Shuffle 調(diào)優(yōu),JVM 調(diào)優(yōu) 。而關(guān)于故障處理,我們一般討論的是解決 Spark 數(shù)據(jù)傾斜 的問題,我們一般會通過聚合原數(shù)據(jù)
,過濾導(dǎo)致傾斜的 key
,提升shuffle 操作過程中的 reduce 并行度
等方式 。因?yàn)楸酒恼轮饕榻B架構(gòu)設(shè)計(jì)和原理思想,基于篇幅限制,詳細(xì)步驟就不展示詳細(xì)描述。正好最近收集了一本 Spark性能調(diào)優(yōu)與故障處理 的 pdf ,里面對于詳解的步驟均做了詳細(xì)的說明 。
關(guān)注大數(shù)據(jù)領(lǐng)域優(yōu)質(zhì)公眾號:大數(shù)據(jù)夢想家,后臺回復(fù) “spark” 即可免費(fèi)下載 Spark性能調(diào)優(yōu)與故障處理.pdf
Spark 生態(tài)
最后,我們來看看 Spark 的生態(tài)!
跟我們之前介紹的 Hadoop 一樣,Spark 也有他自己的生態(tài)體系 。以 Spark 為基礎(chǔ),有支持 SQL 語句的 Spark SQL,有支持流計(jì)算的 Spark Streaming,有支持機(jī)器學(xué)習(xí)的 MLlib,還有支持圖計(jì)算的 GraphX。利用這些產(chǎn)品,Spark 技術(shù)棧支撐起大數(shù)據(jù)分析、大數(shù)據(jù)機(jī)器學(xué)習(xí)等各種大數(shù)據(jù)應(yīng)用場景。
為了方便大家了解,下面對這些組件進(jìn)行一一介紹:
Spark SQL:用來操作結(jié)構(gòu)化數(shù)據(jù)的核心組件,通過Spark SQL可以直接查詢Hive、 HBase等多種外部數(shù)據(jù)源中的數(shù)據(jù)。Spark SQL的重要特點(diǎn)是能夠統(tǒng)一處理關(guān)系表和RDD在處理結(jié)構(gòu)化數(shù)據(jù)時,開發(fā)人員無須編寫 MapReduce程序,直接使用SQL命令就能完成更加復(fù)雜的數(shù)據(jù)查詢操作。
Spark Streaming:Spark提供的流式計(jì)算框架,支持高吞吐量、可容錯處理的實(shí)時流式數(shù)據(jù)處理,其核心原理是將流數(shù)據(jù)分解成一系列短小的批處理作業(yè),每個短小的批處理作業(yè)都可以使用 Spark Core進(jìn)行快速處理。Spark Streaming支持多種數(shù)據(jù)源,如 Kafka以及TCP套接字等。
MLlib:Spark提供的關(guān)于機(jī)器學(xué)習(xí)功能的算法程序庫,包括分類、回歸、聚類、協(xié)同過濾算法等,還提供了模型評估、數(shù)據(jù)導(dǎo)入等額外的功能,開發(fā)人員只需了解一定的機(jī)器學(xué)習(xí)算法知識就能進(jìn)行機(jī)器學(xué)習(xí)方面的開發(fā),降低了學(xué)習(xí)成本。
GraphX: Spark提供的分布式圖處理框架,擁有圖計(jì)算和圖挖掘算法的API接口以及豐富的功能和運(yùn)算符,極大地方便了對分布式圖的處理需求,能在海量數(shù)據(jù)上運(yùn)行復(fù)雜的圖算法。
Spark生態(tài)系統(tǒng)各個組件關(guān)系密切,并且可以相互調(diào)用,這樣設(shè)計(jì)具有以下顯著優(yōu)勢。
(1)Spark生態(tài)系統(tǒng)包含的所有程序庫和高級組件都可以從 Spark核心引擎的改進(jìn)中獲益。
(2)不需要運(yùn)行多套獨(dú)立的軟件系統(tǒng),能夠大大減少運(yùn)行整個系統(tǒng)的資源代價。
(3)能夠無縫整合各個系統(tǒng),構(gòu)建不同處理模型的應(yīng)用。
總結(jié)
Spark 有三個主要特性:RDD 的編程模型更簡單,DAG 切分的多階段計(jì)算過程更快速,使用內(nèi)存存儲中間計(jì)算結(jié)果更高效。這三個特性使得 Spark 相對 Hadoop MapReduce 可以有更快的執(zhí)行速度,以及更簡單的編程實(shí)現(xiàn)。
另外,從 Spark 的生態(tài)我們可以看出,Spark 框架對大數(shù)據(jù)的支持從內(nèi)存計(jì)算、實(shí)時處理到交互式查詢,進(jìn)而發(fā)展到圖計(jì)算和機(jī)器學(xué)習(xí)模塊。Spark 生態(tài)系統(tǒng)廣泛的技術(shù)面,一方面挑戰(zhàn)占據(jù)大數(shù)據(jù)市場份額最大的 Hadoop,另一方面又隨時準(zhǔn)備迎接后起之秀 Flink 、Kafka 等計(jì)算框架的挑戰(zhàn),從而使Spark 在大數(shù)據(jù)領(lǐng)域更好地發(fā)展 !
歡迎關(guān)注:大數(shù)據(jù)夢想家