Spark內(nèi)核解析

Spark內(nèi)核概述

Spark內(nèi)核泛指Spark的核心運(yùn)行機(jī)制,包括Spark核心組件的運(yùn)行機(jī)制、Spark任務(wù)調(diào)度機(jī)制、Spark內(nèi)存管理機(jī)制、Spark核心功能的運(yùn)行原理等,熟練掌握Spark內(nèi)核原理。

一、Spark核心組件回顧

Driver

Spark驅(qū)動器節(jié)點(diǎn),用于執(zhí)行Spark任務(wù)中的main方法,負(fù)責(zé)實(shí)際代碼的執(zhí)行工作。Driver在Spark作業(yè)執(zhí)行時主要負(fù)責(zé):

1、將用戶程序轉(zhuǎn)化為任務(wù)(Job);

2、在Executor之間調(diào)度任務(wù)(task);

3、跟蹤Executor的執(zhí)行情況;

4、通過UI展示查詢運(yùn)行情況。

Executor

Spark Executor節(jié)點(diǎn)是一個JVM進(jìn)程,負(fù)責(zé)在Spark作業(yè)中運(yùn)行具體任務(wù),任務(wù)彼此之間相互獨(dú)立。Spark應(yīng)用啟動時,Executor節(jié)點(diǎn)被同時啟動,并且始終伴隨著整個Spark應(yīng)用的生命周期而存在。如果有Executor節(jié)點(diǎn)發(fā)生了故障或崩潰,Spark應(yīng)用也可以繼續(xù)執(zhí)行,會將出錯節(jié)點(diǎn)上的任務(wù)調(diào)度到其他Executor節(jié)點(diǎn)上繼續(xù)運(yùn)行。

Executor有兩個核心功能:

1、負(fù)責(zé)運(yùn)行組成Spark應(yīng)用的任務(wù),并將結(jié)果返回給Driver進(jìn)程;

2、它們通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內(nèi)存式存儲。RDD是直接緩存在Executor進(jìn)程內(nèi)的,因此任務(wù)可以在運(yùn)行時充分利用緩存數(shù)據(jù)加速運(yùn)算。

Spark通用運(yùn)行流程概述

上圖為Spark通用運(yùn)行流程,不論Spark以何種模式進(jìn)行部署,任務(wù)提交后,都會先啟動Driver進(jìn)程,隨后Driver進(jìn)程向集群管理器注冊應(yīng)用程序,之后集群管理器根據(jù)此任務(wù)的配置文件分配Executor并啟動,當(dāng)Driver所需的資源全部滿足后,Driver開始執(zhí)行main函數(shù),Spark查詢?yōu)閼袌?zhí)行,當(dāng)執(zhí)行到action算子時開始反向推算,根據(jù)寬依賴進(jìn)行stage的劃分,隨后每一個stage對應(yīng)一個taskset,taskset中有多個task,根據(jù)本地化原則,task會被分發(fā)到指定的Executor去執(zhí)行,在任務(wù)執(zhí)行的過程中,Executor也會不斷與Driver進(jìn)行通信,報告任務(wù)運(yùn)行情況。

二、Spark部署模式

Spark支持三種集群管理器(Cluster Manager),分別為:

1、Standalone:獨(dú)立模式,Spark原生的簡單集群管理器,自帶完整的服務(wù),可單獨(dú)部署到一個集群中,無需依賴任何其他資源管理系統(tǒng),使用Standalone可以很方便地搭建一個集群;

2、Apache Mesos:一個強(qiáng)大的分布式資源管理框架,它允許多種不同的框架部署在其上,包括yarn;

3、Hadoop YARN:統(tǒng)一的資源管理機(jī)制,在上面可以運(yùn)行多套計算框架,如map reduce、storm等,根據(jù)driver在集群中的位置不同,分為yarn client和yarn cluster。

實(shí)際上,除了上述這些通用的集群管理器外,Spark內(nèi)部也提供了一些方便用戶測試和學(xué)習(xí)的簡單集群部署模式。由于在實(shí)際工廠環(huán)境下使用的絕大多數(shù)的集群管理器是Hadoop YARN,因此我們關(guān)注的重點(diǎn)是Hadoop YARN模式下的Spark集群部署。

Spark的運(yùn)行模式取決于傳遞給SparkContext的MASTER環(huán)境變量的值,個別模式還需要輔助的程序接口來配合使用,目前支持的Master字符串及URL包括:

img

用戶在提交任務(wù)給Spark處理時,以下兩個參數(shù)共同決定了Spark的運(yùn)行方式。

- master MASTER_URL:決定了Spark任務(wù)提交給哪種集群處理。

- deploy-mode DEPLOY_MODE:決定了Driver的運(yùn)行方式,可選值為Client或者Cluster。

Standalone模式運(yùn)行機(jī)制

Standalone集群有四個重要組成部分,分別是:

(1)Driver:是一個進(jìn)程,我們編寫的Spark應(yīng)用程序就運(yùn)行在Driver上,由Driver進(jìn)程執(zhí)行;

(2)Master:是一個進(jìn)程,主要負(fù)責(zé)資源調(diào)度和分配,并進(jìn)行集群的監(jiān)控等職責(zé);

(3)Worker:是一個進(jìn)程,一個Worker運(yùn)行在集群中的一臺服務(wù)器上,主要負(fù)責(zé)兩個職責(zé),一個是用自己的內(nèi)存存儲RDD的某個或某些partition;另一個是啟動其他進(jìn)程和線程(Executor),對RDD上的partition進(jìn)行并行的處理和計算。

(4)Executor:是一個進(jìn)程,一個Worker上可以運(yùn)行多個Executor,Executor通過啟動多個線程(task)來執(zhí)行對RDD的partition進(jìn)行并行計算,也就是執(zhí)行我們對RDD定義的例如map、flatMap、reduce等算子操作。

Standalone Client模式

img

1、在Standalone Client模式下,Driver在任務(wù)提交的本地機(jī)器上運(yùn)行;

2、Driver啟動后向Master注冊應(yīng)用程序,Master根據(jù)submit腳本的資源需求找到內(nèi)部資源至少可以啟動一個Executor的所有Worker,然后在這些Worker之間分配Executor;

3、Worker上的Executor啟動后會向Driver反向注冊;

4、當(dāng)所有的Executor注冊完成后,Driver開始執(zhí)行main函數(shù);

5、之后執(zhí)行到Action算子時,開始劃分stage;

6、每個stage生成對應(yīng)的taskSet,之后將task 分發(fā)到各個Executor上執(zhí)行。

Standalone Cluster模式

img

1、在Standalone Cluster模式下,任務(wù)提交后,Master會找到一個Worker啟動Driver進(jìn)程;

2、Driver啟動后向 Master注冊應(yīng)用程序;

3、Master根據(jù)submit腳本的資源需求找到內(nèi)部資源至少可以啟動一個Executor的所有 Worker,然后在這些Worker之間分配Executor;

4、Worker上的Executor啟動后會向Driver反向注冊;

5、所有的 Executor注冊完成后,Driver開始執(zhí)行main函數(shù);

6、之后執(zhí)行到Action算子時,開始劃分stage,每個stage生成對應(yīng)的taskSet;

7、之后將task分發(fā)到各個Executor上執(zhí)行。

注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver注冊Spark應(yīng)用程序的請求后,會獲取其所管理的剩余資源能夠啟動一個 Executor的所有Worker,然后在這些Worker之間分發(fā)Executor,此時的分發(fā)只考慮Worker上的資源是否足夠使用,直到當(dāng)前應(yīng)用程序所需的所有Executor都分配完畢,Executor反向注冊完畢后,Driver開始執(zhí)行main程序。

YARN模式運(yùn)行機(jī)制

YARN Client模式

img

1、在YARN Client模式下,Driver在任務(wù)提交的本地機(jī)器上運(yùn)行;

2、Driver啟動后會和ResourceManager通訊申請啟動ApplicationMaster;

3、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster的功能相當(dāng)于一個ExecutorLaucher(執(zhí)行者發(fā)射器),只負(fù)責(zé)向ResourceManager申請Executor內(nèi)存;

4、ResourceManager接到ApplicationMaster的資源申請后會分配container,然后ApplicationMaster在資源分配指定的NodeManager上啟動Executor進(jìn)程;

5、Executor進(jìn)程啟動后會向Driver反向注冊;

6、Executor全部注冊完成后Driver開始執(zhí)行main函數(shù);

7、之后執(zhí)行到Action算子時,觸發(fā)一個job,并根據(jù)寬依賴開始劃分stage;

8、每個stage生成對應(yīng)的taskSet,之后將task分發(fā)到各個Executor上執(zhí)行。

YARN Cluster模式

img

1、在YARN Cluster模式下,任務(wù)提交后會和ResourceManager通訊申請啟動ApplicationMaster;

2、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster;(此時的ApplicationMaster就是Driver)

