「Hive進(jìn)階篇」萬字長文超詳述Hive企業(yè)級優(yōu)化
大家好,我是Akin,肝了幾個晚上,梳理總結(jié)了一份萬字長文超詳述hive企業(yè)級優(yōu)化文章,也整理了一份hive優(yōu)化總結(jié)思維導(dǎo)圖和hive優(yōu)化詳細(xì)PDF文檔,有需要可點贊+在看關(guān)注公眾號《大數(shù)據(jù)階梯之路》找小編獲取文檔保存本地吧,學(xué)習(xí)和復(fù)習(xí)都是絕佳,公眾號不斷分享技術(shù)相關(guān)文章。話不多說,下面就直接開講吧!
一、問題背景
hive離線數(shù)倉開發(fā),一個良好的數(shù)據(jù)任務(wù),它的運行時長一般是在合理范圍內(nèi)的,當(dāng)發(fā)現(xiàn)報表應(yīng)用層的指標(biāo)數(shù)據(jù)總是產(chǎn)出延遲,排查定位發(fā)現(xiàn)是有些任務(wù)執(zhí)行了超10小時這樣肯定是不合理的,此時就該想想如何優(yōu)化數(shù)據(jù)任務(wù)鏈路,主要從以下幾個角度來考慮問題解決:
從數(shù)據(jù)任務(wù)本身hive邏輯代碼出發(fā),即hive邏輯優(yōu)化,偏理解業(yè)務(wù)角度
從集群的資源設(shè)置出發(fā),即hive參數(shù)調(diào)優(yōu),偏理解技術(shù)角度
從全局?jǐn)?shù)據(jù)鏈路的任務(wù)設(shè)置出發(fā),觀測是否任務(wù)執(zhí)行調(diào)度設(shè)置不合理
從數(shù)倉的數(shù)據(jù)易用性和模型復(fù)用性的角度出發(fā),針對某些中間邏輯過程可以復(fù)用的就落地中間模型表
附上一份個人梳理總結(jié)的思維導(dǎo)圖部分截圖
下面就先分享下常見的hive優(yōu)化策略吧~ 會附帶案例實踐幫助理解
hive優(yōu)化文章大綱
列裁剪和分區(qū)裁剪
提前數(shù)據(jù)收斂
謂詞下推(PPD)
多路輸出,減少表讀取次數(shù)寫多個結(jié)果表
合理選擇排序
join優(yōu)化
合理選擇文件存儲格式和壓縮方式
解決小文件過多問題
distinct 和 group by
參數(shù)調(diào)優(yōu)
解決數(shù)據(jù)傾斜問題
二、hive優(yōu)化
1. 列裁剪和分區(qū)裁剪
裁剪 顧名思義就是不需要的數(shù)據(jù)不要多查。
列裁剪,盡量減少直接select * from table這種操作,首先可讀性不好,根本不知道具體用到哪幾個列,其次列選擇多了也會增大IO傳輸;
分區(qū)裁剪就是針對分區(qū)表切記要加上分區(qū)過濾條件,比如表以時間作為分區(qū)字段,要加上分區(qū)篩選。
2. 提前數(shù)據(jù)收斂
在子查詢中,有些條件能先過濾的盡量放在子查詢里先過濾,減少子查詢輸出的數(shù)據(jù)量。
-- 原腳本
select
a.字段a,a.字段b,b.字段a,b.字段b
from
(
select 字段a,字段b
from table_a
where dt = date_sub(current_date,1)
) a
left join
(
select 字段a,字段b
from table_b
where dt = date_sub(current_date,1)
) b
on a.字段a = b.字段a
where a.字段b <> ''
and b.字段b <> 'xxx'
;
-- 優(yōu)化腳本 (數(shù)據(jù)收斂)
select
a.字段a,a.字段b,b.字段a,b.字段b
from
(
select 字段a,字段b
from table_a
where dt = date_sub(current_date,1)
and 字段b <> ''
) a
left join
(
select 字段a,字段b
from table_b
where dt = date_sub(current_date,1)
and 字段b <> 'xxx'
) b
on a.字段a = b.字段a
;
3. 謂詞下推(Predicate Pushdown)
謂詞下推Predicate Pushdown是什么?簡稱PPD,指的是在不影響數(shù)據(jù)結(jié)果的情況下,將過濾表達(dá)式盡可能移動至靠近數(shù)據(jù)源的位置,以使真正執(zhí)行時能直接跳過無關(guān)的數(shù)據(jù),這樣在map執(zhí)行過濾條件,可以減少map端數(shù)據(jù)輸出,起到了數(shù)據(jù)收斂的作用,降低了數(shù)據(jù)在集群上傳輸?shù)牧?,?jié)約了集群的資源,也提升了任務(wù)的性能。
hive默認(rèn)是開啟謂詞下推該參數(shù)設(shè)置的,hive.optimize.ppd=true
所謂下推,即謂詞過濾在map端執(zhí)行;所謂不下推,即謂詞過濾在reduce端執(zhí)行。
關(guān)于謂詞下推的規(guī)則,主要分為join的on條件過濾下推和where條件過濾下推,我整理了一張圖方便理解。
核心判斷邏輯:join的on條件過濾不能下推到保留行表中;where條件過濾不能下推到null補(bǔ)充表中。
-- 舉例說明:以下腳本 on后面的a表條件過濾沒有下推至map端運行而是在reduce端運行,where后面的b表條件過濾則有下推至map端運行
select
a.字段a,a.字段b,b.字段a,b.字段b
from table_a a
left join table_b b
on a.字段a <> '' -- a表條件過濾
where a.字段b <> 'xxx' -- a表條件過濾
;
謂詞下推注意事項:
如果在表達(dá)式中含有不確定函數(shù),整個表達(dá)式的謂詞將不會被下推。例如下面腳本,則整個條件過濾都是在reduce端執(zhí)行:
select a.*
from a join b
on a.id = b.id
where a.ds = '2019-10-09'
and a.create_time = unix_timestamp()
;
因為上面unix_timestamp()是不確定函數(shù),在編譯的時候無法得知,所以,整個表達(dá)式不會被下推,即ds='2022-07-04'也不會被提前過濾。類似的不確定函數(shù)還有rand()函數(shù)等。
附上2篇關(guān)于謂詞下推的詳細(xì)案例分析講解
!上鏈接,自行復(fù)制去訪問哈:
① https://cloud.tencent.com/developer/article/1616687
② https://cloud.tencent.com/developer/article/1616689
4. 多路輸出
當(dāng)我們有使用一次查詢,多次插入的場景時,則可以采用多路輸出的寫法,減少表的讀取次數(shù),起到性能優(yōu)化的作用。
-- 讀取一次源表,同時寫入多張目標(biāo)表
from table_source
insert overwrite table table_a
select *
where dt = date_sub(current_date,1)
and event_name = '事件A'
insert overwrite table table_b
select *
where dt = date_sub(current_date,1)
and event_name = '事件B'
insert oveewrite table table_c
select *
where dt = date_sub(current_date,1)
and event_name = '事件C'
;
多路輸出注意事項:
一般情況下,一個sql里面最多支持128路輸出,超過了則會報錯
在多插往同一張分區(qū)表的不同分區(qū)時,不允許在一個sql里面多路輸出時既包含insert overwrite和insert into,要統(tǒng)一操作
5. 合理選擇排序
order by
全局排序,只走一個reducer,當(dāng)表數(shù)據(jù)量較大時容易計算不出來,性能不佳慎用,在嚴(yán)格模式下需要加limit
sort by
局部排序,即保證單個reduce內(nèi)結(jié)果有序,但沒有全局排序的能力。
distribute by
按照指定的字段把數(shù)據(jù)劃分輸出到不同的reducer中,是控制數(shù)據(jù)如何從map端輸出到reduce端,hive會根據(jù)distribute by后面的字段和對應(yīng)reducer的個數(shù)進(jìn)行hash分發(fā)
cluster by
擁有distrubute by的能力,同時也擁有sort by的能力,所以可以理解cluster by是 distrubute by+sort by
以下舉個排序方式優(yōu)化案例,取用戶信息表(10億數(shù)據(jù)量)中年齡排前100的用戶信息:以下案例實現(xiàn)也體現(xiàn)了一個大數(shù)據(jù)思想,分而治之,大job拆分小job。
-- 原腳本
select *
from tmp.user_info_table
where dt = '2022-07-04'
order by age -- 全局排序,只走一個reduce
limit 100
;
-- 優(yōu)化腳本
set mapred.reduce.tasks=50; -- 設(shè)置reduce個數(shù)為50
select *
from tmp.user_info_table
where dt = '2022-07-04'
distribute by (case when age<20 then 0
when age >=20 and age <= 40 then 1
else 2
end
) -- distribute by主要是為了控制map端輸出的數(shù)據(jù)在reduce端中是如何劃分的,防止map端數(shù)據(jù)隨機(jī)分配到reduce。這里字段做case when判斷是因為用戶年齡的零散值會導(dǎo)致分布不均勻,起太多reduce本身也耗時浪費資源
sort by age -- 起多個reduce排序,保證單個reduce結(jié)果有序
limit 100 -- 取前100,因為是按照年齡局部排序過,所以前100個也一定是年齡最小的
;
排序選擇的小結(jié):
order by全局排序,但只有一個reducer執(zhí)行,數(shù)據(jù)量大的話容易計算不過來,慎用
sort by局部排序,單個reducer內(nèi)有序,把map端隨機(jī)分發(fā)給reduce端執(zhí)行,如果是要實現(xiàn)全局排序且走多個reducer的優(yōu)化需求時,可以在外層嵌套一層,例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N,這樣就有2個Job,一個是內(nèi)層的局部排序,一個是外層的歸并全局排序
distribute by可以按照指定字段將數(shù)據(jù)進(jìn)行hash分發(fā)到對應(yīng)的reducer去執(zhí)行
當(dāng)分區(qū)字段和排序字段相同時可以使用cluster by來簡化distribute by+sort by的寫法,但是cluster by排序只能是升序排序,不能指定排序規(guī)則是ASC或者DESC
6. join優(yōu)化
hive在redurce階段完成的join就是common join,在map階段完成的join就是map join。
提前收斂數(shù)據(jù)量,保證在join關(guān)聯(lián)前無用數(shù)據(jù)不參與關(guān)聯(lián)
這塊可以跟前面的數(shù)據(jù)收斂模塊&謂詞下推模塊 搭配起來看,主要就是提前收斂數(shù)據(jù)量,不止在join場景,在其他復(fù)雜計算前同樣適用。
left semi join左半關(guān)聯(lián)
left semi join一開始出現(xiàn)的使用場景其實是解決hive不支持in/exists子查詢的高效實現(xiàn),雖然left semi join含有l(wèi)eft,但其實不是保留左表全部數(shù)據(jù),效果類似于join吧,只是最終結(jié)果只取左表中的列,還有最終結(jié)果某些場景下會跟join結(jié)果不同。
select a.*
from
(
select 1 as id,'a' as name
union all
select 2 as id,'b' as name
) a
left semi join
(
select 1 as id,'b' as name
union all
select 1 as id,'c' as name
) b
on a.id = b.id
-- 你猜left semi join結(jié)果是?
id name
1 a
-- 而如果上面的腳本是join呢,結(jié)果?
id name
1 a
1 a
left semi join注意事項:
右表的條件過濾只能寫在on后面,不能寫在where后面
最終結(jié)果只能展示左表的列,右表的列不能展示
left semi join與join的差異:主要在于右表有重復(fù)數(shù)據(jù)時,left semi join是遍歷到右表一條數(shù)據(jù)后就跳過,只取一條,而join是一直遍歷至右表最后一條數(shù)據(jù),這也就是要注意實際數(shù)據(jù)場景是否有重復(fù)和是否要保留
大表join小表場景
大表join小表的話,要把小表放在左邊,大表放在右邊,這是因為join操作發(fā)生在reduce階段,在hive2.x版本以前,位于左邊的表會被加載進(jìn)內(nèi)存中,所以如果是大表放左邊被加載進(jìn)內(nèi)存的話就會有內(nèi)存溢出的風(fēng)險,不過在hive2.x版本后就已經(jīng)優(yōu)化好這塊了,無需關(guān)注,底層幫我們優(yōu)化好這個問題了。
啟用mapjoin
mapjoin就是把join的表直接分發(fā)到map端的內(nèi)存中,即在map端來執(zhí)行join操作,就不用在reduce階段進(jìn)行join了,提高了執(zhí)行效率。如果表比較小的話最好是啟用mapjoin,hive默認(rèn)是開啟自動mapjoin的。
set hive.auto.convert.join = true;
-- 大表小表的閾值設(shè)置(默認(rèn)25M一下認(rèn)為是小表)
set hive.mapjoin.smalltable.filesize=26214400;
大表join大表場景
舉例,假設(shè)a表是包括許多空值的數(shù)據(jù),b表是不包含空值的數(shù)據(jù)
-- 不做優(yōu)化時的原始hql
select a.id
from a left join b
on a.id = b.id
1、空key過濾,過濾空key的數(shù)據(jù)
關(guān)聯(lián)的過程是相同key對應(yīng)的數(shù)據(jù)都會發(fā)送到相同的reducer上,如果某些空key過多是會導(dǎo)致內(nèi)存不夠的,從而引發(fā)join超時,所以如果不需要這類空key數(shù)據(jù)的時候,可以先過濾掉這些異常數(shù)據(jù)。
-- 做空key過濾優(yōu)化時的hql,利用子查詢先處理掉后再關(guān)聯(lián)
select a.id
from (select * from a where id is not null) a
join b
on a.id = b.id
2、空key轉(zhuǎn)換,轉(zhuǎn)換key的數(shù)據(jù)進(jìn)行關(guān)聯(lián)時打散key
當(dāng)然,有時候空值的數(shù)據(jù)又不一定是異常數(shù)據(jù),還是需要保留的,但是空key過多都分配到一個reducer去了,這樣執(zhí)行起來就算不內(nèi)存溢出也會發(fā)生數(shù)據(jù)傾斜情況,數(shù)據(jù)傾斜的話對集群資源的利用率來看的話是極其不利的,我們可以通過把空key虛擬成隨機(jī)數(shù),但要保證不是同一個空key,從而降低數(shù)據(jù)傾斜概率,雖然這樣在對關(guān)聯(lián)鍵做處理反而會總體增長執(zhí)行時間,但卻減輕了reducer負(fù)擔(dān)。
-- 做空key轉(zhuǎn)換優(yōu)化時的hql,利用case when判斷加隨機(jī)數(shù)
select a.id
from a.left join b
on case when a.id is null then concat('hive'+rand()) else a.id end = b.id
避免笛卡爾積
盡量避免笛卡爾積,即避免join的時候不加on條件,或者無效的on條件,因為Hive只能使用1個reducer來完成笛卡爾積,不過這點hive會通過嚴(yán)格模式下來提醒,在嚴(yán)格模式下出現(xiàn)笛卡爾積時報錯。
7. 合理選擇文件存儲格式和壓縮方式
關(guān)于這點,我專門寫過一篇文章介紹hive常見的幾種存儲格式和壓縮方式,具體可以去上次我寫過的這篇文章看看
!上鏈接:https://mp.weixin.qq.com/s/RndQKF5y9Mto7QfgiiAOvQ
8. 解決小文件過多問題
先來說一說什么是小文件,怎么發(fā)生的
顧名思義,小文件就是文件很小的文件,小文件的產(chǎn)生一定是發(fā)生在向hive表導(dǎo)入數(shù)據(jù)的時候,比如:
-- 第①種導(dǎo)入數(shù)據(jù)方式
insert into table A values(); -- 每執(zhí)行一條語句hive表就產(chǎn)生一個文件,但這種導(dǎo)入數(shù)據(jù)方式生產(chǎn)環(huán)境少見;
-- 第②種導(dǎo)入數(shù)據(jù)方式
load data local path '本地文件/本地文件夾 路徑' overwrite into table A; -- 導(dǎo)入文件/文件夾`,即有多少個文件hive表就會產(chǎn)生多少個文件
-- 第③種導(dǎo)入數(shù)據(jù)方式
insert overwrite table A select * from B; -- 通過查詢的方式導(dǎo)入數(shù)據(jù)是生產(chǎn)環(huán)境最常見的
MR中 reduce 有多少個就輸出多少個文件,文件數(shù)量 = reduce數(shù)量 * 分區(qū)數(shù),如果說某些簡單job沒有reduce階段只有map階段,那文件數(shù)量 = map數(shù)量 * 分區(qū)數(shù)。從公式上看,reduce的個數(shù)和分區(qū)數(shù)最終決定了輸出的文件的個數(shù),所以可以調(diào)整reduce的個數(shù)以及分區(qū) 達(dá)到控制hive表的文件數(shù)量。
小文件過多有什么影響
首先第一點從HDFS底層來看,小文件過多會給集群namenode帶來負(fù)擔(dān),即namenode元數(shù)據(jù)大占用內(nèi)存,影響HDFS的性能
第二點從hive來看,在進(jìn)行查詢時,每個小文件都會當(dāng)成一個塊,啟動一個Map任務(wù)來完成,而一個Map任務(wù)啟動和初始化的時間遠(yuǎn)遠(yuǎn)大于邏輯處理的時間,就會造成很大的資源浪費
如何解決小文件過多問題
1、使用hive自帶的 concatenate 命令,來合并小文件
不過要注意的是concatenate命令只支持hive表存儲格式是orcfile或者rcfile,還有該方式不支持指定合并后的文件數(shù)量
-- 對于非分區(qū)表
alter table test_table concatenate;
-- 對于分區(qū)表
alter table test_table partition(dt = '2022-07-16') concatenate;
2、調(diào)整參數(shù)減少Map數(shù)
設(shè)置map輸入合并小文件
-- 102400000B=102400KB=100M
-- 每個Map最大輸入大小(這個值決定了合并后文件的數(shù)量)
set mapred.max.split.size=102400000;
-- 一個節(jié)點上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一個交換機(jī)下split的至少的大小(這個值決定了多個交換機(jī)上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;
-- 前3行設(shè)置是確定合并文件塊的大小,>128M的文件按128M切塊,>100M和<128M的文件按100M切塊,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- map執(zhí)行前合并小文件
設(shè)置map輸出和reduce輸出合并小文件
-- 設(shè)置map端輸出進(jìn)行合并,默認(rèn)為true
set hive.merge.mapfiles = true;
-- 設(shè)置reduce端輸出進(jìn)行合并,默認(rèn)為false
set hive.merge.mapredfiles = true;
-- 設(shè)置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;
-- 當(dāng)輸出文件的平均大小小于該值時,啟動一個獨立的MapReduce任務(wù)進(jìn)行文件merge
set hive.merge.smallfiles.avgsize=16000000;
3、調(diào)整參數(shù)減少Reduce數(shù)
-- hive中的分區(qū)函數(shù) distribute by 正好是控制MR中partition分區(qū)的,然后通過設(shè)置reduce的數(shù)量,結(jié)合分區(qū)函數(shù)讓數(shù)據(jù)均衡的進(jìn)入每個reduce即可。
-- 直接設(shè)置reduce個數(shù)
set mapreduce.job.reduces=10;
-- 執(zhí)行以下語句,將數(shù)據(jù)均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
解釋:如設(shè)置reduce數(shù)量為10,則使用 rand(), 隨機(jī)生成一個數(shù)x % 10,這樣數(shù)據(jù)就會隨機(jī)進(jìn)入 reduce 中,防止出現(xiàn)有的文件過大或過小
9. count(distinct ) 和 group by
在計算去重指標(biāo)的時候,比如不同年齡段的用戶數(shù)這個指標(biāo),一般都是采用count(distinct user_id)直接計算,當(dāng)表數(shù)據(jù)量不大的話影響不大,但如果數(shù)據(jù)量大count distinct就很耗性能了,因為其只會用一個reduce task來執(zhí)行,容易reduce端數(shù)據(jù)傾斜,通常優(yōu)化就使用里層group by age然后再外層count(user_id)來替代。
注意事項:
關(guān)于使用里層group by age然后再外層count(user_id)來替代count(distinct user_id)直接去重計算是否一定就起到優(yōu)化效果這也是看情況的,假設(shè)表數(shù)據(jù)量不是特別大,有些情況下里層group by age然后再外層count(user_id)未必就見得比count(distinct user_id)好。所以還是具體業(yè)務(wù)場景具體分析為好,優(yōu)化從來不是考慮局部就好,要全局考慮。
hive3.x版本里已經(jīng)新增了對count(distinct )的優(yōu)化,通過set hive.optimize.countdistinct配置,即使真的出現(xiàn)數(shù)據(jù)傾斜也可以自動優(yōu)化,自動改變SQL執(zhí)行的邏輯
里層group by age然后再外層count(user_id)這種方式會生成2個job任務(wù),會消耗更多的磁盤網(wǎng)絡(luò)I/O資源
10. 參數(shù)調(diào)優(yōu)
set hive.optimize.countdistinct=true開啟對count(distinct )的自動優(yōu)化
set hive.auto.convert.join = true;開啟自動mapjoin
set hive.mapjoin.smalltable.filesize=26214400;大表小表的閾值設(shè)置(默認(rèn)25M一下認(rèn)為是小表)
set hive.exec.parallel=true;打開任務(wù)并行執(zhí)行
set hive.exec.parallel.thread.number=16;同一個sql允許最大并行度,默認(rèn)值為8。默認(rèn)情況下,Hive一次只會執(zhí)行一個階段。開啟并行執(zhí)行時會把一個sql語句中沒有相互依賴的階段并行去運行,這樣可能使得整個job的執(zhí)行時間縮短。提高集群資源利用率,不過這當(dāng)然得是在系統(tǒng)資源比較空閑的時候才有優(yōu)勢,否則沒資源,并行也起不來。
set hive.map.aggr=true;默認(rèn)值是true,當(dāng)選項設(shè)定為true時,開啟map端部分聚合
set hive.groupby.skewindata = ture;默認(rèn)值是false,當(dāng)有數(shù)據(jù)傾斜的時候進(jìn)行負(fù)載均衡,生成的查詢計劃有兩個MapReduce任務(wù),第一個MR Job中,Map的輸出結(jié)果會隨機(jī)分布到Reduce中,每個Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的Group By Key有可能被分發(fā)到不同的Reduce中,從而達(dá)到負(fù)載均衡的目的;第二個MR Job再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key被分布到同一個Reduce中),最后完成最終的聚合操作
set hive.mapred.mode=strict;設(shè)置嚴(yán)格模式,默認(rèn)值是nonstrict非嚴(yán)格模式。嚴(yán)格模式下會禁止以下3種類型不合理查詢,即以下3種情況會報錯
對于查詢分區(qū)表,必須where加上分區(qū)限制條件
使用order by全局排序時,必須加上limit限制數(shù)據(jù)查詢條數(shù)
限制了笛卡爾積查詢
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;設(shè)置map端執(zhí)行前合并小文件
set hive.exec.compress.output=true;設(shè)置hive的查詢結(jié)果輸出是否進(jìn)行壓縮
set mapreduce.output.fileoutputformat.compress=true;設(shè)置MapReduce Job的結(jié)果輸出是否使用壓縮
set hive.cbo.enable=false;關(guān)閉CBO優(yōu)化,默認(rèn)值true開啟,可以自動優(yōu)化HQL中多個JOIN的順序,并選擇合適的JOIN算法
11. 解決數(shù)據(jù)傾斜問題
什么是數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是大量的相同key被partition分配到同一個reduce里,造成了'一個人累死,其他人閑死'的情況,違背了并行計算的初衷,而且當(dāng)其他節(jié)點計算好了還要等待這個忙碌節(jié)點的計算,效率就被拉低了
數(shù)據(jù)傾斜的明顯表現(xiàn)
任務(wù)進(jìn)度長時間維持在99%,查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量(1個或幾個)reduce子任務(wù)未完成。因為其處理的數(shù)據(jù)量和其他reduce差異過大
數(shù)據(jù)傾斜的根本原因是什么?
key分布不均勻,redurce數(shù)據(jù)處理不均勻
如何盡量避免數(shù)據(jù)傾斜
如何將數(shù)據(jù)均勻的分配到各個reduce中,就是避免數(shù)據(jù)傾斜的根本所在。舉例下2個典型案例,關(guān)于join操作發(fā)生的數(shù)據(jù)傾斜和解決方案:就在文章上面的第六點join優(yōu)化【大表join大表場景】,還有合理設(shè)置map數(shù)和reduce數(shù)的解決方案。
合理設(shè)置map數(shù)和reduce數(shù)
1、Map端優(yōu)化
通常情況下,Job會通過input目錄產(chǎn)生一個或多個map任務(wù),map數(shù)主要取決與input的文件總個數(shù),文件總大小,集群設(shè)置的文件塊大小。
從hadoop2.7.3版本開始,HDFS的默認(rèn)塊大小block size是128M。每張hive表在hdfs上對應(yīng)存儲都是一個文件,關(guān)于執(zhí)行task時,每一個128M的文件都是一個塊block,每個塊就用一個map任務(wù)來完成,若文件超過128M就分塊,若小于128M則獨立成塊。
那么:①當(dāng)小文件過多怎么辦?
答案是map任務(wù)增多,map任務(wù)的啟動和初始化時間遠(yuǎn)大于執(zhí)行邏輯處理時間,從而集群造成資源浪費。
②是不是讓每個文件都接近128M大小就毫無問題了呢?
答案是不可能,假設(shè)一個文件大小127M,但表只有一兩個字段,文件大小是由幾千萬條記錄撐大的,如果數(shù)據(jù)處理邏輯復(fù)雜則用一個map任務(wù)去執(zhí)行也是很耗時的。
③是不是map數(shù)越多越好?
答案是這種說法是片面的,map數(shù)增多有利于提升并行度,但一個map在啟動和初始化時間是遠(yuǎn)大于執(zhí)行邏輯處理時間,越多的map啟動初始化就造成很大的集群資源浪費。
減少map數(shù)量,降低資源浪費,如何做?
以下相當(dāng)于是把小文件合并成大文件處理 (多合一)
-- 102400000B=102400KB=100M
-- 每個Map最大輸入大小(這個值決定了合并后文件的數(shù)量)
set mapred.max.split.size=102400000;
-- 一個節(jié)點上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一個交換機(jī)下split的至少的大小(這個值決定了多個交換機(jī)上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;
-- 前3行設(shè)置是確定合并文件塊的大小,>128M的文件按128M切塊,>100M和<128M的文件按100M切塊,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- map執(zhí)行前合并小文件
有時候?qū)ive進(jìn)行優(yōu)化,在執(zhí)行時間上可能沒什么大的改觀,但是在計算資源上就有很大改善。
增大map數(shù)量,分擔(dān)每個map處理的數(shù)據(jù)量提升任務(wù)效率,如何做?
以下相當(dāng)于是把小文件合并成大文件處理 (一拆多)
根據(jù)mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize))),從公式可以看出調(diào)整maxSize最大值,讓maxSize最大值低于blocksize就可以增加map的個數(shù)。
mapreduce.input.fileinputformat.split.minsize(切片最小值),默認(rèn)值=1,參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大,從而減少map數(shù)
mapreduce.input.fileinputformat.split.maxsize(切片最大值),默認(rèn)值=blocksize塊大小,參數(shù)如果調(diào)到比blocksize小,則會讓切片變小,從而增大map數(shù)
2、Reduce端優(yōu)化
reduce個數(shù)設(shè)置過大也會產(chǎn)生很多小文件對namenode有影響,且輸出的小文件偶爾也會作為下一個任務(wù)的輸入導(dǎo)致出現(xiàn)小文件過多問題,設(shè)置過小又會導(dǎo)致單個reduce處理的數(shù)據(jù)量過大導(dǎo)致OOM異常。
不指定時則hive會默認(rèn)根據(jù)計算公式hive.exec.reducers.bytes.per.reducer(每個reduce任務(wù)處理數(shù)據(jù)量,默認(rèn)1G)和hive.exec.reducers.max(每個任務(wù)的最大reduce數(shù),默認(rèn)1009個),來做min(hive.exec.reducers.max值,總輸入數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer值)計算,得出結(jié)果確定reduce個數(shù),所以可以通過調(diào)整參數(shù)1和參數(shù)2來調(diào)整reduce個數(shù),不過最簡便的還是通過下面的參數(shù)來直接控制reduce個數(shù)。
-- 手動指定reduce個數(shù)
set mapred.reduce.tasks=50;
-- 設(shè)置每一個job中reduce個數(shù)
set mapreduce.job.reduces=50;
那么:①reduce數(shù)是不是越多越好?
答案是錯誤的,同map數(shù)一樣,啟動reduce和初始化同樣耗時和占資源,而且過多的reduce會生成多個文件,同樣會出現(xiàn)小文件問題。
②什么情況下當(dāng)設(shè)置了參數(shù)指定reduce個數(shù)后還是只有單個reduce在跑?
本身輸入數(shù)據(jù)量就小于1G
在做測數(shù)據(jù)量驗證時沒加group by分組匯總。比如select count(1) from test_table where dt = 20201228;
用了order by排序
關(guān)聯(lián)出現(xiàn)了笛卡爾積
合理設(shè)置map數(shù)和reduce數(shù)的小結(jié):
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //系統(tǒng)默認(rèn)格式,設(shè)置在map執(zhí)行前合并小文件,減少map數(shù)
set mapreduce.input.fileinputformat.split.maxsize = 100; //調(diào)整最大切片值,讓maxSize值低于blocksize就可以增加map數(shù)
根據(jù)mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize))),從公式可以看出調(diào)整maxSize最大值,讓maxSize最大值低于blocksize,從而使切片變小,就可以增加map的個數(shù)
三、總結(jié)
日常hive開發(fā)中時刻養(yǎng)成提前數(shù)據(jù)收斂的習(xí)慣,避免無用數(shù)據(jù)參與到計算中
不要過度進(jìn)行優(yōu)化,有可能做的是無用功甚至產(chǎn)生負(fù)效應(yīng),在調(diào)優(yōu)上投入的工作成本和回報不成正比
對于公共可復(fù)用的邏輯代碼,可以抽取出來落地臨時表或者中間表,提升復(fù)用性,強(qiáng)調(diào)復(fù)用!
理解hiveQL底層執(zhí)行的原理,優(yōu)化起來才有章可循
理透需求是代碼優(yōu)化的前提,關(guān)注全局?jǐn)?shù)據(jù)鏈路,一些常見的hive優(yōu)化策略要懂
做hive優(yōu)化的時候,涉及到參數(shù)調(diào)優(yōu)時要慎重,比如把內(nèi)存都申請搶占滿了,避免因為你自己的任務(wù)調(diào)優(yōu)了但影響到整個集群其他任務(wù)的資源分配,全局優(yōu)才是優(yōu)!
作者:大數(shù)據(jù)小江 Akin
歡迎關(guān)注微信公眾號 :大數(shù)據(jù)階梯之路