Spark傾斜調(diào)優(yōu)(建議收藏)
Spark 優(yōu)化
一、概述
粉絲朋友們大家好!我是唐三少,大家在工作或者自己練習(xí)比較"幸運"的時候,會遇到數(shù)據(jù)傾斜或者shuffle優(yōu)化等幸福的事情,今天三少就和大家一起討論一下,遇到數(shù)據(jù)傾斜和shuffle優(yōu)化時我們應(yīng)該怎么去做,往哪個方向撞南墻。come on my fans!
二、shuffle優(yōu)化
(一)性能調(diào)優(yōu)
1.分配更多資源
在生產(chǎn)環(huán)境中,提交spark作業(yè)時,用的spark-submit shell腳本,里面調(diào)整對應(yīng)的參數(shù):
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的數(shù)量
--driver-memory 100m \ 配置driver的內(nèi)存(影響不大)
--executor-memory 100m \ 配置每個executor的內(nèi)存大小
--total-executor-cores 3 \ 配置所有executor的cpu core數(shù)量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
2.設(shè)置并行度
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
3.使用Kryo序列化
Spark支持使用Kryo序列化機制。這種序列化機制,比默認的Java序列化機制速度要快,序列化后的數(shù)據(jù)更小,大概是Java序列化機制的1/10。所以Kryo序列化優(yōu)化以后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費的內(nèi)存資源大大減少。
具體生產(chǎn)中使用如下:
第一步,在SparkConf中設(shè)置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類。
第二步,注冊你使用的需要通過Kryo序列化的一些自定義類,SparkConf.registerKryoClasses()。
項目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
(二)Shuffle調(diào)優(yōu)
在工作中使用Spark的時候,大家會經(jīng)常用到算子開發(fā),在算子開發(fā)過程中,部分算子涉及shuffle,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、特殊算子join(先groupByKey后再join是不會發(fā)生shuffle的)等等。那什么是shuffle呢?什么情況下會出現(xiàn)?哪些算子會產(chǎn)生shuffle?哇哦,那我們就一起嘮嘮吧。說一千道一萬,不如實例來相見:
比如groupByKey算子:
要把分布在集群各個節(jié)點上的數(shù)據(jù)中的同一個key,對應(yīng)的values,都要集中到一塊兒,集中到集群中同一個節(jié)點上,更嚴(yán)密一點說,就是集中到一個節(jié)點的一個executor的一個task中。
然后集中一個key對應(yīng)的values之后,才能交給我們來進行處理,<key, Iterable>。
總結(jié):shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
既然有shuffle,我們優(yōu)化的思路就可以從shuffle進行入手,那我們就嘮嘮怎么對shuffle進行調(diào)優(yōu)吧
1.合并map端輸出文件
如果不合并map端輸出文件的話,會怎么樣?
舉例實際生產(chǎn)環(huán)境的條件:
環(huán)境:
(1)100個節(jié)點(每個節(jié)點一個executor):100個executor
(2)每個executor:2個cpu core
(3)總共1000個task:每個executor平均10個task
(4)每個節(jié)點,10個task,每個節(jié)點會輸出多少份map端文件?10 * 1000=1萬個文件
寶寶們思考下總共有多少份map端輸出文件?
答案是:100 * 10000 = 100萬。
分析:
(1)第一個stage,每個task,都會給第二個stage的每個task創(chuàng)建一份map端的輸出文件。
(2)第二個stage,每個task,會到各個節(jié)點上面去,拉取第一個stage每個task輸出的,屬于自己的那一份文件。
(3)shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴(yán)重的部分。
通過上面的分析,一個普通的生產(chǎn)環(huán)境的spark job的一個shuffle環(huán)節(jié),會寫入磁盤100萬個文件。
磁盤IO對性能和spark作業(yè)執(zhí)行速度的影響,是極其驚人和嚇人的。
基本上,spark作業(yè)的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點。
開啟shuffle map端輸出文件合并的機制:
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
默認情況下,是不開啟的,如果不開啟的話會發(fā)生如上所述的大量map端輸出文件的操作,嚴(yán)重影響性能。
實際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機制以后,實際的性能調(diào)優(yōu)的效果是相當(dāng)?shù)目捎^的。spark作業(yè)可從5個小時 -> 2~3個小時。實際上,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu),executor上去->cpu core上去->并行度(task數(shù)量)上去,shuffle沒調(diào)優(yōu),shuffle就很糟糕了。大量的map端輸出文件的產(chǎn)生,對性能有比較惡劣的影響。這個時候,去開啟這個機制,可以很有效的提升性能。
3. 調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比:
調(diào)節(jié)map task內(nèi)存緩沖:
spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數(shù),后面還有一個后綴,kb。
spark 1.5.x以后,變了,就是現(xiàn)在這個參數(shù))
調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2
PS:可以根據(jù)實際情況進行調(diào)整哦
那么我們什么時候需要調(diào)整呢?答案只有一個,當(dāng)默認的滿足不了的時候;那怎么算滿足不了呢?調(diào)整完之后有什么好處呢?下面聽三少給到大家一一解惑答疑
(1)首先需要看Spark UI,如果公司使用的是standalone模式,那么so easy,你的spark跑起來,會顯示一個Spark UI的地址,進去觀察每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內(nèi)存讀寫的數(shù)據(jù)量。如果是用的yarn模式來提交,從yarn的界面進去,點擊對應(yīng)的application,進入Spark UI查看詳情。
(2)如果發(fā)現(xiàn)shuffle磁盤的write和read都很大。這個時候,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)。首先當(dāng)然是考慮開啟map端輸出文件合并機制。其次調(diào)節(jié)上面說的那兩個參數(shù)。
SQL調(diào)節(jié)原則如下:
spark.shuffle.file.buffer每次擴大一倍,然后看效果;
spark.shuffle.memoryFraction每次提高0.1,然后看效果。
PS:不能調(diào)節(jié)的太大,因為內(nèi)存資源是有限的,要做到所有資源的均衡分配。
(3)參數(shù)調(diào)整之后,通過將map task內(nèi)存緩沖變大減少spill到磁盤文件的次數(shù)。通過reduce端聚合內(nèi)存變大減少spill到磁盤的次數(shù),從而減少后面聚合讀取磁盤文件的數(shù)量。
(三)數(shù)據(jù)傾斜解決
1. 數(shù)據(jù)傾斜的現(xiàn)象及產(chǎn)生原因:
困擾我們的數(shù)據(jù)傾斜的問題相信大家都備感“幸?!?,那么出現(xiàn)的原因是什么呢?出現(xiàn)時候有什么現(xiàn)象呢?
現(xiàn)象:
絕大多數(shù)task執(zhí)行得都非??欤珎€別task執(zhí)行極慢。比如,總共有1000個task,998,999個task都在1分鐘之內(nèi)執(zhí)行完了,但是剩余一兩個task卻要一兩個小時才能執(zhí)行完,這種現(xiàn)象是很常見的。
原本能夠正常執(zhí)行的Spark作業(yè),某天突然報出OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的。這種情況比較少見。
原因:
(1)在進行shuffle的時候,必須將各個節(jié)點上相同的key拉取到某個節(jié)點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應(yīng)的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜。比如大部分key對應(yīng)10條數(shù)據(jù),但是個別key卻對應(yīng)了100萬條數(shù)據(jù),那么大部分task可能就只會分配到10條數(shù)據(jù),然后1秒鐘就運行完了;但是個別task可能分配到了100萬數(shù)據(jù),要運行一兩個小時。因此,整個Spark作業(yè)的運行進度是由運行時間最長的那個task決定的。
(2)出現(xiàn)數(shù)據(jù)傾斜的時候,Spark作業(yè)看起來會運行得非常緩慢,甚至可能因為某個task處理的數(shù)據(jù)量過大導(dǎo)致內(nèi)存溢出
(3)定位問題:
看算子:
你在自己的程序里面找找,哪些地方用了會產(chǎn)生shuffle的算子,groupByKey、countByKey、reduceByKey、join
看log :
log一般會報是在你的哪一行代碼,導(dǎo)致了OOM異常?;蛘呖磍og,看看是執(zhí)行到了第幾個stage。spark代碼,是怎么劃分成一個一個的stage的。哪一個stage生成的task特別慢,就能夠自己用肉眼去對你的spark代碼進行stage的劃分,就能夠通過stage定位到你的代碼,到底哪里發(fā)生了數(shù)據(jù)傾斜。
2. 數(shù)據(jù)傾斜的解決方案
我們既然知道了原因和現(xiàn)象,怎么定位問題,那so good,集美們就可以對癥下藥搞定它了。
(1)聚合源數(shù)據(jù):Spark元數(shù)據(jù)基本是來自hive表,數(shù)據(jù)傾斜,某個key對應(yīng)的80萬數(shù)據(jù),某些key對應(yīng)幾百條,某些key對應(yīng)幾十條。現(xiàn)在咱們直接在生成hive表的hive etl中對數(shù)據(jù)進行聚合。比如按key來分組,將key對應(yīng)的所有的values全部用一種特殊的格式拼接到一個字符串里面去。那么也就意味著,每個key就只對應(yīng)一條數(shù)據(jù)。在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了。直接對每個key對應(yīng)的values字符串進行map操作,進行你需要的操作即可。
比如“key=sessionid,
value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;
action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。
對key進行g(shù)roup,在spark中,拿到key=sessionid,values。hive etl中,直接對key進行了聚合。那么也就意味著,每個key就只對應(yīng)一條數(shù)據(jù)。
在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了。直接對每個key對應(yīng)的values字符串進行map操作,進行你需要的操作即可。
(2)過濾導(dǎo)致傾斜的key:
如果你能夠接受某些數(shù)據(jù)在spark作業(yè)中直接就摒棄。比如說,總共有100萬個key,只有2個key是數(shù)據(jù)量達到10萬的,其他所有的key,對應(yīng)的數(shù)量都是幾十萬。
這個時候,你自己可以去取舍,如果業(yè)務(wù)和需求可以理解和接受的話,在你從hive表查詢源數(shù)據(jù)的時候,直接在sql中用where條件,過濾掉某幾個key。這樣之前有大量數(shù)據(jù)導(dǎo)致數(shù)據(jù)傾斜的key被過濾掉之后,自然就不會發(fā)生數(shù)據(jù)傾斜了。
(3)提高shuffle操作reduce并行度:
在調(diào)用我們的shuffle算子時候,比如groupByKey、countByKey、reduceByKey,傳入進去一個參數(shù)。這個參數(shù),就代表了shuffle操作的reduce端的并行度。
(4)使用隨機數(shù)以及擴容表進行join:
當(dāng)采用隨機數(shù)和擴容表進行join解決數(shù)據(jù)傾斜的時候,就代表著,你的之前的數(shù)據(jù)傾斜的解決方案,都沒法使用。這個方案是沒辦法徹底解決數(shù)據(jù)傾斜的,更多的,是一種對數(shù)據(jù)傾斜的緩解。具體如下:
選擇一個RDD,要用flatMap,進行擴容,將每條數(shù)據(jù),映射為多條數(shù)據(jù),每個映射出來的數(shù)據(jù),都帶了一個n以內(nèi)的隨機數(shù),通常來說會選擇10。
將另外一個RDD,做普通的map映射操作,每條數(shù)據(jù)都打上一個10以內(nèi)的隨機數(shù)。
最后將兩個處理后的RDD進行join操作。
作者:教你學(xué)懂大數(shù)據(jù)
歡迎關(guān)注微信公眾號 :教你學(xué)懂大數(shù)據(jù)