3、Driver啟動后向ResourceManager申請Executor內(nèi)存,ResourceManager接到ApplicationMaster的資源申請后會分配container,然后在合適的NodeManager上啟動Executor進(jìn)程;

4、Executor進(jìn)程啟動后會向Driver反向注冊;

5、Executor全部注冊完成后Driver開始執(zhí)行main函數(shù);

6、之后執(zhí)行到Action算子時,觸發(fā)一個job,并根據(jù)寬依賴開始劃分stage;

7、每個stage生成對應(yīng)的taskSet,之后將task分發(fā)到各個Executor上執(zhí)行。

三、Spark通訊架構(gòu)

Spark通信架構(gòu)概述

Spark2.x版本使用Netty通訊架構(gòu)作為內(nèi)部通訊組件。Spark基于Netty新的rpc框架借鑒了Akka中的設(shè)計,它是基于Actor模型,如下圖所示:

Spark通訊框架中各個組件(Client/Master/Worker)可以認(rèn)為是一個個獨(dú)立的實(shí)體,各個實(shí)體之間通過消息來進(jìn)行通信。具體各個組件之間的關(guān)系如下:

Endpoint(Client/Master/Worker)有一個InBox和N個OutBox(N>=1,N取決于當(dāng)前Endpoint與多少其他的Endpoint進(jìn)行通信,一個與其通訊的其他Endpoint對應(yīng)一個OutBox),Endpoint接收到的消息被寫入InBox,發(fā)送出去的消息寫入OutBox并被發(fā)送到其他Endpoint的InBox中。

Spark通訊架構(gòu)解析

Spark通信架構(gòu)如下圖所示:

img

1) RpcEndpoint:RPC端點(diǎn),Spark針對每個節(jié)點(diǎn)(Client/Master/Worker)都稱之為一個Rpc 端點(diǎn),且都實(shí)現(xiàn)RpcEndpoint接口,內(nèi)部根據(jù)不同端點(diǎn)的需求,設(shè)計不同的消息和不同的業(yè)務(wù)處理,如果需要發(fā)送(詢問)則調(diào)用 Dispatcher;

2) RpcEnv:RPC上下文環(huán)境,每個RPC端點(diǎn)運(yùn)行時依賴的上下文環(huán)境稱為 RpcEnv;

3) Dispatcher:消息分發(fā)器,針對于RPC端點(diǎn)需要發(fā)送消息或者從遠(yuǎn)程 RPC 接收到的消息,分發(fā)至對應(yīng)的指令收件箱/發(fā)件箱。如果指令接收方是自己則存入收件箱,如果指令接收方不是自己,則放入發(fā)件箱;

4) Inbox:指令消息收件箱,一個本地RpcEndpoint對應(yīng)一個收件箱,Dispatcher在每次向Inbox存入消息時,都將對應(yīng)EndpointData加入內(nèi)部ReceiverQueue中,另外Dispatcher創(chuàng)建時會啟動一個單獨(dú)線程進(jìn)行輪詢ReceiverQueue,進(jìn)行收件箱消息消費(fèi);

5) RpcEndpointRef:RpcEndpointRef是對遠(yuǎn)程RpcEndpoint的一個引用。當(dāng)我 們需要向一個具體的RpcEndpoint發(fā)送消息時,一般我們需要獲取到該RpcEndpoint的引用,然后通過該應(yīng)用發(fā)送消息。

6) OutBox:指令消息發(fā)件箱,對于當(dāng)前RpcEndpoint來說,一個目標(biāo)RpcEndpoint對應(yīng)一個發(fā)件箱,如果向多個目標(biāo)RpcEndpoint發(fā)送信息,則有多個OutBox。當(dāng)消息放入Outbox后,緊接著通過TransportClient將消息發(fā)送出去。消息放入發(fā)件箱以及發(fā)送過程是在同一個線程中進(jìn)行;

7) RpcAddress:表示遠(yuǎn)程的RpcEndpointRef的地址,Host + Port。

8) TransportClient:Netty通信客戶端,一個OutBox對應(yīng)一個TransportClient,TransportClient不斷輪詢OutBox,根據(jù)OutBox消息的receiver信息,請求對應(yīng)的遠(yuǎn)程TransportServer;

9) TransportServer:Netty通信服務(wù)端,一個RpcEndpoint對應(yīng)一個TransportServer,接受遠(yuǎn)程消息后調(diào)用 Dispatcher分發(fā)消息至對應(yīng)收發(fā)件箱;

根據(jù)上面的分析,Spark通信架構(gòu)的高層視圖如下圖所示:

img

四、SparkContext解析

在Spark中由SparkContext負(fù)責(zé)與集群進(jìn)行通訊、資源的申請以及任務(wù)的分配和監(jiān)控等。當(dāng) Worker節(jié)點(diǎn)中的Executor運(yùn)行完畢Task后,Driver同時負(fù)責(zé)將SparkContext關(guān)閉。

通常也可以使用SparkContext來代表驅(qū)動程序(Driver)。

img

SparkContext是用戶通往Spark集群的唯一入口,可以用來在Spark集群中創(chuàng)建RDD、累加器和廣播變量。

SparkContext也是整個Spark應(yīng)用程序中至關(guān)重要的一個對象,可以說是整個Application運(yùn)行調(diào)度的核心(不包括資源調(diào)度)。

SparkContext的核心作用是初始化Spark應(yīng)用程序運(yùn)行所需的核心組件,包括高層調(diào)度器(DAGScheduler)、底層調(diào)度器(TaskScheduler)和調(diào)度器的通信終端(SchedulerBackend),同時還會負(fù)責(zé)Spark程序向Cluster Manager的注冊等。

img

在實(shí)際的編碼過程中,我們會先創(chuàng)建SparkConf實(shí)例,并對SparkConf的屬性進(jìn)行自定義設(shè)置,隨后,將SparkConf作為SparkContext類的唯一構(gòu)造參數(shù)傳入來完成SparkContext實(shí)例對象的創(chuàng)建。SparkContext在實(shí)例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,當(dāng)RDD的action算子觸發(fā)了作業(yè)(Job)后,SparkContext會調(diào)用DAGScheduler根據(jù)寬窄依賴將Job劃分成幾個小的階段(Stage),TaskScheduler會調(diào)度每個Stage的任務(wù)(Task),另外,SchedulerBackend負(fù)責(zé)申請和管理集群為當(dāng)前Application分配的計算資源(即Executor)。

如果我們將Spark Application比作汽車,那么SparkContext就是汽車的引擎,而SparkConf就是引擎的配置參數(shù)。

下圖描述了Spark-On-Yarn模式下在任務(wù)調(diào)度期間,ApplicationMaster、Driver以及Executor內(nèi)部模塊的交互過程:

img

Driver初始化SparkContext過程中,會分別初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并啟動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通過ApplicationMaster申請資源,并不斷從TaskScheduler中拿到合適的Task分發(fā)到Executor執(zhí)行。HeartbeatReceiver負(fù)責(zé)接收Executor的心跳信息,監(jiān)控Executor的存活狀況,并通知到TaskScheduler。

五、Spark任務(wù)調(diào)度機(jī)制

在工廠環(huán)境下,Spark集群的部署方式一般為YARN-Cluster模式,之后的內(nèi)核分析內(nèi)容中我們默認(rèn)集群的部署方式為YARN-Cluster模式。

Spark任務(wù)提交流程

img

? Spark YARN-Cluster模式下的任務(wù)提交流程

下面的時序圖清晰地說明了一個Spark應(yīng)用程序從提交到運(yùn)行的完整流程:

img

1、提交一個Spark應(yīng)用程序,首先通過Client向ResourceManager請求啟動一個Application,同時檢查是否有足夠的資源滿足Application的需求,如果資源條件滿足,則準(zhǔn)備ApplicationMaster的啟動上下文,交給ResourceManager,并循環(huán)監(jiān)控Application狀態(tài)。

2、當(dāng)提交的資源隊列中有資源時,ResourceManager會在某個 NodeManager上啟動ApplicationMaster進(jìn)程,ApplicationMaster會單獨(dú)啟動Driver后臺線程,當(dāng)Driver啟動后,ApplicationMaster會通過本地的RPC連接Driver,并開始向ResourceManager申請Container資源運(yùn)行Executor進(jìn)程(一個Executor對應(yīng)與一個Container),當(dāng)ResourceManager返回Container資源,ApplicationMaster則在對應(yīng)的Container上啟動Executor。

3、Driver線程主要是初始化SparkContext對象,準(zhǔn)備運(yùn)行所需的上下文,然后一方面保持與ApplicationMaster的RPC連接,通過ApplicationMaster申請資源,另一方面根據(jù)用戶業(yè)務(wù)邏輯開始調(diào)度任務(wù),將任務(wù)下發(fā)到已有的空閑Executor上。

4、當(dāng)ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應(yīng)的Container上啟動Executor進(jìn)程,Executor進(jìn)程起來后,會向Driver反向注冊,注冊成功后保持與Driver的心跳,同時等待Driver分發(fā)任務(wù),當(dāng)分發(fā)的任務(wù)執(zhí)行完畢后,將任務(wù)狀態(tài)上報給 Driver。

