「Hive進(jìn)階篇」萬字長文超詳述Hive企業(yè)級(jí)優(yōu)化
大家好,我是Akin,肝了幾個(gè)晚上,梳理總結(jié)了一份萬字長文超詳述hive企業(yè)級(jí)優(yōu)化文章,也整理了一份hive優(yōu)化總結(jié)思維導(dǎo)圖和hive優(yōu)化詳細(xì)PDF文檔,有需要可點(diǎn)贊+在看關(guān)注公眾號(hào)《大數(shù)據(jù)階梯之路》找小編獲取文檔保存本地吧,學(xué)習(xí)和復(fù)習(xí)都是絕佳,公眾號(hào)不斷分享技術(shù)相關(guān)文章。話不多說,下面就直接開講吧!
一、問題背景
hive離線數(shù)倉開發(fā),一個(gè)良好的數(shù)據(jù)任務(wù),它的運(yùn)行時(shí)長一般是在合理范圍內(nèi)的,當(dāng)發(fā)現(xiàn)報(bào)表應(yīng)用層的指標(biāo)數(shù)據(jù)總是產(chǎn)出延遲,排查定位發(fā)現(xiàn)是有些任務(wù)執(zhí)行了超10小時(shí)這樣肯定是不合理的,此時(shí)就該想想如何優(yōu)化數(shù)據(jù)任務(wù)鏈路,主要從以下幾個(gè)角度來考慮問題解決:
從數(shù)據(jù)任務(wù)本身hive邏輯代碼出發(fā),即hive邏輯優(yōu)化,偏理解業(yè)務(wù)角度
從集群的資源設(shè)置出發(fā),即hive參數(shù)調(diào)優(yōu),偏理解技術(shù)角度
從全局?jǐn)?shù)據(jù)鏈路的任務(wù)設(shè)置出發(fā),觀測(cè)是否任務(wù)執(zhí)行調(diào)度設(shè)置不合理
從數(shù)倉的數(shù)據(jù)易用性和模型復(fù)用性的角度出發(fā),針對(duì)某些中間邏輯過程可以復(fù)用的就落地中間模型表
附上一份個(gè)人梳理總結(jié)的思維導(dǎo)圖部分截圖
下面就先分享下常見的hive優(yōu)化策略吧~ 會(huì)附帶案例實(shí)踐幫助理解
hive優(yōu)化文章大綱
列裁剪和分區(qū)裁剪
提前數(shù)據(jù)收斂
謂詞下推(PPD)
多路輸出,減少表讀取次數(shù)寫多個(gè)結(jié)果表
合理選擇排序
join優(yōu)化
合理選擇文件存儲(chǔ)格式和壓縮方式
解決小文件過多問題
distinct 和 group by
參數(shù)調(diào)優(yōu)
解決數(shù)據(jù)傾斜問題
二、hive優(yōu)化
1. 列裁剪和分區(qū)裁剪
裁剪 顧名思義就是不需要的數(shù)據(jù)不要多查。
列裁剪,盡量減少直接select * from table這種操作,首先可讀性不好,根本不知道具體用到哪幾個(gè)列,其次列選擇多了也會(huì)增大IO傳輸;
分區(qū)裁剪就是針對(duì)分區(qū)表切記要加上分區(qū)過濾條件,比如表以時(shí)間作為分區(qū)字段,要加上分區(qū)篩選。
2. 提前數(shù)據(jù)收斂
在子查詢中,有些條件能先過濾的盡量放在子查詢里先過濾,減少子查詢輸出的數(shù)據(jù)量。
-- 原腳本
select
a.字段a,a.字段b,b.字段a,b.字段b
from
(
select 字段a,字段b
from table_a
where dt = date_sub(current_date,1)
) a
left join
(
select 字段a,字段b
from table_b
where dt = date_sub(current_date,1)
) b
on a.字段a = b.字段a
where a.字段b <> ''
and b.字段b <> 'xxx'
;
-- 優(yōu)化腳本 (數(shù)據(jù)收斂)
select
a.字段a,a.字段b,b.字段a,b.字段b
from
(
select 字段a,字段b
from table_a
where dt = date_sub(current_date,1)
and 字段b <> ''
) a
left join
(
select 字段a,字段b
from table_b
where dt = date_sub(current_date,1)
and 字段b <> 'xxx'
) b
on a.字段a = b.字段a
;
3. 謂詞下推(Predicate Pushdown)
謂詞下推Predicate Pushdown是什么?簡稱PPD,指的是在不影響數(shù)據(jù)結(jié)果的情況下,將過濾表達(dá)式盡可能移動(dòng)至靠近數(shù)據(jù)源的位置,以使真正執(zhí)行時(shí)能直接跳過無關(guān)的數(shù)據(jù),這樣在map執(zhí)行過濾條件,可以減少map端數(shù)據(jù)輸出,起到了數(shù)據(jù)收斂的作用,降低了數(shù)據(jù)在集群上傳輸?shù)牧浚?jié)約了集群的資源,也提升了任務(wù)的性能。
hive默認(rèn)是開啟謂詞下推該參數(shù)設(shè)置的,hive.optimize.ppd=true
所謂下推,即謂詞過濾在map端執(zhí)行;所謂不下推,即謂詞過濾在reduce端執(zhí)行。
關(guān)于謂詞下推的規(guī)則,主要分為join的on條件過濾下推和where條件過濾下推,我整理了一張圖方便理解。
核心判斷邏輯:join的on條件過濾不能下推到保留行表中;where條件過濾不能下推到null補(bǔ)充表中。
-- 舉例說明:以下腳本 on后面的a表?xiàng)l件過濾沒有下推至map端運(yùn)行而是在reduce端運(yùn)行,where后面的b表?xiàng)l件過濾則有下推至map端運(yùn)行
select
a.字段a,a.字段b,b.字段a,b.字段b
from table_a a
left join table_b b
on a.字段a <> '' -- a表?xiàng)l件過濾
where a.字段b <> 'xxx' -- a表?xiàng)l件過濾
;
謂詞下推注意事項(xiàng):
如果在表達(dá)式中含有不確定函數(shù),整個(gè)表達(dá)式的謂詞將不會(huì)被下推。例如下面腳本,則整個(gè)條件過濾都是在reduce端執(zhí)行:
select a.*
from a join b
on a.id = b.id
where a.ds = '2019-10-09'
and a.create_time = unix_timestamp()
;
因?yàn)樯厦鎢nix_timestamp()是不確定函數(shù),在編譯的時(shí)候無法得知,所以,整個(gè)表達(dá)式不會(huì)被下推,即ds='2022-07-04'也不會(huì)被提前過濾。類似的不確定函數(shù)還有rand()函數(shù)等。
附上2篇關(guān)于謂詞下推的詳細(xì)案例分析講解
!上鏈接,自行復(fù)制去訪問哈:
① https://cloud.tencent.com/developer/article/1616687
② https://cloud.tencent.com/developer/article/1616689
4. 多路輸出
當(dāng)我們有使用一次查詢,多次插入的場(chǎng)景時(shí),則可以采用多路輸出的寫法,減少表的讀取次數(shù),起到性能優(yōu)化的作用。
-- 讀取一次源表,同時(shí)寫入多張目標(biāo)表
from table_source
insert overwrite table table_a
select *
where dt = date_sub(current_date,1)
and event_name = '事件A'
insert overwrite table table_b
select *
where dt = date_sub(current_date,1)
and event_name = '事件B'
insert oveewrite table table_c
select *
where dt = date_sub(current_date,1)
and event_name = '事件C'
;
多路輸出注意事項(xiàng):
一般情況下,一個(gè)sql里面最多支持128路輸出,超過了則會(huì)報(bào)錯(cuò)
在多插往同一張分區(qū)表的不同分區(qū)時(shí),不允許在一個(gè)sql里面多路輸出時(shí)既包含insert overwrite和insert into,要統(tǒng)一操作
5. 合理選擇排序
order by
全局排序,只走一個(gè)reducer,當(dāng)表數(shù)據(jù)量較大時(shí)容易計(jì)算不出來,性能不佳慎用,在嚴(yán)格模式下需要加limit
sort by
局部排序,即保證單個(gè)reduce內(nèi)結(jié)果有序,但沒有全局排序的能力。
distribute by
按照指定的字段把數(shù)據(jù)劃分輸出到不同的reducer中,是控制數(shù)據(jù)如何從map端輸出到reduce端,hive會(huì)根據(jù)distribute by后面的字段和對(duì)應(yīng)reducer的個(gè)數(shù)進(jìn)行hash分發(fā)
cluster by
擁有distrubute by的能力,同時(shí)也擁有sort by的能力,所以可以理解cluster by是 distrubute by+sort by
以下舉個(gè)排序方式優(yōu)化案例,取用戶信息表(10億數(shù)據(jù)量)中年齡排前100的用戶信息:以下案例實(shí)現(xiàn)也體現(xiàn)了一個(gè)大數(shù)據(jù)思想,分而治之,大job拆分小job。
-- 原腳本
select *
from tmp.user_info_table
where dt = '2022-07-04'
order by age -- 全局排序,只走一個(gè)reduce
limit 100
;
-- 優(yōu)化腳本
set mapred.reduce.tasks=50; -- 設(shè)置reduce個(gè)數(shù)為50
select *
from tmp.user_info_table
where dt = '2022-07-04'
distribute by (case when age<20 then 0
when age >=20 and age <= 40 then 1
else 2
end
) -- distribute by主要是為了控制map端輸出的數(shù)據(jù)在reduce端中是如何劃分的,防止map端數(shù)據(jù)隨機(jī)分配到reduce。這里字段做case when判斷是因?yàn)橛脩裟挲g的零散值會(huì)導(dǎo)致分布不均勻,起太多reduce本身也耗時(shí)浪費(fèi)資源
sort by age -- 起多個(gè)reduce排序,保證單個(gè)reduce結(jié)果有序
limit 100 -- 取前100,因?yàn)槭前凑漳挲g局部排序過,所以前100個(gè)也一定是年齡最小的
;
排序選擇的小結(jié):
order by全局排序,但只有一個(gè)reducer執(zhí)行,數(shù)據(jù)量大的話容易計(jì)算不過來,慎用
sort by局部排序,單個(gè)reducer內(nèi)有序,把map端隨機(jī)分發(fā)給reduce端執(zhí)行,如果是要實(shí)現(xiàn)全局排序且走多個(gè)reducer的優(yōu)化需求時(shí),可以在外層嵌套一層,例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N,這樣就有2個(gè)Job,一個(gè)是內(nèi)層的局部排序,一個(gè)是外層的歸并全局排序
distribute by可以按照指定字段將數(shù)據(jù)進(jìn)行hash分發(fā)到對(duì)應(yīng)的reducer去執(zhí)行
當(dāng)分區(qū)字段和排序字段相同時(shí)可以使用cluster by來簡化distribute by+sort by的寫法,但是cluster by排序只能是升序排序,不能指定排序規(guī)則是ASC或者DESC
6. join優(yōu)化
hive在redurce階段完成的join就是common join,在map階段完成的join就是map join。
提前收斂數(shù)據(jù)量,保證在join關(guān)聯(lián)前無用數(shù)據(jù)不參與關(guān)聯(lián)
這塊可以跟前面的數(shù)據(jù)收斂模塊&謂詞下推模塊 搭配起來看,主要就是提前收斂數(shù)據(jù)量,不止在join場(chǎng)景,在其他復(fù)雜計(jì)算前同樣適用。
left semi join左半關(guān)聯(lián)
left semi join一開始出現(xiàn)的使用場(chǎng)景其實(shí)是解決hive不支持in/exists子查詢的高效實(shí)現(xiàn),雖然left semi join含有l(wèi)eft,但其實(shí)不是保留左表全部數(shù)據(jù),效果類似于join吧,只是最終結(jié)果只取左表中的列,還有最終結(jié)果某些場(chǎng)景下會(huì)跟join結(jié)果不同。
select a.*
from
(
select 1 as id,'a' as name
union all
select 2 as id,'b' as name
) a
left semi join
(
select 1 as id,'b' as name
union all
select 1 as id,'c' as name
) b
on a.id = b.id
-- 你猜left semi join結(jié)果是?
id name
1 a
-- 而如果上面的腳本是join呢,結(jié)果?
id name
1 a
1 a
left semi join注意事項(xiàng):
右表的條件過濾只能寫在on后面,不能寫在where后面
最終結(jié)果只能展示左表的列,右表的列不能展示
left semi join與join的差異:主要在于右表有重復(fù)數(shù)據(jù)時(shí),left semi join是遍歷到右表一條數(shù)據(jù)后就跳過,只取一條,而join是一直遍歷至右表最后一條數(shù)據(jù),這也就是要注意實(shí)際數(shù)據(jù)場(chǎng)景是否有重復(fù)和是否要保留
大表join小表場(chǎng)景
大表join小表的話,要把小表放在左邊,大表放在右邊,這是因?yàn)閖oin操作發(fā)生在reduce階段,在hive2.x版本以前,位于左邊的表會(huì)被加載進(jìn)內(nèi)存中,所以如果是大表放左邊被加載進(jìn)內(nèi)存的話就會(huì)有內(nèi)存溢出的風(fēng)險(xiǎn),不過在hive2.x版本后就已經(jīng)優(yōu)化好這塊了,無需關(guān)注,底層幫我們優(yōu)化好這個(gè)問題了。
啟用mapjoin
mapjoin就是把join的表直接分發(fā)到map端的內(nèi)存中,即在map端來執(zhí)行join操作,就不用在reduce階段進(jìn)行join了,提高了執(zhí)行效率。如果表比較小的話最好是啟用mapjoin,hive默認(rèn)是開啟自動(dòng)mapjoin的。
set hive.auto.convert.join = true;
-- 大表小表的閾值設(shè)置(默認(rèn)25M一下認(rèn)為是小表)
set hive.mapjoin.smalltable.filesize=26214400;
大表join大表場(chǎng)景
舉例,假設(shè)a表是包括許多空值的數(shù)據(jù),b表是不包含空值的數(shù)據(jù)
-- 不做優(yōu)化時(shí)的原始hql
select a.id
from a left join b
on a.id = b.id
1、空key過濾,過濾空key的數(shù)據(jù)
關(guān)聯(lián)的過程是相同key對(duì)應(yīng)的數(shù)據(jù)都會(huì)發(fā)送到相同的reducer上,如果某些空key過多是會(huì)導(dǎo)致內(nèi)存不夠的,從而引發(fā)join超時(shí),所以如果不需要這類空key數(shù)據(jù)的時(shí)候,可以先過濾掉這些異常數(shù)據(jù)。
-- 做空key過濾優(yōu)化時(shí)的hql,利用子查詢先處理掉后再關(guān)聯(lián)
select a.id
from (select * from a where id is not null) a
join b
on a.id = b.id
2、空key轉(zhuǎn)換,轉(zhuǎn)換key的數(shù)據(jù)進(jìn)行關(guān)聯(lián)時(shí)打散key
當(dāng)然,有時(shí)候空值的數(shù)據(jù)又不一定是異常數(shù)據(jù),還是需要保留的,但是空key過多都分配到一個(gè)reducer去了,這樣執(zhí)行起來就算不內(nèi)存溢出也會(huì)發(fā)生數(shù)據(jù)傾斜情況,數(shù)據(jù)傾斜的話對(duì)集群資源的利用率來看的話是極其不利的,我們可以通過把空key虛擬成隨機(jī)數(shù),但要保證不是同一個(gè)空key,從而降低數(shù)據(jù)傾斜概率,雖然這樣在對(duì)關(guān)聯(lián)鍵做處理反而會(huì)總體增長執(zhí)行時(shí)間,但卻減輕了reducer負(fù)擔(dān)。
-- 做空key轉(zhuǎn)換優(yōu)化時(shí)的hql,利用case when判斷加隨機(jī)數(shù)
select a.id
from a.left join b
on case when a.id is null then concat('hive'+rand()) else a.id end = b.id
避免笛卡爾積
盡量避免笛卡爾積,即避免join的時(shí)候不加on條件,或者無效的on條件,因?yàn)镠ive只能使用1個(gè)reducer來完成笛卡爾積,不過這點(diǎn)hive會(huì)通過嚴(yán)格模式下來提醒,在嚴(yán)格模式下出現(xiàn)笛卡爾積時(shí)報(bào)錯(cuò)。
7. 合理選擇文件存儲(chǔ)格式和壓縮方式
關(guān)于這點(diǎn),我專門寫過一篇文章介紹hive常見的幾種存儲(chǔ)格式和壓縮方式,具體可以去上次我寫過的這篇文章看看
!上鏈接:https://mp.weixin.qq.com/s/RndQKF5y9Mto7QfgiiAOvQ
8. 解決小文件過多問題
先來說一說什么是小文件,怎么發(fā)生的
顧名思義,小文件就是文件很小的文件,小文件的產(chǎn)生一定是發(fā)生在向hive表導(dǎo)入數(shù)據(jù)的時(shí)候,比如:
-- 第①種導(dǎo)入數(shù)據(jù)方式
insert into table A values(); -- 每執(zhí)行一條語句hive表就產(chǎn)生一個(gè)文件,但這種導(dǎo)入數(shù)據(jù)方式生產(chǎn)環(huán)境少見;
-- 第②種導(dǎo)入數(shù)據(jù)方式
load data local path '本地文件/本地文件夾 路徑' overwrite into table A; -- 導(dǎo)入文件/文件夾`,即有多少個(gè)文件hive表就會(huì)產(chǎn)生多少個(gè)文件
-- 第③種導(dǎo)入數(shù)據(jù)方式
insert overwrite table A select * from B; -- 通過查詢的方式導(dǎo)入數(shù)據(jù)是生產(chǎn)環(huán)境最常見的
MR中 reduce 有多少個(gè)就輸出多少個(gè)文件,文件數(shù)量 = reduce數(shù)量 * 分區(qū)數(shù),如果說某些簡單job沒有reduce階段只有map階段,那文件數(shù)量 = map數(shù)量 * 分區(qū)數(shù)。從公式上看,reduce的個(gè)數(shù)和分區(qū)數(shù)最終決定了輸出的文件的個(gè)數(shù),所以可以調(diào)整reduce的個(gè)數(shù)以及分區(qū) 達(dá)到控制hive表的文件數(shù)量。
小文件過多有什么影響
首先第一點(diǎn)從HDFS底層來看,小文件過多會(huì)給集群namenode帶來負(fù)擔(dān),即namenode元數(shù)據(jù)大占用內(nèi)存,影響HDFS的性能
第二點(diǎn)從hive來看,在進(jìn)行查詢時(shí),每個(gè)小文件都會(huì)當(dāng)成一個(gè)塊,啟動(dòng)一個(gè)Map任務(wù)來完成,而一個(gè)Map任務(wù)啟動(dòng)和初始化的時(shí)間遠(yuǎn)遠(yuǎn)大于邏輯處理的時(shí)間,就會(huì)造成很大的資源浪費(fèi)
如何解決小文件過多問題
1、使用hive自帶的 concatenate 命令,來合并小文件
不過要注意的是concatenate命令只支持hive表存儲(chǔ)格式是orcfile或者rcfile,還有該方式不支持指定合并后的文件數(shù)量
-- 對(duì)于非分區(qū)表
alter table test_table concatenate;
-- 對(duì)于分區(qū)表
alter table test_table partition(dt = '2022-07-16') concatenate;
2、調(diào)整參數(shù)減少M(fèi)ap數(shù)
設(shè)置map輸入合并小文件
-- 102400000B=102400KB=100M
-- 每個(gè)Map最大輸入大小(這個(gè)值決定了合并后文件的數(shù)量)
set mapred.max.split.size=102400000;
-- 一個(gè)節(jié)點(diǎn)上split的至少的大小(這個(gè)值決定了多個(gè)DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一個(gè)交換機(jī)下split的至少的大小(這個(gè)值決定了多個(gè)交換機(jī)上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;
-- 前3行設(shè)置是確定合并文件塊的大小,>128M的文件按128M切塊,>100M和<128M的文件按100M切塊,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- map執(zhí)行前合并小文件
設(shè)置map輸出和reduce輸出合并小文件
-- 設(shè)置map端輸出進(jìn)行合并,默認(rèn)為true
set hive.merge.mapfiles = true;
-- 設(shè)置reduce端輸出進(jìn)行合并,默認(rèn)為false
set hive.merge.mapredfiles = true;
-- 設(shè)置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;
-- 當(dāng)輸出文件的平均大小小于該值時(shí),啟動(dòng)一個(gè)獨(dú)立的MapReduce任務(wù)進(jìn)行文件merge
set hive.merge.smallfiles.avgsize=16000000;
3、調(diào)整參數(shù)減少Reduce數(shù)
-- hive中的分區(qū)函數(shù) distribute by 正好是控制MR中partition分區(qū)的,然后通過設(shè)置reduce的數(shù)量,結(jié)合分區(qū)函數(shù)讓數(shù)據(jù)均衡的進(jìn)入每個(gè)reduce即可。
-- 直接設(shè)置reduce個(gè)數(shù)
set mapreduce.job.reduces=10;
-- 執(zhí)行以下語句,將數(shù)據(jù)均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
解釋:如設(shè)置reduce數(shù)量為10,則使用 rand(), 隨機(jī)生成一個(gè)數(shù)x % 10,這樣數(shù)據(jù)就會(huì)隨機(jī)進(jìn)入 reduce 中,防止出現(xiàn)有的文件過大或過小
9. count(distinct ) 和 group by
在計(jì)算去重指標(biāo)的時(shí)候,比如不同年齡段的用戶數(shù)這個(gè)指標(biāo),一般都是采用count(distinct user_id)直接計(jì)算,當(dāng)表數(shù)據(jù)量不大的話影響不大,但如果數(shù)據(jù)量大count distinct就很耗性能了,因?yàn)槠渲粫?huì)用一個(gè)reduce task來執(zhí)行,容易reduce端數(shù)據(jù)傾斜,通常優(yōu)化就使用里層group by age然后再外層count(user_id)來替代。
注意事項(xiàng):
關(guān)于使用里層group by age然后再外層count(user_id)來替代count(distinct user_id)直接去重計(jì)算是否一定就起到優(yōu)化效果這也是看情況的,假設(shè)表數(shù)據(jù)量不是特別大,有些情況下里層group by age然后再外層count(user_id)未必就見得比count(distinct user_id)好。所以還是具體業(yè)務(wù)場(chǎng)景具體分析為好,優(yōu)化從來不是考慮局部就好,要全局考慮。
hive3.x版本里已經(jīng)新增了對(duì)count(distinct )的優(yōu)化,通過set hive.optimize.countdistinct配置,即使真的出現(xiàn)數(shù)據(jù)傾斜也可以自動(dòng)優(yōu)化,自動(dòng)改變SQL執(zhí)行的邏輯
里層group by age然后再外層count(user_id)這種方式會(huì)生成2個(gè)job任務(wù),會(huì)消耗更多的磁盤網(wǎng)絡(luò)I/O資源
10. 參數(shù)調(diào)優(yōu)
set hive.optimize.countdistinct=true開啟對(duì)count(distinct )的自動(dòng)優(yōu)化
set hive.auto.convert.join = true;開啟自動(dòng)mapjoin
set hive.mapjoin.smalltable.filesize=26214400;大表小表的閾值設(shè)置(默認(rèn)25M一下認(rèn)為是小表)
set hive.exec.parallel=true;打開任務(wù)并行執(zhí)行
set hive.exec.parallel.thread.number=16;同一個(gè)sql允許最大并行度,默認(rèn)值為8。默認(rèn)情況下,Hive一次只會(huì)執(zhí)行一個(gè)階段。開啟并行執(zhí)行時(shí)會(huì)把一個(gè)sql語句中沒有相互依賴的階段并行去運(yùn)行,這樣可能使得整個(gè)job的執(zhí)行時(shí)間縮短。提高集群資源利用率,不過這當(dāng)然得是在系統(tǒng)資源比較空閑的時(shí)候才有優(yōu)勢(shì),否則沒資源,并行也起不來。
set hive.map.aggr=true;默認(rèn)值是true,當(dāng)選項(xiàng)設(shè)定為true時(shí),開啟map端部分聚合
set hive.groupby.skewindata = ture;默認(rèn)值是false,當(dāng)有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡,生成的查詢計(jì)劃有兩個(gè)MapReduce任務(wù),第一個(gè)MR Job中,Map的輸出結(jié)果會(huì)隨機(jī)分布到Reduce中,每個(gè)Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的Group By Key有可能被分發(fā)到不同的Reduce中,從而達(dá)到負(fù)載均衡的目的;第二個(gè)MR Job再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照Group By Key分布到Reduce中(這個(gè)過程可以保證相同的Group By Key被分布到同一個(gè)Reduce中),最后完成最終的聚合操作
set hive.mapred.mode=strict;設(shè)置嚴(yán)格模式,默認(rèn)值是nonstrict非嚴(yán)格模式。嚴(yán)格模式下會(huì)禁止以下3種類型不合理查詢,即以下3種情況會(huì)報(bào)錯(cuò)
對(duì)于查詢分區(qū)表,必須where加上分區(qū)限制條件
使用order by全局排序時(shí),必須加上limit限制數(shù)據(jù)查詢條數(shù)
限制了笛卡爾積查詢
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;設(shè)置map端執(zhí)行前合并小文件
set hive.exec.compress.output=true;設(shè)置hive的查詢結(jié)果輸出是否進(jìn)行壓縮
set mapreduce.output.fileoutputformat.compress=true;設(shè)置MapReduce Job的結(jié)果輸出是否使用壓縮
set hive.cbo.enable=false;關(guān)閉CBO優(yōu)化,默認(rèn)值true開啟,可以自動(dòng)優(yōu)化HQL中多個(gè)JOIN的順序,并選擇合適的JOIN算法
11. 解決數(shù)據(jù)傾斜問題
什么是數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是大量的相同key被partition分配到同一個(gè)reduce里,造成了'一個(gè)人累死,其他人閑死'的情況,違背了并行計(jì)算的初衷,而且當(dāng)其他節(jié)點(diǎn)計(jì)算好了還要等待這個(gè)忙碌節(jié)點(diǎn)的計(jì)算,效率就被拉低了
數(shù)據(jù)傾斜的明顯表現(xiàn)
任務(wù)進(jìn)度長時(shí)間維持在99%,查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量(1個(gè)或幾個(gè))reduce子任務(wù)未完成。因?yàn)槠涮幚淼臄?shù)據(jù)量和其他reduce差異過大
數(shù)據(jù)傾斜的根本原因是什么?
key分布不均勻,redurce數(shù)據(jù)處理不均勻
如何盡量避免數(shù)據(jù)傾斜
如何將數(shù)據(jù)均勻的分配到各個(gè)reduce中,就是避免數(shù)據(jù)傾斜的根本所在。舉例下2個(gè)典型案例,關(guān)于join操作發(fā)生的數(shù)據(jù)傾斜和解決方案:就在文章上面的第六點(diǎn)join優(yōu)化【大表join大表場(chǎng)景】,還有合理設(shè)置map數(shù)和reduce數(shù)的解決方案。
合理設(shè)置map數(shù)和reduce數(shù)
1、Map端優(yōu)化
通常情況下,Job會(huì)通過input目錄產(chǎn)生一個(gè)或多個(gè)map任務(wù),map數(shù)主要取決與input的文件總個(gè)數(shù),文件總大小,集群設(shè)置的文件塊大小。
從hadoop2.7.3版本開始,HDFS的默認(rèn)塊大小block size是128M。每張hive表在hdfs上對(duì)應(yīng)存儲(chǔ)都是一個(gè)文件,關(guān)于執(zhí)行task時(shí),每一個(gè)128M的文件都是一個(gè)塊block,每個(gè)塊就用一個(gè)map任務(wù)來完成,若文件超過128M就分塊,若小于128M則獨(dú)立成塊。
那么:①當(dāng)小文件過多怎么辦?
答案是map任務(wù)增多,map任務(wù)的啟動(dòng)和初始化時(shí)間遠(yuǎn)大于執(zhí)行邏輯處理時(shí)間,從而集群造成資源浪費(fèi)。
②是不是讓每個(gè)文件都接近128M大小就毫無問題了呢?
答案是不可能,假設(shè)一個(gè)文件大小127M,但表只有一兩個(gè)字段,文件大小是由幾千萬條記錄撐大的,如果數(shù)據(jù)處理邏輯復(fù)雜則用一個(gè)map任務(wù)去執(zhí)行也是很耗時(shí)的。
③是不是map數(shù)越多越好?
答案是這種說法是片面的,map數(shù)增多有利于提升并行度,但一個(gè)map在啟動(dòng)和初始化時(shí)間是遠(yuǎn)大于執(zhí)行邏輯處理時(shí)間,越多的map啟動(dòng)初始化就造成很大的集群資源浪費(fèi)。
減少map數(shù)量,降低資源浪費(fèi),如何做?
以下相當(dāng)于是把小文件合并成大文件處理 (多合一)
-- 102400000B=102400KB=100M
-- 每個(gè)Map最大輸入大小(這個(gè)值決定了合并后文件的數(shù)量)
set mapred.max.split.size=102400000;
-- 一個(gè)節(jié)點(diǎn)上split的至少的大小(這個(gè)值決定了多個(gè)DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一個(gè)交換機(jī)下split的至少的大小(這個(gè)值決定了多個(gè)交換機(jī)上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;
-- 前3行設(shè)置是確定合并文件塊的大小,>128M的文件按128M切塊,>100M和<128M的文件按100M切塊,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- map執(zhí)行前合并小文件
有時(shí)候?qū)ive進(jìn)行優(yōu)化,在執(zhí)行時(shí)間上可能沒什么大的改觀,但是在計(jì)算資源上就有很大改善。
增大map數(shù)量,分擔(dān)每個(gè)map處理的數(shù)據(jù)量提升任務(wù)效率,如何做?
以下相當(dāng)于是把小文件合并成大文件處理 (一拆多)
根據(jù)mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize))),從公式可以看出調(diào)整maxSize最大值,讓maxSize最大值低于blocksize就可以增加map的個(gè)數(shù)。
mapreduce.input.fileinputformat.split.minsize(切片最小值),默認(rèn)值=1,參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大,從而減少map數(shù)
mapreduce.input.fileinputformat.split.maxsize(切片最大值),默認(rèn)值=blocksize塊大小,參數(shù)如果調(diào)到比blocksize小,則會(huì)讓切片變小,從而增大map數(shù)
2、Reduce端優(yōu)化
reduce個(gè)數(shù)設(shè)置過大也會(huì)產(chǎn)生很多小文件對(duì)namenode有影響,且輸出的小文件偶爾也會(huì)作為下一個(gè)任務(wù)的輸入導(dǎo)致出現(xiàn)小文件過多問題,設(shè)置過小又會(huì)導(dǎo)致單個(gè)reduce處理的數(shù)據(jù)量過大導(dǎo)致OOM異常。
不指定時(shí)則hive會(huì)默認(rèn)根據(jù)計(jì)算公式hive.exec.reducers.bytes.per.reducer(每個(gè)reduce任務(wù)處理數(shù)據(jù)量,默認(rèn)1G)和hive.exec.reducers.max(每個(gè)任務(wù)的最大reduce數(shù),默認(rèn)1009個(gè)),來做min(hive.exec.reducers.max值,總輸入數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer值)計(jì)算,得出結(jié)果確定reduce個(gè)數(shù),所以可以通過調(diào)整參數(shù)1和參數(shù)2來調(diào)整reduce個(gè)數(shù),不過最簡便的還是通過下面的參數(shù)來直接控制reduce個(gè)數(shù)。
-- 手動(dòng)指定reduce個(gè)數(shù)
set mapred.reduce.tasks=50;
-- 設(shè)置每一個(gè)job中reduce個(gè)數(shù)
set mapreduce.job.reduces=50;
那么:①reduce數(shù)是不是越多越好?
答案是錯(cuò)誤的,同map數(shù)一樣,啟動(dòng)reduce和初始化同樣耗時(shí)和占資源,而且過多的reduce會(huì)生成多個(gè)文件,同樣會(huì)出現(xiàn)小文件問題。
②什么情況下當(dāng)設(shè)置了參數(shù)指定reduce個(gè)數(shù)后還是只有單個(gè)reduce在跑?
本身輸入數(shù)據(jù)量就小于1G
在做測(cè)數(shù)據(jù)量驗(yàn)證時(shí)沒加group by分組匯總。比如select count(1) from test_table where dt = 20201228;
用了order by排序
關(guān)聯(lián)出現(xiàn)了笛卡爾積
合理設(shè)置map數(shù)和reduce數(shù)的小結(jié):
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //系統(tǒng)默認(rèn)格式,設(shè)置在map執(zhí)行前合并小文件,減少map數(shù)
set mapreduce.input.fileinputformat.split.maxsize = 100; //調(diào)整最大切片值,讓maxSize值低于blocksize就可以增加map數(shù)
根據(jù)mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize))),從公式可以看出調(diào)整maxSize最大值,讓maxSize最大值低于blocksize,從而使切片變小,就可以增加map的個(gè)數(shù)
三、總結(jié)
日常hive開發(fā)中時(shí)刻養(yǎng)成提前數(shù)據(jù)收斂的習(xí)慣,避免無用數(shù)據(jù)參與到計(jì)算中
不要過度進(jìn)行優(yōu)化,有可能做的是無用功甚至產(chǎn)生負(fù)效應(yīng),在調(diào)優(yōu)上投入的工作成本和回報(bào)不成正比
對(duì)于公共可復(fù)用的邏輯代碼,可以抽取出來落地臨時(shí)表或者中間表,提升復(fù)用性,強(qiáng)調(diào)復(fù)用!
理解hiveQL底層執(zhí)行的原理,優(yōu)化起來才有章可循
理透需求是代碼優(yōu)化的前提,關(guān)注全局?jǐn)?shù)據(jù)鏈路,一些常見的hive優(yōu)化策略要懂
做hive優(yōu)化的時(shí)候,涉及到參數(shù)調(diào)優(yōu)時(shí)要慎重,比如把內(nèi)存都申請(qǐng)搶占滿了,避免因?yàn)槟阕约旱娜蝿?wù)調(diào)優(yōu)了但影響到整個(gè)集群其他任務(wù)的資源分配,全局優(yōu)才是優(yōu)!
作者:大數(shù)據(jù)小江 Akin
歡迎關(guān)注微信公眾號(hào) :大數(shù)據(jù)階梯之路