Spark開發(fā)常用參數(shù)
Driver
spark.driver.cores
driver端分配的核數(shù),默認(rèn)為1,thriftserver是啟動thriftserver服務(wù)的機(jī)器,資源充足的話可以盡量給多。
spark.driver.memory
driver端分配的內(nèi)存數(shù),默認(rèn)為1g,同上。
spark.driver.maxResultSize
driver端接收的最大結(jié)果大小,默認(rèn)1GB,最小1MB,設(shè)置0為無限。
這個(gè)參數(shù)不建議設(shè)置的太大,如果要做數(shù)據(jù)可視化,更應(yīng)該控制在20-30MB以內(nèi)。過大會導(dǎo)致OOM。
spark.extraListeners
默認(rèn)none,隨著SparkContext被創(chuàng)建而創(chuàng)建,用于監(jiān)聽單參數(shù)、無參數(shù)構(gòu)造函數(shù)的創(chuàng)建,并拋出異常。
Executor
spark.executor.memory
每個(gè)executor分配的內(nèi)存數(shù),默認(rèn)1g,會受到y(tǒng)arn CDH的限制,和memoryOverhead相加 不能超過總內(nèi)存限制。
spark.executor.cores
每個(gè)executor的核數(shù),默認(rèn)yarn下1核,standalone下為所有可用的核。
spark.default.parallelism
默認(rèn)RDD的分區(qū)數(shù)、并行數(shù)。
像reduceByKey和join等這種需要分布式shuffle的操作中,最大父RDD的分區(qū)數(shù);像parallelize之類沒有父RDD的操作,則取決于運(yùn)行環(huán)境下得cluster manager:
如果為單機(jī)模式,本機(jī)核數(shù);集群模式為所有executor總核數(shù)與2中最大的一個(gè)。
spark.executor.heartbeatInterval
executor和driver心跳發(fā)送間隔,默認(rèn)10s,必須遠(yuǎn)遠(yuǎn)小于spark.network.timeout
spark.files.fetchTimeout
從driver端執(zhí)行SparkContext.addFile() 抓取添加的文件的超時(shí)時(shí)間,默認(rèn)60s
spark.files.useFetchCache
默認(rèn)true,如果設(shè)為true,拉取文件時(shí)會在同一個(gè)application中本地持久化,被若干個(gè)executors共享。這使得當(dāng)同一個(gè)主機(jī)下有多個(gè)executors時(shí),執(zhí)行任務(wù)效率提高。
spark.broadcast.blockSize
TorrentBroadcastFactory中的每一個(gè)block大小,默認(rèn)4m
過大會減少廣播時(shí)的并行度,過小會導(dǎo)致BlockManager 產(chǎn)生 performance hit.
spark.files.overwrite
默認(rèn)false,是否在執(zhí)行SparkContext.addFile() 添加文件時(shí),覆蓋已有的內(nèi)容有差異的文件。
spark.files.maxPartitionBytes
單partition中最多能容納的文件大小,單位Bytes 默認(rèn)134217728 (128 MB)
spark.files.openCostInBytes
小文件合并閾值,小于該參數(shù)就會被合并到一個(gè)partition內(nèi)。
默認(rèn)4194304 (4 MB) 。這個(gè)參數(shù)在將多個(gè)文件放入一個(gè)partition時(shí)被用到,寧可設(shè)置的小一些,因?yàn)樵趐artition操作中,小文件肯定會比大文件快。
spark.storage.memoryMapThreshold
從磁盤上讀文件時(shí),最小單位不能少于該設(shè)定值,默認(rèn)2m,小于或者接近操作系統(tǒng)的每個(gè)page的大小。
Shuffle
spark.reducer.maxSizeInFlight
默認(rèn)48m。從每個(gè)reduce任務(wù)同時(shí)拉取的最大map數(shù),每個(gè)reduce都會在完成任務(wù)后,需要一個(gè)堆外內(nèi)存的緩沖區(qū)來存放結(jié)果,如果沒有充裕的內(nèi)存就盡可能把這個(gè)調(diào)小一點(diǎn)。。相反,堆外內(nèi)存充裕,調(diào)大些就能節(jié)省gc時(shí)間。
spark.reducer.maxBlocksInFlightPerAddress
限制了每個(gè)主機(jī)每次reduce可以被多少臺遠(yuǎn)程主機(jī)拉取文件塊,調(diào)低這個(gè)參數(shù)可以有效減輕node manager的負(fù)載。(默認(rèn)值Int.MaxValue)
spark.reducer.maxReqsInFlight
限制遠(yuǎn)程機(jī)器拉取本機(jī)器文件塊的請求數(shù),隨著集群增大,需要對此做出限制。否則可能會使本機(jī)負(fù)載過大而掛掉。。(默認(rèn)值為Int.MaxValue)
spark.reducer.maxReqSizeShuffleToMem
shuffle請求的文件塊大小 超過這個(gè)參數(shù)值,就會被強(qiáng)行落盤,防止一大堆并發(fā)請求把內(nèi)存占滿。(默認(rèn)Long.MaxValue)
spark.shuffle.compress
是否壓縮map輸出文件,默認(rèn)壓縮 true
spark.shuffle.spill.compress
shuffle過程中溢出的文件是否壓縮,默認(rèn)true,使用spark.io.compression.codec壓縮。
spark.shuffle.file.buffer
在內(nèi)存輸出流中 每個(gè)shuffle文件占用內(nèi)存大小,適當(dāng)提高 可以減少磁盤讀寫 io次數(shù),初始值為32k
spark.shuffle.memoryFraction
該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%。
cache少且內(nèi)存充足時(shí),可以調(diào)大該參數(shù),給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤。
spark.shuffle.manager
當(dāng)ShuffleManager為SortShuffleManager時(shí),如果shuffle read
task的數(shù)量小于這個(gè)閾值(默認(rèn)是200),則shuffle
write過程中不會進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤文件都合并成一個(gè)文件,并會創(chuàng)建單獨(dú)的索引文件。
當(dāng)使用SortShuffleManager時(shí),如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些,大于shuffle read
task的數(shù)量。那么此時(shí)就會自動啟用bypass機(jī)制,map-side就不會進(jìn)行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle
write性能有待提高。
spark.shuffle.consolidateFiles
如果使用HashShuffleManager,該參數(shù)有效。如果設(shè)置為true,那么就會開啟consolidate機(jī)制,會大幅度合并shuffle
write的輸出文件,對于shuffle read task數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
如果的確不需要SortShuffleManager的排序機(jī)制,那么除了使用bypass機(jī)制,還可以嘗試將spark.shuffle.manager參數(shù)手動指定為hash,使用HashShuffleManager,同時(shí)開啟consolidate機(jī)制。
spark.shuffle.io.maxRetries
shuffle read task從shuffle write task所在節(jié)點(diǎn)拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常導(dǎo)致拉取失敗,是會自動進(jìn)行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導(dǎo)致作業(yè)執(zhí)行失敗。
對于那些包含了特別耗時(shí)的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full
gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
spark.shuffle.io.retryWait
同上,默認(rèn)5s,建議加大間隔時(shí)長(比如60s),以增加shuffle操作的穩(wěn)定性
Compression and Serialization
spark.broadcast.compress
廣播變量前是否會先進(jìn)行壓縮。默認(rèn)true (spark.io.compression.codec)
spark.io.compression.codec
壓縮RDD數(shù)據(jù)、日志、shuffle輸出等的壓縮格式 默認(rèn)lz4
spark.io.compression.lz4.blockSize
使用lz4壓縮時(shí),每個(gè)數(shù)據(jù)塊大小 默認(rèn)32k
spark.rdd.compress
rdd是否壓縮 默認(rèn)false,節(jié)省memory_cache大量內(nèi)存 消耗更多的cpu資源(時(shí)間)。
spark.serializer.objectStreamReset
當(dāng)使用JavaSerializer序列化時(shí),會緩存對象防止寫多余的數(shù)據(jù),但這些對象就不會被gc,可以輸入reset 清空緩存。默認(rèn)緩存100個(gè)對象,修改成-1則不緩存任何對象。
Memory Management
spark.memory.fraction
執(zhí)行內(nèi)存和緩存內(nèi)存(堆)占jvm總內(nèi)存的比例,剩余的部分是spark留給用戶存儲內(nèi)部源數(shù)據(jù)、數(shù)據(jù)結(jié)構(gòu)、異常大的結(jié)果數(shù)據(jù)。
默認(rèn)值0.6,調(diào)小會導(dǎo)致頻繁gc,調(diào)大容易造成oom。
spark.memory.storageFraction
用于存儲的內(nèi)存在堆中的占比,默認(rèn)0.5。調(diào)大會導(dǎo)致執(zhí)行內(nèi)存過小,執(zhí)行數(shù)據(jù)落盤,影響效率;調(diào)小會導(dǎo)致緩存內(nèi)存不夠,緩存到磁盤上去,影響效率。
值得一提的是在spark中,執(zhí)行內(nèi)存和緩存內(nèi)存公用java堆,當(dāng)執(zhí)行內(nèi)存沒有使用時(shí),會動態(tài)分配給緩存內(nèi)存使用,反之也是這樣。如果執(zhí)行內(nèi)存不夠用,可以將存儲內(nèi)存釋放移動到磁盤上(最多釋放不能超過本參數(shù)劃分的比例),但存儲內(nèi)存不能把執(zhí)行內(nèi)存搶走。
spark.memory.offHeap.enabled
是否允許使用堆外內(nèi)存來進(jìn)行某些操作。默認(rèn)false
spark.memory.offHeap.size
允許使用進(jìn)行操作的堆外內(nèi)存的大小,單位bytes 默認(rèn)0
spark.cleaner.periodicGC.interval
控制觸發(fā)gc的頻率,默認(rèn)30min
spark.cleaner.referenceTracking
是否進(jìn)行context cleaning,默認(rèn)true
spark.cleaner.referenceTracking.blocking
清理線程是否應(yīng)該阻止清理任務(wù),默認(rèn)true
spark.cleaner.referenceTracking.blocking.shuffle
清理線程是否應(yīng)該阻止shuffle的清理任務(wù),默認(rèn)false
spark.cleaner.referenceTracking.cleanCheckpoints
清理線程是否應(yīng)該清理依賴超出范圍的檢查點(diǎn)文件(checkpoint files不知道怎么翻譯。。)默認(rèn)false
Networking
spark.rpc.message.maxSize
executors和driver間消息傳輸、map輸出的大小,默認(rèn)128M。map多可以考慮增加。
spark.driver.blockManager.port和spark.driver.bindAddress
driver端綁定監(jiān)聽block manager的地址與端口。
spark.driver.host和spark.driver.port
driver端的ip和端口。
spark.network.timeout
網(wǎng)絡(luò)交互超時(shí)時(shí)間,默認(rèn)120s。如果
spark.core.connection.ack.wait.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout orspark.rpc.lookupTimeout
沒有設(shè)置,那么就以此參數(shù)為準(zhǔn)。
spark.port.maxRetries
設(shè)定了一個(gè)端口后,在放棄之前的最大重試次數(shù),默認(rèn)16。 會有一個(gè)預(yù)重試機(jī)制,每次會嘗試前一次嘗試的端口號+1的端口。如 設(shè)定了端口為8000,則最終會嘗試8000~(8000+16)范圍的端口。
spark.rpc.numRetries
rpc任務(wù)在放棄之前的重試次數(shù),默認(rèn)3,即rpc task最多會執(zhí)行3次。
spark.rpc.retry.wait
重試間隔,默認(rèn)3s
spark.rpc.askTimeout
rpc任務(wù)超時(shí)時(shí)間,默認(rèn)spark.network.timeout
spark.rpc.lookupTimeout
rpc任務(wù)查找時(shí)長
Scheduling
spark.scheduler.maxRegisteredResourcesWaitingTime
在執(zhí)行前最大等待申請資源的時(shí)間,默認(rèn)30s。
spark.scheduler.minRegisteredResourcesRatio
實(shí)際注冊的資源數(shù)占預(yù)期需要的資源數(shù)的比例,默認(rèn)0.8
spark.scheduler.mode
調(diào)度模式,默認(rèn)FIFO 先進(jìn)隊(duì)列先調(diào)度,可以選擇FAIR。
spark.scheduler.revive.interval
work回復(fù)重啟的時(shí)間間隔,默認(rèn)1s
spark.scheduler.listenerbus.eventqueue.capacity
spark事件監(jiān)聽隊(duì)列容量,默認(rèn)10000,必須為正值,增加可能會消耗更多內(nèi)存
spark.blacklist.enabled
是否列入黑名單,默認(rèn)false。如果設(shè)成true,當(dāng)一個(gè)executor失敗好幾次時(shí),會被列入黑名單,防止后續(xù)task派發(fā)到這個(gè)executor??梢赃M(jìn)一步調(diào)節(jié)spark.blacklist以下相關(guān)的參數(shù):
(均為測試參數(shù) Experimental)
spark.blacklist.timeout
spark.blacklist.task.maxTaskAttemptsPerExecutor
spark.blacklist.task.maxTaskAttemptsPerNode
spark.blacklist.stage.maxFailedTasksPerExecutor
spark.blacklist.application.maxFailedExecutorsPerNode
spark.blacklist.killBlacklistedExecutors
spark.blacklist.application.fetchFailure.enabled
spark.speculation
推測,如果有task執(zhí)行的慢了,就會重新執(zhí)行它。默認(rèn)false,
詳細(xì)相關(guān)配置如下:
spark.speculation.interval
檢查task快慢的頻率,推測間隔,默認(rèn)100ms。
spark.speculation.multiplier
推測比均值慢幾次算是task執(zhí)行過慢,默認(rèn)1.5
spark.speculation.quantile
在某個(gè)stage,完成度必須達(dá)到該參數(shù)的比例,才能被推測,默認(rèn)0.75
spark.task.cpus
每個(gè)task分配的cpu數(shù),默認(rèn)1
spark.task.maxFailures
在放棄這個(gè)job前允許的最大失敗次數(shù),重試次數(shù)為該參數(shù)-1,默認(rèn)4
spark.task.reaper.enabled
賦予spark監(jiān)控有權(quán)限去kill那些失效的task,默認(rèn)false
(原先有 job失敗了但一直顯示有task在running,總算找到這個(gè)參數(shù)了)
其他進(jìn)階的配置如下:
spark.task.reaper.pollingInterval
輪詢被kill掉的task的時(shí)間間隔,如果還在running,就會打warn日志,默認(rèn)10s。
spark.task.reaper.threadDump
線程回收是是否產(chǎn)生日志,默認(rèn)true。
spark.task.reaper.killTimeout
當(dāng)一個(gè)被kill的task過了多久還在running,就會把那個(gè)executor給kill掉,默認(rèn)-1。
spark.stage.maxConsecutiveAttempts
在終止前,一個(gè)stage連續(xù)嘗試次數(shù),默認(rèn)4。
Dynamic Allocation
spark.dynamicAllocation.enabled
是否開啟動態(tài)資源配置,根據(jù)工作負(fù)載來衡量是否應(yīng)該增加或減少executor,默認(rèn)false
以下相關(guān)參數(shù):
spark.dynamicAllocation.minExecutors
動態(tài)分配最小executor個(gè)數(shù),在啟動時(shí)就申請好的,默認(rèn)0
spark.dynamicAllocation.maxExecutors
動態(tài)分配最大executor個(gè)數(shù),默認(rèn)infinity
spark.dynamicAllocation.initialExecutors
動態(tài)分配初始executor個(gè)數(shù)默認(rèn)值=spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.executorIdleTimeout
當(dāng)某個(gè)executor空閑超過這個(gè)設(shè)定值,就會被kill,默認(rèn)60s
spark.dynamicAllocation.cachedExecutorIdleTimeout
當(dāng)某個(gè)緩存數(shù)據(jù)的executor空閑時(shí)間超過這個(gè)設(shè)定值,就會被kill,默認(rèn)infinity
spark.dynamicAllocation.schedulerBacklogTimeout
任務(wù)隊(duì)列非空,資源不夠,申請executor的時(shí)間間隔,默認(rèn)1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
同schedulerBacklogTimeout,是申請了新executor之后繼續(xù)申請的間隔,默認(rèn)=schedulerBacklogTimeout
Spark Streaming
spark.streaming.stopGracefullyOnShutdown (true / false)默認(rèn)fasle
確保在kill任務(wù)時(shí),能夠處理完最后一批數(shù)據(jù),再關(guān)閉程序,不會發(fā)生強(qiáng)制kill導(dǎo)致數(shù)據(jù)處理中斷,沒處理完的數(shù)據(jù)丟失
spark.streaming.backpressure.enabled (true / false) 默認(rèn)false
開啟后spark自動根據(jù)系統(tǒng)負(fù)載選擇最優(yōu)消費(fèi)速率
spark.streaming.backpressure.initialRate (整數(shù)) 默認(rèn)直接讀取所有
在開啟反壓的情況下,限制第一次批處理應(yīng)該消費(fèi)的數(shù)據(jù),因?yàn)槌绦蚶鋯雨?duì)列里面有大量積壓,防止第一次全部讀取,造成系統(tǒng)阻塞
spark.streaming.kafka.maxRatePerPartition (整數(shù)) 默認(rèn)直接讀取所有
限制每秒每個(gè)消費(fèi)線程讀取每個(gè)kafka分區(qū)最大的數(shù)據(jù)量
spark.streaming.unpersist
自動將spark streaming產(chǎn)生的、持久化的數(shù)據(jù)給清理掉,默認(rèn)true,自動清理內(nèi)存垃圾。
spark.streaming.ui.retainedBatches
spark streaming 日志接口在gc時(shí)保留的batch個(gè)數(shù),默認(rèn)1000
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