從上述時序圖可知,Client只負(fù)責(zé)提交Application并監(jiān)控Application 的狀態(tài)。對于Spark的任務(wù)調(diào)度主要是集中在兩個方面: 資源申請和任務(wù)分發(fā),其主要是通過ApplicationMaster、Driver以及Executor之間來完成。

Spark任務(wù)調(diào)度概述

當(dāng)Driver起來后,Driver則會根據(jù)用戶程序邏輯準(zhǔn)備任務(wù),并根據(jù)Executor資源情況逐步分發(fā)任務(wù)。在詳細(xì)闡述任務(wù)調(diào)度前,首先說明下Spark里的幾個概念。一個Spark應(yīng)用程序包括Job、Stage以及Task三個概念:

Job是以Action方法為界,遇到一個Action方法則觸發(fā)一個Job;

Stage是Job的子集,以RDD寬依賴(即 Shuffle)為界,遇到Shuffle做一次劃分;

Task是Stage的子集,以并行度(分區(qū)數(shù))來衡量,分區(qū)數(shù)是多少,則有多少個task。






Spark的任務(wù)調(diào)度總體來說分兩路進(jìn)行,一路是Stage級的調(diào)度,一路是Task級的調(diào)度,總體調(diào)度流程如下圖所示:

img

Spark RDD通過其Transactions操作,形成了RDD血緣關(guān)系圖,即DAG,最后通過Action的調(diào)用,觸發(fā)Job并調(diào)度執(zhí)行。DAGScheduler負(fù)責(zé)Stage級的調(diào)度,主要是將job切分成若干個Stage,并將每個Stage打包成TaskSet交給TaskScheduler調(diào)度。TaskScheduler負(fù)責(zé)Task級的調(diào)度,將DAGScheduler給過來的TaskSet按照指定的調(diào)度策略分發(fā)到Executor上執(zhí)行,調(diào)度過程中SchedulerBackend負(fù)責(zé)提供可用資源,其中SchedulerBackend有多種實(shí)現(xiàn),分別對接不同的資源管理系統(tǒng)。

Spark Stage級調(diào)度

Spark的任務(wù)調(diào)度是從DAG切割開始,主要是由DAGScheduler來完成。當(dāng)遇到一個Action操作后就會觸發(fā)一個Job的計算,并交給DAGScheduler來提交,下圖是涉及到Job提交的相關(guān)方法調(diào)用流程圖。

img

Job由最終的RDD和Action方法封裝而成,SparkContext 將Job交給DAGScheduler提交,它會根據(jù)RDD的血緣關(guān)系構(gòu)成的DAG進(jìn)行切分,將一個Job劃分為若干Stages,具體劃分策略是,由最終的RDD不斷通過依賴回溯判斷父依賴 是否是寬依賴,即以Shuffle為界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,可以進(jìn)行pipeline式的計算,如上圖紫色流程部分。劃分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,另一類叫做ShuffleMapStage,為下游Stage準(zhǔn)備數(shù)據(jù),下面看一個簡單的例子WordCount。

img

Job由saveAsTextFile觸發(fā),該Job由RDD-3和saveAsTextFile方法組成,根據(jù)RDD之間的依賴關(guān)系從RDD-3開始回溯搜索,直到?jīng)]有依賴的RDD-0,在回溯搜索過程中,RDD-3依賴RDD-2,并且是寬依賴,所以在RDD-2和RDD-3之間劃分Stage,RDD-3被劃到最后一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,所以將RDD-0、RDD-1和RDD-2劃分到同一個 Stage,即 ShuffleMapStage中,實(shí)際執(zhí)行的時候,數(shù)據(jù)記錄會一氣呵成地執(zhí)行RDD-0到RDD-2的轉(zhuǎn)化。不難看出,其本質(zhì)上是一個深度優(yōu)先搜索算法。一個Stage是否被提交,需要判斷它的父Stage是否執(zhí)行,只有在父Stage執(zhí)行完畢才能提交當(dāng)前Stage,如果一個Stage沒有父Stage,那么從該Stage開始提交。Stage提交時會將Task信息(分區(qū)信息以及方法等)序列化并被打包成TaskSet 交給TaskScheduler,一個Partition對應(yīng)一個Task,另一方面TaskScheduler會監(jiān)控Stage的運(yùn)行狀態(tài),只有Executor丟失或者Task由于Fetch失敗才需要重新提交失敗的Stage以調(diào)度運(yùn)行失敗的任務(wù),其他類型的Task失敗會在TaskScheduler的調(diào)度過程中重試。相對來說DAGScheduler做的事情較為簡單,僅僅是在Stage層面上劃分DAG,提交Stage并監(jiān)控相關(guān)狀態(tài)信息。TaskScheduler則相對較為復(fù)雜,下面詳細(xì)闡述其細(xì)節(jié)。

Spark Task級調(diào)度

Spark Task的調(diào)度是由TaskScheduler來完成,DAGScheduler將Stage打包到TaskSet交給TaskScheduler,TaskScheduler會將TaskSet封裝為TaskSetManager加入到調(diào)度隊列中,TaskSetManager結(jié)構(gòu)如下圖所示。

TaskSetManager負(fù)責(zé)監(jiān)控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來調(diào)度任務(wù)。

TaskScheduler初始化后會啟動SchedulerBackend,它負(fù)責(zé)跟外界打交道,接收Executor的注冊信息,并維護(hù)Executor的狀態(tài),所以說SchedulerBackend是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler有沒有任務(wù)要運(yùn)行,也就是說,它會定期地“問”TaskScheduler“我有這么余量,你要不要啊”,TaskScheduler在SchedulerBackend“問 ”它的時候,會從調(diào)度隊列中按照指定的調(diào)度策略選擇TaskSetManager去調(diào)度運(yùn)行,大致方法調(diào)用流程如下圖所示:

img

將TaskSetManager加入rootPool調(diào)度池中之后,調(diào)用SchedulerBackend的riviveOffers方法給driverEndpoint發(fā)送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后調(diào)用makeOffers方法,過濾出活躍狀態(tài)的Executor(這些Executor都是任務(wù)啟動時反向注冊到Driver的Executor),然后將Executor封裝成WorkerOffer對象;準(zhǔn)備好計算資源(WorkerOffer)后,taskScheduler基于這些資源調(diào)用resourceOffer在Executor上分配task。

六、調(diào)度策略

前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務(wù)隊列里,然后再從任務(wù)隊列里按照一定的規(guī)則把它們?nèi)〕鰜碓赟chedulerBackend給過來的Executor上運(yùn)行。這個調(diào)度過程實(shí)際上還是比較粗粒度的,是面向TaskSetManager的。調(diào)度隊列的層次結(jié)構(gòu)如下圖所示:

img

TaskScheduler是以樹的方式來管理任務(wù)隊列,樹中的節(jié)點(diǎn)類型為Schdulable,葉子節(jié)點(diǎn)為TaskSetManager,非葉子節(jié)點(diǎn)為Pool,下圖是它們之間的繼承關(guān)系。

img

TaskScheduler支持兩種調(diào)度策略,一種是FIFO,也是默認(rèn)的調(diào)度策略,另一種是FAIR。在TaskScheduler初始化過程中會實(shí)例化rootPool,表示樹的根節(jié)點(diǎn),是Pool類型。

1、FIFO調(diào)度策略

FIFO調(diào)度策略執(zhí)行步驟如下:

1)對s1和s2兩個Schedulable的優(yōu)先級(Schedulable類的一個屬性,記為priority,值越小,優(yōu)先級越高);

2)如果兩個Schedulable的優(yōu)先級相同,則對s1,s2所屬的Stage的身份進(jìn)行標(biāo)識進(jìn)行比較(Schedulable類的一個屬性,記為priority,值越小,優(yōu)先級越高);

3)如果比較的結(jié)果小于0,則優(yōu)先調(diào)度s1,否則優(yōu)先調(diào)度s2。

img

2、FAIR 調(diào)度策略

FAIR 調(diào)度策略的樹結(jié)構(gòu)如下圖所示:

img

FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲著所有待分配的TaskSetMagager。

可以通過在Properties中指定spark.scheduler.pool屬性,指定調(diào)度池中的某個調(diào)度池作為TaskSetManager的父調(diào)度池,如果根調(diào)度池不存在此屬性值對應(yīng)的調(diào)度池,會創(chuàng)建以此屬性值為名稱的調(diào)度池作為TaskSetManager的父調(diào)度池,并將此調(diào)度池作為根調(diào)度池的子調(diào)度池。

在FAIR模式中,需要先對子Pool進(jìn)行排序,再對子Pool里面的TaskSetMagager進(jìn)行排序,因為Pool和TaskSetMagager都繼承了Schedulable特質(zhì),因此使用相同的排序算法。

排序過程的比較是基于Fair-share來比較的,每個要排序的對象包含三個屬性:runningTasks值(正在運(yùn)行的Task數(shù))、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平調(diào)度配置文件fairscheduler.xml中被指定,調(diào)度池在構(gòu)建階段會讀取此文件的相關(guān)配置。

1)如果A對象的runningTasks大于它的minShare,B對象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先執(zhí)行)

2)如果A、B對象的runningTasks都小于它們的minShare,那么就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執(zhí)行)

3)如果A、B對象的runningTasks都大于它們的minShare,那么就比較runningTasks與weight的比值(權(quán)重使用率),誰小誰排前面。(權(quán)重使用率低的先執(zhí)行)

4)如果上述比較均相等,則比較名字。

整體上來說就是通過minShare和weight這兩個參數(shù)控制比較過程,可以做到讓minShare使用率和權(quán)重使用率少(實(shí)際運(yùn)行task比例較少)的先運(yùn)行。

FAIR模式排序完成后,所有的TaskSetManager被放入一個ArrayBuffer里,之后依次被取出并發(fā)送給Executor執(zhí)行。

從調(diào)度隊列中拿到TaskSetManager后,由于TaskSetManager封裝了一個Stage的所有Task,并負(fù)責(zé)管理調(diào)度這些Task,那么接下來的工作就是TaskSetManager按照一定的規(guī)則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發(fā)到Executor上執(zhí)行。

本地化調(diào)度

DAGScheduler切割Job,劃分Stage,通過調(diào)用submitStage來提交一個Stage對應(yīng)的tasks,submitStage會調(diào)用submitMissingTasks,submitMissingTasks確定每個需要計算的task的preferredLocations,通過調(diào)用getPreferrdeLocations()得到partition的優(yōu)先位置,由于一個partition對應(yīng)一個task,此partition的優(yōu)先位置就是task的優(yōu)先位置,對于要提交到TaskScheduler的TaskSet中的每一個task,該task優(yōu)先位置與其對應(yīng)的partition對應(yīng)的優(yōu)先位置一致。從調(diào)度隊列中拿到TaskSetManager后,那么接下來的工作就是TaskSetManager按照一定的規(guī)則一個個取出task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發(fā)到Executor上執(zhí)行。前面也提到,TaskSetManager封裝了一個Stage的所有task,并負(fù)責(zé)管理調(diào)度這些task。根據(jù)每個task的優(yōu)先位置,確定task的Locality級別,Locality一共有五種,優(yōu)先級由高到低順序:

img

在調(diào)度執(zhí)行時,Spark調(diào)度總是會盡量讓每個task以最高的本地性級別來啟動,當(dāng)一個task以X本地性級別啟動,但是該本地性級別對應(yīng)的所有節(jié)點(diǎn)都沒有空閑資源而啟動失敗,此時并不會馬上降低本地性級別啟動而是在某個時間長度內(nèi)再次以X本地性級別來啟動該task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推??梢酝ㄟ^調(diào)大每個類別的最大容忍延遲時間,在等待階段對應(yīng)的Executor可能就會有相應(yīng)的資源去執(zhí)行此task,這就在在一定程度上提到了運(yùn)行性能。

失敗重試與黑名單機(jī)制

除了選擇合適的Task調(diào)度運(yùn)行外,還需要監(jiān)控Task的執(zhí)行狀態(tài),前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啟動執(zhí)行后,Executor會將執(zhí)行狀態(tài)上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應(yīng)的TaskSetManager,并通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態(tài),對于失敗的Task,會記錄它失敗的次數(shù),如果失敗次數(shù)還沒有超過最大重試次數(shù),那么就把它放回待調(diào)度的Task池子中,否則整個Application失敗。在記錄Task失敗次數(shù)過程中,會記錄它上一次失敗所在的ExecutorId和Host,這樣下次再調(diào)度這個Task時,會使用黑名單機(jī)制,避免它被調(diào)度到上一次失敗的節(jié)點(diǎn)上,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的ExecutorId和Host,以及其對應(yīng)的“拉黑”時間,“拉黑”時間是指這段時間內(nèi)不要再往這個節(jié)點(diǎn)上調(diào)度這個Task了。

七、Spark Shuffle解析

ShuffleMapStage與FinalStage

img

在劃分stage時,最后一個stage成為FinalStage,它本質(zhì)上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。

ShuffleMapStage的結(jié)束伴隨著shuffle文件的寫磁盤。

ResultStage基本上對應(yīng)代碼中的action算子,即將一個函數(shù)應(yīng)用在RDD的各個partition的數(shù)據(jù)集上,意味著一個job的運(yùn)行結(jié)束。

Shuffle中的任務(wù)個數(shù)

map端task個數(shù)的確定

Shuffle過程中的task個數(shù)由RDD分區(qū)數(shù)決定,而RDD的分區(qū)個數(shù)與參數(shù)spark.default.parallelism有密切關(guān)系。

在Yarn Cluster模式下,如果沒有手動設(shè)置spark.default.parallelism,則有:

Others: total number of cores on all executor nodes or 2, whichever is larger. spark.default.parallelism = max(所有executor使用的core總數(shù),2)

如果進(jìn)行了手動配置,則:

spark.default.parallelism = 配置值

還有一個重要的配置:

The maximum number of bytes to pack into a single partition when reading files. spark.files.maxPartitionBytes = 128 M (默認(rèn))

代表著rdd的一個分區(qū)能存放數(shù)據(jù)的最大字節(jié)數(shù),如果一個400MB的文件,只分了兩個區(qū),則在action時會發(fā)生錯誤。

當(dāng)一個spark應(yīng)用程序執(zhí)行時,生成sparkContext,同時會生成兩個參數(shù),由上面得到的spark.default.parallelism推導(dǎo)出這兩個參數(shù)的值:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當(dāng)以上參數(shù)確定后,就可以推算RDD分區(qū)數(shù)目了。

(1)通過scala集合方式parallelize生成的RDD

val rdd = sc.parallelize(1 to 10)

這種方式下,如果在parallelize操作時沒有指定分區(qū)數(shù),則有:

rdd的分區(qū)數(shù) = sc.defaultParallelism

(2)在本地文件系統(tǒng)通過textFile方式生成的RDD

val rdd = sc.textFile("path/file")

rdd的分區(qū)數(shù) = max(本地file的分片數(shù),sc.defaultMinPartitions)

(3)在HDFS文件系統(tǒng)生成的RDD

rdd的分區(qū)數(shù) = max(HDFS文件的Block數(shù)目,sc.defaultMinPartitions)

(4)從HBase數(shù)據(jù)表獲取數(shù)據(jù)并轉(zhuǎn)換為RDD

rdd的分區(qū)數(shù) = Table的region個數(shù)

(5)通過獲取json(或者parquet等等)文件轉(zhuǎn)換成的DataFrame

rdd的分區(qū)數(shù) = 該文件在文件系統(tǒng)中存放的Block數(shù)目

(6)Spark Streaming獲取Kafka消息對應(yīng)的分區(qū)數(shù)

基于Receiver:

在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的,所以如果我們加大每個topic的partition數(shù)量,僅僅是增加線程來處理由單一Receiver消費(fèi)的主題。但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度。

基于DirectDStream:

Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數(shù)據(jù),所以在Kafka partition和RDD partition之間,有一個一對一的映射關(guān)系。

reduce端task個數(shù)的確定

Reduce端進(jìn)行數(shù)據(jù)的聚合,一部分聚合算子可以手動指定reduce task的并行度,如果沒有指定,則以map端的最后一個RDD的分區(qū)數(shù)作為其分區(qū)數(shù),那么分區(qū)數(shù)就決定了reduce端的task的個數(shù)。

reduce端數(shù)據(jù)的讀取

根據(jù)stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執(zhí)行,那么后執(zhí)行的reduce task如何知道從哪里去拉去map task落盤后的數(shù)據(jù)呢?

reduce端的數(shù)據(jù)拉取過程如下:

1、map task執(zhí)行完畢后會將計算狀態(tài)以及磁盤小文件位置等信息封裝到mapStatue對象中,然后由本進(jìn)程中的MapOutPutTrackerWorker對象將mapstatus對象發(fā)送給Driver進(jìn)程的MapOutPutTrackerMaster對象;

2、在reduce task開始執(zhí)行之前會先讓本進(jìn)程中的MapOutPutTrackerWorker向Driver進(jìn)程中的MapOutPutTrackerMaster發(fā)動請求,請求磁盤小文件位置信息;

3、當(dāng)所有的Map task執(zhí)行完畢后,Driver進(jìn)程中的MapOutPutTrackerMaster就掌握了所有的磁盤小文件的位置信息。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息;

4、完成之前的操作之后,由BlockerTransforService去Executor所在的節(jié)點(diǎn)拉數(shù)據(jù),默認(rèn)會啟動五個子線程。每次拉取的數(shù)據(jù)量不能超過48M(reduce task每次最多拉取48M數(shù)據(jù),將拉來的數(shù)據(jù)存儲到Executor內(nèi)存的20%內(nèi)存中)。

HashShuffle解析

以下的討論都假設(shè)每個Executor有一個CPU core。

1、未經(jīng)優(yōu)化的HashShuffleManager

shuffle write階段,主要就是在一個stage結(jié)束計算之后,為了下一個stage可以執(zhí)行shuffle類的算子(比如reduceByKey),而將每個task處理的數(shù)據(jù)按key進(jìn)行“劃分”。所謂“劃分”,就是對相同的key執(zhí)行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游stage的一個task。在將數(shù)據(jù)寫入磁盤之前,會先將數(shù)據(jù)寫入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去。

下一個stage的task有多少個,當(dāng)前stage的每個task就要創(chuàng)建多少份磁盤文件。比如下一個stage總共有100個task,那么當(dāng)前stage的每個task都要創(chuàng)建100份磁盤文件。如果當(dāng)前stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個task,那么每個Executor上總共要創(chuàng)建500個磁盤文件,所有Executor上會創(chuàng)建5000個磁盤文件。由此可見,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。

shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結(jié)果中的所有相同key,從各個節(jié)點(diǎn)上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行key的集合或鏈接等操作。由于shuffle write的過程中,map task個下游stage的每個reduce task都創(chuàng)建了一個磁盤文件,因此shuffle read的過程中,每個reduce task只要從上游stage的所有map task所在的節(jié)點(diǎn)上,拉取屬于自己的那一個磁盤文件即可。

shuffle read的拉取過程是一邊拉取一邊進(jìn)行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過你村中的一個Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。以此類推,知道最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。

未經(jīng)優(yōu)化的HashShuffleManager工作原理如下圖所示:

img

2、優(yōu)化后的HashShuffleManager

為了優(yōu)化HashShuffleManager我們可以設(shè)置一個參數(shù),spark.shuffle.consolidateFiles,該參數(shù)默認(rèn)值為false,將其設(shè)置為true即可開啟優(yōu)化機(jī)制,通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。

開啟consolidate機(jī)制之后,在shuffle write過程中,task就不是為了下游stage的每個task創(chuàng)建一個磁盤文件了,此時會出現(xiàn)shuffleFileGroup的概念,每個shuffleFileGroup會對應(yīng)一批磁盤文件,磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。一個Executor上有多少個CPU core,就可以并行執(zhí)行多少個task。而第一批并行執(zhí)行的每個task都會闖將一個shuffleFileGroup,并將數(shù)據(jù)寫入對應(yīng)的磁盤文件內(nèi)。

當(dāng)Executor的CPU core執(zhí)行完一批task,接著執(zhí)行下一批task時,下一批task就會復(fù)用之前已有的shuffleFileGroup,包括其中的磁盤文件,也就是說,此時task會將數(shù)據(jù)寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機(jī)制允許不同的task復(fù)用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量,進(jìn)而提升shuffle write的性能。

假設(shè)第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數(shù)為1),每個Executor執(zhí)行5個task。那么原本使用未經(jīng)優(yōu)化的HashSHuffleManager時,每個Executor會產(chǎn)生500個磁盤文件,所有Executor會產(chǎn)生5000個磁盤文件的。但是此時經(jīng)過優(yōu)化之后,每個Executor創(chuàng)建的磁盤文件的數(shù)量的計算公式為:CPU core的數(shù)量 * 下一個stage的task數(shù)量,也就是說,每個Executor此時只會創(chuàng)建100個磁盤文件,所有Executor只會創(chuàng)建1000個磁盤文件。

優(yōu)化后的HashShuffleManager工作原理如下圖所示:

img

SortShuffle解析

SortShuffleManager的運(yùn)行機(jī)制主要分為兩種,一種是普通運(yùn)行機(jī)制,另一種是bypass運(yùn)行機(jī)制。當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認(rèn)為200),就會啟用bypass機(jī)制。

1、普通運(yùn)行機(jī)制

在該模式下,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中此時根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu),如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過Map進(jìn)行聚合,一邊寫入內(nèi)存;如果是join這種普通的shuffle算子,那么會選用Array數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存。接著,每寫一條數(shù)據(jù)進(jìn)如內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會判斷一下,是否達(dá)到了某個臨界閾值。如果達(dá)到臨界閾值的話,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。

在溢寫到磁盤文件之前,會先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。排序過后,會分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的batch數(shù)量是10000條,也就是說,排序好的數(shù)據(jù),會以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實(shí)現(xiàn)的。BufferedOutputStream是Java的緩沖輸出流,首先會將數(shù)據(jù)緩沖在內(nèi)存中,當(dāng)內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數(shù),提升性能。

一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進(jìn)行合并,這就是merge過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應(yīng)一個磁盤文件,也就意味著該task為下游stage的task準(zhǔn)備的數(shù)據(jù)都在這一個文件中,一次你還會單獨(dú)寫一份索引文件,其中標(biāo)識了下游各個task的數(shù)據(jù)在文件中的start offset與end offset。

SortShuffleManager由于有一個磁盤文件merge的過程,因此大大減少了文件數(shù)量。比如第一個stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個task,而第二個stage有100個task。由于每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。

普通運(yùn)行機(jī)制的SortShuffleManager工作原理如下圖所示:

img

2、bypass運(yùn)行機(jī)制

bypass運(yùn)行機(jī)制的觸發(fā)條件如下:

(1)shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值。

(2)不是聚合類的shuffle算子。

此時,每個task會為每個下游task都創(chuàng)建一個臨時磁盤文件,并將數(shù)據(jù)按key進(jìn)行hash然后根據(jù)key的hash值,將key寫入對應(yīng)的磁盤文件之中。當(dāng)然,寫入磁盤文件時也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨(dú)的索引文件。

該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因為都要創(chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機(jī)制相對未經(jīng)優(yōu)化的HashShuffleManager來說,shuffleread的性能會更好。

而該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于:第一,磁盤寫機(jī)制不同;第二,不會進(jìn)行排序。也就是說,啟用該機(jī)制的最大好處在于,shuffle write過程中,不需要進(jìn)行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開銷。

普通運(yùn)行機(jī)制的SortShuffleManager工作原理如下圖所示:

img

八、Spark內(nèi)存管理

在執(zhí)行Spark應(yīng)用程序時,Spark集群會啟動Driver和Executor兩種JVM進(jìn)程,前者為主控進(jìn)程,負(fù)責(zé)創(chuàng)建Spark上下文,提交Spark作業(yè)(Job),并將作業(yè)轉(zhuǎn)化為計算任務(wù)(Task),在各個Executor進(jìn)程間協(xié)調(diào)任務(wù)的調(diào)度,后者負(fù)責(zé)在工作節(jié)點(diǎn)上執(zhí)行具體的計算任務(wù),并將結(jié)果返回給Driver,同時為需要持久化的RDD提供存儲功能。

堆內(nèi)和堆外內(nèi)存規(guī)劃

作為一個JVM進(jìn)程,Executor的內(nèi)存管理建立在JVM的內(nèi)存管理之上,Spark對JVM的堆內(nèi)(On-heap)空間進(jìn)行了更為詳細(xì)的分配,以充分利用內(nèi)存。同時,Spark引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開辟空間,進(jìn)一步優(yōu)化了內(nèi)存的使用。

堆內(nèi)內(nèi)存受到JVM統(tǒng)一管理,堆外內(nèi)存是直接向操作系統(tǒng)進(jìn)行內(nèi)存的申請和釋放。

1、堆內(nèi)內(nèi)存

堆內(nèi)內(nèi)存的大小,由Spark應(yīng)用程序啟動時的- executor-memory或spark.executor.memory參數(shù)配置。Executor內(nèi)運(yùn)行的并發(fā)任務(wù)共享JVM堆內(nèi)內(nèi)存,這些任務(wù)在緩存RDD數(shù)據(jù)和廣播(Broadcast)數(shù)據(jù)時占用的內(nèi)存被規(guī)劃為存儲(Storage)內(nèi)存,而這些任務(wù)在執(zhí)行Shuffle時占用的內(nèi)存被規(guī)劃為執(zhí)行(Execution)內(nèi)存,剩余的部分不做特殊規(guī)劃,那些Spark內(nèi)部的對象實(shí)例,或者用戶定義的Spark應(yīng)用程序中的對象實(shí)例,均占用剩余的空間。不同的管理模式下,這三部分占用的空間大小各不相同。

Spark對堆內(nèi)內(nèi)存的管理是一種邏輯上的俄“規(guī)劃式”的管理,因為對象實(shí)例占用內(nèi)存的申請和釋放都由JVM完成,Spark只能在申請后和釋放前記錄這些內(nèi)存。其具體流程如下:

1、Spark在代碼中new一個對象實(shí)例;

2、JVM從堆內(nèi)內(nèi)存分配空間,創(chuàng)建對象并返回對象引用;

3、Spark保存該對象的引用,記錄該對象占用的內(nèi)存。

釋放內(nèi)存流程如下:

1、Spark記錄該對象釋放的內(nèi)存,刪除該對象的引用;

2、等待JVM的垃圾回收機(jī)制釋放該對象占用的堆內(nèi)內(nèi)存。

我們知道,JVM的對象可以以序列化的方式存儲,序列化的過程是將對象轉(zhuǎn)換為二進(jìn)制字節(jié)流,本質(zhì)上可以理解為將非連續(xù)空間的鏈?zhǔn)酱鎯D(zhuǎn)化為連續(xù)空間或塊存儲,在訪問時則需要進(jìn)行序列化的逆過程--反序列化,將字節(jié)流轉(zhuǎn)化為對象,序列化的方式可以節(jié)省存儲空間,但增加了存儲和讀取時候的計算開銷。

對于Spark中序列化的對象,由于是字節(jié)流的形式,其占用的內(nèi)存大小可直接計算,而對于非序列化的對象,其占用的內(nèi)存是通過周期性地采樣近似估算而得,即并不是每次新增的數(shù)據(jù)項都會計算一次占用的內(nèi)存大小,這種方法降低了時間開銷但是有可能誤差較大,導(dǎo)致某一時刻的實(shí)際內(nèi)存可能遠(yuǎn)遠(yuǎn)超出預(yù)期。此外,在被Spark標(biāo)記為釋放的對象實(shí)例,很有可能在實(shí)際上并沒有被JVM回收,導(dǎo)致實(shí)際可用的內(nèi)存小于Spark記錄的可用內(nèi)存。所以Spark并不能準(zhǔn)確記錄實(shí)際可用的堆內(nèi)內(nèi)存,從而也就無法完全避免內(nèi)存溢出(OOM,Out of Memory)的異常。

雖然不能精確控制堆內(nèi)內(nèi)存的申請和釋放,但Spark通過對存儲內(nèi)存和執(zhí)行內(nèi)存各自獨(dú)立的規(guī)劃管理,可以決定是否要在存儲內(nèi)存里緩沖新的RDD,以及是否為新的任務(wù)分配執(zhí)行內(nèi)存,在一定程度上可以提升內(nèi)存的利用率,減少異常的出現(xiàn)。

2、堆外內(nèi)存

為了進(jìn)一步優(yōu)化內(nèi)存的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開辟空間,存儲經(jīng)過序列化的二進(jìn)制數(shù)據(jù)。

堆外內(nèi)存意味著把內(nèi)存對象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個較小的堆,以減少垃圾收集對應(yīng)用的影響。

利用JDK Unsafe API(從spark2.0開始,在管理堆外的存儲內(nèi)存時不再基于Tachyon,而是與堆外的執(zhí)行內(nèi)存一樣,基于JDK Unsafe API實(shí)現(xiàn)),Spark可以直接操作系統(tǒng)堆外內(nèi)存,減少了不必要的內(nèi)存開銷,以及頻繁的GC掃描和回收,提升了處理性能。堆外內(nèi)存可以被精確地申請和釋放(堆外內(nèi)存之所以能夠被精確的申請和釋放,是由于內(nèi)存的申請和釋放不再通過JVM機(jī)制,而是直接向操作系統(tǒng)申請,JVM對于內(nèi)存的清理是無法準(zhǔn)確指定時間點(diǎn)的,因此無法實(shí)現(xiàn)精確的釋放),而且序列化的數(shù)據(jù)占用的空間可以被精確計算,所以相比堆內(nèi)內(nèi)存來說降低了管理的難度,也降低了誤差。

在默認(rèn)情況下堆外內(nèi)存并不啟用,可以通過配置spark.memory.offHeap.enabled參數(shù)啟用,并由spark.memory.offHeap.size參數(shù)設(shè)定堆外空間的大小。除了沒有other空間,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同,所有運(yùn)行中的并發(fā)任務(wù)共享存儲內(nèi)存和執(zhí)行內(nèi)存。

(該部分內(nèi)存主要用于程序的共享庫,Perm Space、線程Stack和一些Memory mapping等,或者類C方式allocate object)

內(nèi)存空間分配

1、靜態(tài)內(nèi)存管理

在Spark最初采用的靜態(tài)內(nèi)存管理機(jī)制下,存儲內(nèi)存、執(zhí)行內(nèi)存和其他內(nèi)存的大小在Spark應(yīng)用程序運(yùn)行期間均為固定的,但用戶可以應(yīng)用程序啟動前進(jìn)行配置,堆內(nèi)內(nèi)存的分配如下圖所示:

img

可以看到,可用的堆內(nèi)內(nèi)存的大小需要按照代碼清單的方式計算:

可用的存儲內(nèi)存 = systemMaxMemory spark.storage.memoryFraction spark.storage.safety Fraction

可用的執(zhí)行內(nèi)存 = systemMaxMemory spark.shuffle.memoryFraction spark.shuffle.safety Fraction






其中systemMaxMemory取決于當(dāng)前JVM堆內(nèi)內(nèi)存的大小,最后可用的執(zhí)行內(nèi)存或者存儲內(nèi)存要在此基礎(chǔ)上與各自的memoryFraction參數(shù)和safetyFraction參數(shù)相乘得出。上述計算公式中的兩個safetyFraction參數(shù),其意義在于在邏輯預(yù)留出1-safetyFraction這么一塊保險區(qū)域,降低因?qū)嶋H內(nèi)存超出當(dāng)前預(yù)設(shè)范圍而導(dǎo)致OOM的風(fēng)險(上文提到,對于非序列化對象的內(nèi)存采樣估算會產(chǎn)生誤差)。值得注意的是,這個預(yù)留的保險區(qū)域僅僅是一種邏輯上的規(guī)劃,再具體使用時Spark并沒有區(qū)別對待,和“其他內(nèi)存”一樣交給了JVM去管理。

Storage內(nèi)存和Executor內(nèi)存都有預(yù)留空間,目的是防止OOM,因為Spark堆內(nèi)內(nèi)存大小的記錄是不準(zhǔn)確的,需要留出保險區(qū)域。

堆外的空間分配較為簡單,只有存儲內(nèi)存和執(zhí)行內(nèi)存??捎玫膱?zhí)行內(nèi)存和存儲內(nèi)存占用的空間大小直接由參數(shù)spark.memory.storageFraction決定,由于堆外內(nèi)存占用的空間可以被精確計算,所以無需再設(shè)定保險區(qū)域。

img

靜態(tài)內(nèi)存管理機(jī)制實(shí)現(xiàn)起來較為簡單,但如果用戶不熟悉Spark的鵆機(jī)制,或沒有根據(jù)具體的數(shù)據(jù)規(guī)模和計算任務(wù)或做相應(yīng)的配置,很容易造成“一般海水,一般火焰”的局面,即存儲內(nèi)存和執(zhí)行內(nèi)存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內(nèi)容以存儲新的內(nèi)容。由于新的內(nèi)存管理機(jī)制的出現(xiàn),這種方式目前已經(jīng)很少有開發(fā)者使用,出于兼容舊版本的應(yīng)用程序的目的,Spark依然保留了它的實(shí)現(xiàn)。

2、統(tǒng)一內(nèi)存管理

Spark1.6之后引入的統(tǒng)一內(nèi)存管理機(jī)制,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間,可以動態(tài)占用對方的空閑區(qū)域,統(tǒng)一內(nèi)存管理的堆內(nèi)內(nèi)存結(jié)構(gòu)如下圖所示:

img

統(tǒng)一內(nèi)存管理的堆外內(nèi)存結(jié)構(gòu)如下圖所示:

img

其中最重要的優(yōu)化在于動態(tài)占用機(jī)制,其規(guī)則如下:

1、設(shè)定基本的存儲內(nèi)存和執(zhí)行內(nèi)存區(qū)域(spark.storage.storageFraction參數(shù)),該設(shè)定確定了雙方各自擁有的空間的范圍;

2、雙方的空間都不足時,則存儲到磁盤;若己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的Block)

3、執(zhí)行內(nèi)存的空間被對方占用后,可讓對方將占用的部分轉(zhuǎn)存到磁盤,然后“歸還”借用的空間;

4、存儲內(nèi)存的空間被對方占用后,無法讓對方“歸還”,因為需要考慮Shuffle過程中的很多因素,實(shí)現(xiàn)起來較為復(fù)雜。

統(tǒng)一內(nèi)存管理的動態(tài)占用機(jī)制如下圖所示:

img

憑借統(tǒng)一內(nèi)存管理機(jī)制,spark在一定程度上提高了堆內(nèi)和堆外內(nèi)存資源的利用率,降低了開發(fā)者維護(hù)spark內(nèi)存的難度。如果存儲內(nèi)存的空間太大或者說緩存的數(shù)據(jù)過多,反而會導(dǎo)致頻繁的全量垃圾回收,降低任務(wù)執(zhí)行時的性能,因為緩存的RDD數(shù)據(jù)通常都是長期主流內(nèi)存的。所以要想充分發(fā)揮Spark的性能,需要開發(fā)者進(jìn)一步了解存儲內(nèi)存和執(zhí)行內(nèi)存各自管理方式和實(shí)現(xiàn)原理。

存儲內(nèi)存管理

1、RDD持久化機(jī)制

彈性分布式數(shù)據(jù)集(RDD)作為Spark最根本的數(shù)據(jù)抽象,是只讀的分區(qū)記錄(Partition)的集合,只能基于在穩(wěn)定物理存儲中的數(shù)據(jù)集上創(chuàng)建,或者在其他已有的RDD上執(zhí)行轉(zhuǎn)換(Transformation)操作產(chǎn)生一個新的RDD。轉(zhuǎn)換后的RDD與原始的RDD之間產(chǎn)生了依賴關(guān)系,構(gòu)成了血統(tǒng)(Lineage)。憑借血統(tǒng),Spark保證了每一個RDD都可以被重新恢復(fù)。但是RDD的所有轉(zhuǎn)換都是有惰性的,即只有當(dāng)一個返回結(jié)果給Driver的行動(Action)發(fā)生時,Spark才會創(chuàng)建任務(wù)讀取RDD,然后真正觸發(fā)轉(zhuǎn)換的執(zhí)行。

Task在啟動之初讀取一個分區(qū)時,會先判斷這個分區(qū)是否已經(jīng)被持久化,如果沒有則需要檢查Checkpoint或按照血統(tǒng)重新計算。所以如果一個RDD上要執(zhí)行多次行動,可以在第一次行動中使用persist或cache方法,在內(nèi)存或磁盤中持久化或緩存這個RDD,從而在后面的行動中提升計算速度。

事實(shí)上,cache方法是使用默認(rèn)的MEMORY_ONLY的存儲級別將RDD持久化到內(nèi)存,故緩存是一種特殊的持久化。堆內(nèi)和堆外存儲內(nèi)存的設(shè)計,便可以對緩存RDD時使用的內(nèi)存做統(tǒng)一的規(guī)劃和管理。

RDD的持久化由Spark的Storage模塊負(fù)責(zé),實(shí)現(xiàn)了RDD與物理存儲的解耦合。Storage模塊負(fù)責(zé)管理Spark在計算過程中產(chǎn)生的數(shù)據(jù),將那些在內(nèi)存或磁盤、在本地或遠(yuǎn)程存取數(shù)據(jù)的功能封裝了起來。在具體實(shí)現(xiàn)時Driver端和Executor端的Storage模塊構(gòu)成了主從式的架構(gòu),即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。

Storage模塊在邏輯上以Block為基本存儲單位,RDD的每個Partition經(jīng)過處理后位移對應(yīng)一個Block(BlockId的格式為rdd_RDD-ID_PARTITION-ID)。Driver端的Master負(fù)責(zé)整個Spark應(yīng)用程序的Block的元數(shù)據(jù)信息的管理和維護(hù),而Executor端的Slave需要將Block的更新等狀態(tài)上報到Master,同時接受Master的命令,例如新增或刪除一個RDD。

img

在對RDD持久化時,Spark規(guī)定了MEMORY_ONLY、MEMORY_AND_DISK等7中不同的存儲級別,而存儲級別是以下5個變量的組合:

class StorageLevel private(

private var _useDisk: Boolean, //磁盤

private var _useMemory: Boolean, //這里其實(shí)是指堆內(nèi)內(nèi)存

private var _useOffHeap: Boolean, //堆外內(nèi)存

private var _deserialized: Boolean, //是否為非序列化

private var _replication: Int = 1 //副本個數(shù)

)

Spark中7中存儲級別如下:

img

通過對數(shù)據(jù)結(jié)構(gòu)的分析,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:

(1)存儲位置:磁盤/堆內(nèi)內(nèi)存/堆外內(nèi)存。如MEMORY_AND_DISK是同時在磁盤和堆內(nèi)內(nèi)存上存儲,實(shí)現(xiàn)了冗余備份。OFF_HEAP則是只在堆外內(nèi)存存儲,目前選擇堆外內(nèi)存時不能同時存儲到其他位置。

(2)存儲形式:Block緩存到存儲內(nèi)存后,是否為非序列化的形式。如MEMORY_ONLY是非序列化方式存儲,OFF_HEAP是序列化方式存儲。

(3)副本數(shù)量:大于1時需要遠(yuǎn)程冗余備份到其他節(jié)點(diǎn)。如DISK_ONLY_2需要遠(yuǎn)程備份1個副本。

2、RDD的緩存過程

RDD在緩存到存儲內(nèi)存之前,Partition中的數(shù)據(jù)一般以迭代器(Iterator)的數(shù)據(jù)結(jié)構(gòu)來訪問,這是Scala語言中一種遍歷數(shù)據(jù)集合的方法。通過Iterator可以獲取分區(qū)中每一條序列化或者非序列化的數(shù)據(jù)項(Record),這些Record的對象實(shí)例在邏輯上占用了JVM堆內(nèi)內(nèi)存的other部分的空間,同一Partition的不同Record的存儲空間并不連續(xù)。

RDD在緩存到存儲內(nèi)存之后,Partition被轉(zhuǎn)換成Block,Record在堆內(nèi)或堆外存儲內(nèi)存中占用一塊連續(xù)的空間。將Partition由不連續(xù)的存儲空間轉(zhuǎn)換為連續(xù)存儲空間的過程,Spark稱之為“展開”(Unroll)。

Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數(shù)據(jù)結(jié)構(gòu)定義,用一個數(shù)組存儲所有的對象實(shí)例,序列化的Block則以SerializedMemoryEntry的數(shù)據(jù)結(jié)構(gòu)定義,用字節(jié)緩沖區(qū)(ByteBuffer)來存儲二進(jìn)制數(shù)據(jù)。每個Executor的Storage模塊用一個鏈?zhǔn)組ap結(jié)構(gòu)(LinkedHashMap)來管理堆內(nèi)和堆外存儲內(nèi)存中所有的Block對象的實(shí)例,對這個LinkedHashMap新增和刪除間接記錄了內(nèi)存的申請和釋放。

因為不能保證存儲空間可以一次容納Iterator中的所有數(shù)據(jù),當(dāng)前的計算任務(wù)在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時占位,空間不足則Unroll失敗,空間足夠時可以繼續(xù)進(jìn)行。

對于序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。

對于非序列化的Partition則要在便利Record的過程中一次申請,即每讀取一條Record,采樣估算其所需的Unroll空間并進(jìn)行申請,空間不足時可以中斷,釋放已占用的Unroll空間。

如果最終Unroll成功,當(dāng)前Partition所占用的Unroll空間被轉(zhuǎn)換為正常的緩存RDD的存儲空間,如下圖所示。

img

在靜態(tài)內(nèi)存管理時,Spark在存儲內(nèi)存中專門劃分了一塊Unroll空間,其大小是固定的,統(tǒng)一內(nèi)存管理時則沒有對Unroll空間進(jìn)行特別區(qū)分,當(dāng)存儲空間不足時會根據(jù)動態(tài)占用機(jī)制進(jìn)行處理。

3、淘汰與落盤

由于同一個Executor的所有的計算任務(wù)共享有限的存儲內(nèi)存空間,當(dāng)有新的Block需要緩存單數(shù)剩余空間不足且無法動態(tài)占用時,就要對LinkedHashMap中的舊Block進(jìn)行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進(jìn)行落盤(Drop),否則直接刪除該Block。

存儲內(nèi)存的淘汰規(guī)則為:

被淘汰的舊Block要與新的Block的MemoryMode相同,即同屬于堆外或堆內(nèi)內(nèi)存;

新舊Block不能屬于同一個RDD,避免循環(huán)淘汰;

舊Block所屬RDD不能處于被讀狀態(tài),避免引發(fā)一致性問題;

遍歷LinkedHashMap中Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新Block所需的空間。其中LRU是LinkedHashMap的特性。

落盤的流程則比較簡單,如果其存儲級別符合_useDisk為true的條件,再根據(jù)其_deserialized判斷是否是非序列化的形式,若是則對其進(jìn)行序列化,最后將數(shù)據(jù)存儲到磁盤,在Storage模塊中更新其信息。

執(zhí)行內(nèi)存管理

執(zhí)行內(nèi)存主要用來存儲任務(wù)再在執(zhí)行Shuffle時占用的內(nèi)存,Shuffle是按照一定規(guī)則對RDD數(shù)據(jù)重新分區(qū)的過程,Shuffle的Write和Read兩階段對執(zhí)行內(nèi)存的使用:

Shuffle Write

在map端會采用ExternalSorter進(jìn)行外排,在內(nèi)存中存儲數(shù)據(jù)時主要占用堆內(nèi)執(zhí)行空間。

Shuffle Read

(1)在對reduce端的數(shù)據(jù)進(jìn)行聚合時,要將數(shù)據(jù)交給Aggregator處理,在內(nèi)存中存儲數(shù)據(jù)時占用堆內(nèi)執(zhí)行空間。

(2)如果需要進(jìn)行最終結(jié)果排序,則要將再次將數(shù)據(jù)交給ExternalSorter處理,占用堆內(nèi)執(zhí)行空間。

在ExternalSorter和Aggregator中,Spark會使用一種叫做AppendOnlyMap的哈希表在堆內(nèi)執(zhí)行內(nèi)存中存儲數(shù)據(jù),但是Shuffle過程中所有數(shù)據(jù)并不能都保存到該哈希表中,當(dāng)這個哈希表占用的內(nèi)存會進(jìn)行周期性地采樣估算,當(dāng)其大到一定程度,無法再從MemoryManager申請到新的執(zhí)行內(nèi)存時,Spark就會將其全部內(nèi)容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)。

Spark的存儲內(nèi)存和執(zhí)行內(nèi)存有著截然不同的管理方式:對于存儲內(nèi)存來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉(zhuǎn)化而成;而對于執(zhí)行內(nèi)存,Spark用AppendOnlyMap來存儲Shuffle過程中的數(shù)據(jù),在Tungsten排序中甚至抽象稱為頁式內(nèi)存管理,開辟了全新的JVM內(nèi)存管理機(jī)制。

九、Spark核心組件解析

BlockManager數(shù)據(jù)存儲與管理機(jī)制

BlockManager是整個Spark底層負(fù)責(zé)數(shù)據(jù)存儲與管理的一個組件,Driver和Executor的所有數(shù)據(jù)都由對應(yīng)的BlockManager進(jìn)行管理。

Driver上有BlockManagerMaster,負(fù)責(zé)對各個節(jié)點(diǎn)上的BlockManager內(nèi)部管理的數(shù)據(jù)的元數(shù)據(jù)進(jìn)行維護(hù),比如block的增刪改等操作,都會在這里維護(hù)好元數(shù)據(jù)的變更。

每個節(jié)點(diǎn)都有一個BlockManager,每個BlockManager創(chuàng)建之后,第一件事即使去向BlockManagerMaster進(jìn)行注冊,此時BlockManagerMaster會為其創(chuàng)建對應(yīng)的BlockManagerInfo。

img

BlockManagerMaster與BlockManager的關(guān)系非常像NameNode與DataNode的關(guān)系,BlockManagerMaster中保存BlockManager內(nèi)部管理數(shù)據(jù)的元數(shù)據(jù),進(jìn)行維護(hù),當(dāng)BlockManager進(jìn)行Block增刪改等操作時,都會在BlockManagerMaster中進(jìn)行元數(shù)據(jù)的變更,這與NameNode維護(hù)DataNode的元數(shù)據(jù)信息,DataNode中數(shù)據(jù)發(fā)生變化時NameNode中的元數(shù)據(jù)也會相應(yīng)變化是一致的。

每個節(jié)點(diǎn)上都有一個BlockManager,BlockManager中有三個非常重要的組件:

DisStore:負(fù)責(zé)對磁盤數(shù)據(jù)進(jìn)行讀寫;

MemoryStore:負(fù)責(zé)對內(nèi)存數(shù)據(jù)進(jìn)行讀寫;

BlockTransferService:負(fù)責(zé)建立BlockManager到遠(yuǎn)程其他節(jié)點(diǎn)的BlockManager的連接,負(fù)責(zé)對遠(yuǎn)程其他節(jié)點(diǎn)的BlockManager的數(shù)據(jù)進(jìn)行讀寫;

每個BlockManager創(chuàng)建之后,做的第一件事就是向BlockManagerMaster進(jìn)行注冊,此時BlockManagerMaster會為其創(chuàng)建對應(yīng)的BlockManagerInfo。

使用BlockManager進(jìn)行寫操作時,比如說,RDD運(yùn)行過程中的一些中間數(shù)據(jù),或者我們手動指定了persist(),會優(yōu)先將數(shù)據(jù)寫入內(nèi)存中,如果內(nèi)存大小不夠,會使用自己的算法,將內(nèi)存中的部分?jǐn)?shù)據(jù)寫入磁盤;此外,如果persist()指定了要replica,那么會使用BlockTransferService將數(shù)據(jù)replicate一份到其他節(jié)點(diǎn)的BlockManager上去。

使用BlockManager進(jìn)行讀操作時,比如說,shuffleRead操作,如果能從本地讀取,就利用DisStore或者M(jìn)emoryStore從本地讀取數(shù)據(jù),但是本地沒有數(shù)據(jù)的話,那么會用BlockTransferService與有數(shù)據(jù)的BlockManager建立連接,然后用BlockTransferService從遠(yuǎn)程BlockManager讀取數(shù)據(jù);例如,shuffle Read操作中,很有可能要拉取的數(shù)據(jù)本地沒有,那么此時就會從遠(yuǎn)程有數(shù)據(jù)的節(jié)點(diǎn)上,找那個節(jié)點(diǎn)的BlockManager來拉取需要的數(shù)據(jù)。

只要使用BlockManager執(zhí)行了數(shù)據(jù)增刪改的操作,那么必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內(nèi)部的BlockStatus進(jìn)行增刪改操作,從而達(dá)到元數(shù)據(jù)的維護(hù)功能。

Spark共享變量底層實(shí)現(xiàn)

Spark一個非常重要的特性就是共享變量。

默認(rèn)情況下,如果在一個算子的函數(shù)中使用到了某個外部的變量,那么這個變量的值會被拷貝到每個task中,此時每個task只能操作自己的那份變量副本。如果多個task想要共享某個變量,那么這種方式是做不到的。

Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另一種是Accumulator(累加變量)。Broadcast Variable會將用到的變量,僅僅為每個節(jié)點(diǎn)拷貝一份,即每個Executor拷貝一份,更大的用途是優(yōu)化性能,見上網(wǎng)絡(luò)傳輸以及內(nèi)存損耗。Accumulator則可以讓多個task共同操作一份變量,主要可以進(jìn)行累加操作。Broadcast Variable是共享讀變量,task不能去修改它,而Accumulator可以讓多個task操作一個變量。

廣播變量

廣播變量允許編程者在每個Executor上暴力外部數(shù)據(jù)的只讀變量,而不是給每個任務(wù)發(fā)送一個副本。

每個task都會保存一份它所使用的外部變量的副本,當(dāng)一個Executor上的多個task都使用一個外部變量時,對于Executor內(nèi)存的消耗是非常大的,因此,我們可以將大型外部變量封裝為廣播變量,此時一個Executor保存一個變量副本,此Executor上的所有task共用此變量,不再是一個task單獨(dú)保存一個副本,這在一定程度上降低了Spark任務(wù)的內(nèi)存占用。

使用外部變量

使用廣播變量

Spark還嘗試使用高效的廣播算法分發(fā)廣播變量,以降低通信成本。

Spark提供的Broadcast Variable是只讀的,并且在每個Executor上只會有一個副本,而不會為每個task都拷貝一份副本,因此,它的最大作用,就是減少變量到各個節(jié)點(diǎn)的網(wǎng)絡(luò)傳輸消耗,以及在各個節(jié)點(diǎn)上的內(nèi)存消耗。此外,Spark內(nèi)部也是用了高效的廣播算法來減少網(wǎng)絡(luò)消耗。

可以通過調(diào)用SparkContext的broadcast()方法來針對每個變量創(chuàng)建廣播變量。然后再算子的函數(shù)內(nèi),使用到廣播變量時,每個Executor只會拷貝一份副本了,每個task可以使用廣播變量的value()方法獲取值。

在任務(wù)運(yùn)行時,Executor并不獲取廣播變量,當(dāng)task執(zhí)行到使用廣播變量的代碼時,會向Executor的內(nèi)存中請求廣播變量,如下圖所示:

img

之后Executor會通過BlockManager向Driver拉取廣播變量,然后提供給task進(jìn)行使用,如下圖所示:

img

廣播大變量是Spark中常用的基礎(chǔ)優(yōu)化方法,通過減少內(nèi)存占用實(shí)現(xiàn)任務(wù)執(zhí)行性能的提升。

累加器

累加器(accumulator):Accumulator是僅僅被相關(guān)操作累加的變量,因此可以在并行中被有效地支持。它們可用于實(shí)現(xiàn)計數(shù)器(如MapReduce)或總和計數(shù)。

Accumulator是存在于Driver端的,集群上運(yùn)行的task進(jìn)行Accumulator的累加,隨后把值發(fā)送到Driver端,在Driver端匯總(Spark UI在SparkContext創(chuàng)建時被創(chuàng)建,即在Driver端被創(chuàng)建,因此它可以讀取Accumulator的數(shù)值),由于Accumulator存在于Driver端,從節(jié)點(diǎn)讀取不到Accumulator的數(shù)值。

Spark提供的Accumulator主要用于多個節(jié)點(diǎn)對一個變量進(jìn)行共享性的操作。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對于同一個變量并行操作的功能,但是task只能對Accumulator進(jìn)行累加操作,不能讀取它的值,只有Driver程序可以讀取Accumulator的值。

Accumulator的底層原理如下圖所示:

img








作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