Hive知識點
- 在導(dǎo)入數(shù)據(jù)到外部表,數(shù)據(jù)并沒有移動到自己的數(shù)據(jù)倉庫目錄下,也就是說外部表中的數(shù)據(jù)并不是由它自己來管理的!而表則不一樣;
- 在刪除表的時候,Hive將會把屬于表的元數(shù)據(jù)和數(shù)據(jù)全部刪掉;而刪除外部表的時候,Hive僅僅刪除外部表的元數(shù)據(jù),數(shù)據(jù)是不會刪除的!那么,應(yīng)該如何選擇使用哪種表呢?在大多數(shù)情況沒有太多的區(qū)別,因此選擇只是個人喜好的問題。但是作為一個經(jīng)驗,如果所有處理都需要由Hive完成,那么你應(yīng)該創(chuàng)建表,否則使用外部表!
- 導(dǎo)入數(shù)據(jù)到textfile
- 查詢數(shù)據(jù)插入orc格式表
- 解決
- 未解決
- hive創(chuàng)建orc格式表不能像
textfile
格式一樣直接load數(shù)據(jù)到表中,一般需要load創(chuàng)建臨時textfile表,然后通過insert into 或者insert overwrite
到orc存儲格式表中。 - 或者將現(xiàn)有orc文件cp到
hive
對應(yīng)表的目錄
map,reduce知識
什么情況下只有一個reduce? - 沒有g(shù)roup by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes
where pt = '2012-07-04' group by pt; 寫成 select count(1) from
popt_tbaccountcopy_mes where pt = '2012-07-04';
這點非常常見,希望大家盡量改寫。 - 用了Order by
- 有笛卡爾積
通常這些情況下,除了找辦法來變通和避免,我暫時沒有什么好的辦法,因為這些操作都是全局的,所以hadoop不得不用一個reduce去完成;
同樣的,在設(shè)置reduce個數(shù)的時候也需要考慮這兩個原則:使大數(shù)據(jù)量利用合適的reduce數(shù);使單個reduce任務(wù)處理合適的數(shù)據(jù)量。 - hive mapreduce參數(shù)優(yōu)化
設(shè)置map,reduce任務(wù)分配的資源set mapreduce.map.memory.mb = 4096 ; set mapreduce.reduce.memory.mb = 4096 ; set mapreduce.map.java.opts=-Xmx3686m; set mapreduce.reduce.java.opts=-Xmx3428m;
- hive.exec.parallel參數(shù)控制在同一個sql中的不同的job是否可以同時運行,默認(rèn)為false.
- 當(dāng)參數(shù)為false的時候,三個job是順序的執(zhí)行
set hive.exec.parallel=false;
- 但是可以看出來其實兩個子查詢中的sql并無關(guān)系,可以并行的跑
set hive.exec.parallel=true;
- 設(shè)置reduce個數(shù)
set mapred.reduce.tasks = 15;
==總結(jié)==:
在資源充足的時候hive.exec.parallel會讓那些存在并發(fā)job的sql運行得更快,但同時消耗更多的資源
可以評估下hive.exec.parallel對我們的刷新任務(wù)是否有幫助.
- 設(shè)置reduce個數(shù)
-
參數(shù)設(shè)置
set mapred.max.split.size=256000000; -- 決定每個map處理的最大的文件大小,單位為B
-
orc 小文件合并
-
-
msck repair修復(fù)大量分區(qū)
Hive知識點
hive知識點
工作中hive
常用知識點。
Hive簡介
hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,并提供簡單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運行。其優(yōu)點是學(xué)習(xí)成本低,可以通過類SQL語句快速實現(xiàn)簡單的MapReduce統(tǒng)計,不必開發(fā)專門的MapReduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計分析。
創(chuàng)建hive表
# 新建個數(shù)據(jù)庫test
create database test;
create external table if not exists test.test(id string, name string);
這里創(chuàng)建了一個名為test
的hive
外部表,外部表與普通表的區(qū)別:
創(chuàng)建分區(qū)表
通過partition
關(guān)鍵字指定分區(qū)字段,分區(qū)表方便hive
快速查詢索引數(shù)據(jù)。
這里指定了兩個分區(qū):dt
和hour
,對應(yīng)hdfs
的2級目錄dt,hour
:
[hadoop@qcloud-test-hadoop01 ~]$ hdfs dfs -ls -R /hive/warehouse/test.db/test
drwxr-xr-x - hadoop supergroup 0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10
drwxr-xr-x - hadoop supergroup 0 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02
-rwxr-xr-x 2 hadoop supergroup 39 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02/test.txt
drwxr-xr-x - hadoop supergroup 0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03
-rwxr-xr-x 2 hadoop supergroup 39 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03/test.txt
刪除分區(qū)
ALTER TABLE table_name DROP IF EXISTS PARTITION(year = 2015, month = 10, day = 1);
創(chuàng)建orc存儲格式表
hive
創(chuàng)建orc
格式表不能像textfile
格式一樣直接load
數(shù)據(jù)到表中,一般需要創(chuàng)建臨時textfile
表,然后通過insert into
或者insert overwrite
到orc
存儲格式表中。
1) 臨時表testfile存儲格式
test.txt
001 keguang
002 kg
003 kk
004 ikeguang
load data local inpath '/home/hadoop/data/test.txt' into table test.test partition(dt = '2019-09-10', hour = '02');
2) 導(dǎo)入數(shù)據(jù)到orc表
create table if not exists test.test2
(
id string,
name string
)
partitioned by (dt string,hour string)
row format delimited fields terminated by '\t'
stored as orc;
insert select
導(dǎo)入數(shù)據(jù)
insert overwrite table test.test2 partition(dt, hour) select `(dt|hour)?+.+`,dt,hour from test.test;
這里(dt|hour)?+.+
表示排除dt,hour
兩個字段,由于動態(tài)分區(qū)partition(dt, hour)
是按照select
出來的最后2個字段作為分區(qū)字段的。其實這里
也是可以的,因為分區(qū)表查詢結(jié)果,最后兩個字段就是分區(qū)字段。select *
select * from test.test2;
結(jié)果
001 keguang 2019-09-10 02
002 kg 2019-09-10 02
003 kk 2019-09-10 02
004 ikeguang 2019-09-10 02
所以說,非textfile存儲格式表導(dǎo)入數(shù)據(jù)步驟:
select 排除字段
選擇tableName
表中除了name、id、pwd
之外的所有字段
set hive.support.quoted.identifiers=None;
select `(name|id|pwd)?+.+` from tableName;
UDF用法
添加臨時函數(shù)
刪除臨時函數(shù)
添加永久函數(shù)
==注意==:1). 需要指定數(shù)據(jù)庫.函數(shù)名,即hm2.gamelabel
,否則默認(rèn)在default
數(shù)據(jù)庫下面:default.gamelabel
;2). hdfsJarPath
即該jar
包需要上傳到hdfs
目錄;
刪除永久函數(shù):
如果客戶端通過hiveserver2
連接hive
,為了正常使用自定義的永久udf
,需要執(zhí)行reload function
;
查看函數(shù)用法
查month 相關(guān)的函數(shù)
show functions like '*month*'
查看 add_months 函數(shù)的用法
desc function add_months;
查看 add_months 函數(shù)的詳細(xì)說明并舉例
desc function extended add_months;
UDAF用法
關(guān)于UDAF開發(fā)注意點:
1.需要import org.apache.hadoop.hive.ql.exec.UDAF
以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator
,這兩個包都是必須的
2.函數(shù)類需要繼承UDAF
類,內(nèi)部類Evaluator實現(xiàn)UDAFEvaluator接口
3.Evaluator需要實現(xiàn) init、iterate、terminatePartial、merge、terminate這幾個函數(shù)
1)init函數(shù)類似于構(gòu)造函數(shù),用于UDAF的初始化
2)iterate接收傳入的參數(shù),并進(jìn)行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean
3)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回亂轉(zhuǎn)數(shù)據(jù),iterate和terminatePartial類似于hadoop的Combiner
4)merge接收terminatePartial的返回結(jié)果,進(jìn)行數(shù)據(jù)merge操作,其返回類型為boolean
5)terminate返回最終的聚集函數(shù)結(jié)果
hive hbase 關(guān)聯(lián)
create 'flash_people','info','label'
create external table if not exists hm2.flash_people(
guid string comment "people guid",
firsttime string comment "首次入庫時間",
ip string comment "用戶ip",
jstimestamp bigint comment "json時間戳"
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:firsttime,info:ip,:timestamp")
tblproperties("hbase.table.name" = "hm2:flash_people");
hive -e 用法
hive -e
主要用來在命令行執(zhí)行sql
hive一些優(yōu)化參數(shù)
字段變更
添加字段
alter table hm2.helper add columns(country string, province string, city string);
hive添加字段后,前面數(shù)據(jù)會有空值,就算將前面數(shù)據(jù)hdfs文件刪除,重新導(dǎo)入,仍然查詢出來是 NULL,這個問題有待解決。
添加 map 復(fù)雜類型字段
alter table hm2.helper add columns(data_map map<string, string>);
hive> desc formatted hm2.helper;
OK
# col_name data_type comment
time string
uuid string
country string
province string
city string
data_map map<string,string>
# Partition Information
# col_name data_type comment
dt string
hour string
msgtype string
# Detailed Table Information
Database: hm2
Owner: hadoop
CreateTime: Wed Apr 24 10:12:30 CST 2019
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://nameser/hive/warehouse/hm2.db/helper
Table Type: EXTERNAL_TABLE
Table Parameters:
EXTERNAL TRUE
last_modified_by hadoop
last_modified_time 1556072221
transient_lastDdlTime 1556072221
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
field.delim \t
serialization.format \t
Time taken: 0.105 seconds, Fetched: 59 row(s)
仿照一張在創(chuàng)建表時定義了map類型字段的表的屬性描述
Storage Desc Params:
colelction.delim ,
field.delim \t
mapkey.delim :
serialization.format \t
只需要將map屬性修改為:
hive> alter table hm2.helper set serdeproperties('colelction.delim' = ',', 'mapkey.delim' = ':');
OK
Time taken: 0.132 seconds
那么
hive> desc formatted hm2.helper;
OK
# col_name data_type comment
time string
uuid string
country string
province string
city string
data_map map<string,string>
# Partition Information
# col_name data_type comment
dt string
hour string
msgtype string
# Detailed Table Information
Database: hm2
Owner: hadoop
CreateTime: Wed Apr 24 10:12:30 CST 2019
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://nameser/hive/warehouse/hm2.db/helper
Table Type: EXTERNAL_TABLE
Table Parameters:
EXTERNAL TRUE
last_modified_by hadoop
last_modified_time 1556072669
transient_lastDdlTime 1556072669
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
colelction.delim ,
field.delim \t
mapkey.delim :
serialization.format \t
Time taken: 0.079 seconds, Fetched: 61 row(s)
即可
刪除字段
CREATE TABLE test (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT,
f BIGINT
);
如果需要刪除 column f 列,可以使用以下語句:
ALTER TABLE test REPLACE COLUMNS (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT
);
增加列:
alter table of_test columns (judegment int)
hive-1.2.1 支持insert,update,delete
的配置
hive-site.xml
中添加配置
建表語句
create external table if not exists hm2.history_helper
(
guid string,
starttime string,
endtime string,
num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ('transactional'='true');
==說明:建表語句必須帶有into buckets
子句和stored as orc TBLPROPERTIES ('transactional'='true')
子句,并且不能帶有sorted by
子句。==
這樣,這個表就可以就行insert,update,delete
操作了。
==注意:== 上面規(guī)則在 hive-1.2.1 是可以的,在 hive-2.1.1 中需要將external
關(guān)鍵字去掉,即高版本不支持外部表update了。
hive表中的鎖
場景:
在執(zhí)行insert into
或insert overwrite任務(wù)時,中途手動將程序停掉,會出現(xiàn)卡死情況(無法提交MapReduce),只能執(zhí)行查詢操作,而drop insert
操作均不可操作,無論執(zhí)行多久,都會保持卡死狀態(tài)
臨時解決辦法是……把表名換一個……
根本原因是:hive表被鎖或者某個分區(qū)被鎖,需要解鎖
show locks 表名:
可以查看表被鎖的情況
解鎖
unlock table 表名; -- 解鎖表
unlock table 表名 partition(dt='2014-04-01'); -- 解鎖某個分區(qū)
注意
表鎖和分區(qū)鎖是兩個不同的鎖,對表解鎖,對分區(qū)是無效的,分區(qū)需要單獨解鎖
高版本hive默認(rèn)插入數(shù)據(jù)時,不能查詢,因為有鎖
hive> show locks;
OK
test@helper EXCLUSIVE
解決辦法:關(guān)閉鎖機(jī)制
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager; // 這是默認(rèn)值
set hive.support.concurrency=false; 默認(rèn)為true
基本知識
查看表結(jié)構(gòu)信息
desc formatted table_name;
desc table_name;
導(dǎo)入數(shù)據(jù)到hive表
load命令
hive -e 'load data inpath "/data/MROutput/hm2_data_gamelabel_output2/part-r-*" into table hm2.game_label partition(dt="2018-10-11");'
==注意==:inpath 后接的hdfs路徑需要引號
python中的hive命令字符串示例:
cmd = 'hive -e \'load data inpath "%s/part-r-*" into table hm2.game_label partition(dt=%s);\''%(outpath, formatDate(day))
orc格式表
很多時候你會發(fā)現(xiàn)任務(wù)中不管數(shù)據(jù)量多大,不管你有沒有設(shè)置調(diào)整reduce個數(shù)的參數(shù),任務(wù)中一直都只有一個reduce任務(wù);其實只有一個reduce任務(wù)的情況,除了數(shù)據(jù)量小于
參數(shù)值的情況外,還有以下原因:hive.exec.reducers.bytes.per.reducer
hive 優(yōu)化
下面是對于該參數(shù)的測試過程:
測試sql:
select r1.a
from
(select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1
join
(select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2
on (r1.a=r2.b);
hive on spark 知識
cdh 6.0.1 下通過設(shè)置:
set hive.execution.engine=spark;
也可以將默認(rèn)的application
執(zhí)行引擎切換為spark
;
apache hadoop 下配置 hive on spark
參數(shù)調(diào)優(yōu)
了解完了Spark作業(yè)運行的基本原理之后,對資源相關(guān)的參數(shù)就容易理解了。所謂的Spark資源參數(shù)調(diào)優(yōu),其實主要就是對Spark運行過程中各個使用資源的地方,通過調(diào)節(jié)各種參數(shù),來優(yōu)化資源使用的效率,從而提升Spark作業(yè)的執(zhí)行性能。以下參數(shù)就是Spark中主要的資源參數(shù),每個參數(shù)都對應(yīng)著作業(yè)運行原理中的某個部分。
num-executors/spark.executor.instances
- 參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設(shè)置來在集群的各個工作節(jié)點上,啟動相應(yīng)數(shù)量的Executor進(jìn)程。這個參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會給你啟動少量的Executor進(jìn)程,此時你的Spark作業(yè)的運行速度是非常慢的。
- 參數(shù)調(diào)優(yōu)建議:每個Spark作業(yè)的運行一般設(shè)置50~100個左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,大部分隊列可能無法給予充分的資源。
executor-memory/spark.executor.memory
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)。
- 參數(shù)調(diào)優(yōu)建議:每個Executor進(jìn)程的內(nèi)存設(shè)置4G8G較為合適。但是這只是一個參考值,具體的設(shè)置還是得根據(jù)不同部門的資源隊列來定??梢钥纯醋约簣F(tuán)隊的資源隊列的最大內(nèi)存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊里其他人共享這個資源隊列,那么申請的內(nèi)存量最好不要超過資源隊列最大總內(nèi)存的1/31/2,避免你自己的Spark作業(yè)占用了隊列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無法運行。
executor-cores/spark.executor.cores
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的CPU core數(shù)量。這個參數(shù)決定了每個Executor進(jìn)程并行執(zhí)行task線程的能力。因為每個CPU core同一時間只能執(zhí)行一個task線程,因此每個Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。
- 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個較為合適。同樣得根據(jù)不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來決定每個Executor進(jìn)程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運行。
driver-memory
- 參數(shù)說明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。
- 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會出現(xiàn)OOM內(nèi)存溢出的問題。
spark.default.parallelism
- 參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認(rèn)task數(shù)量。這個參數(shù)極為重要,如果不設(shè)置可能會直接影響你的Spark作業(yè)性能。
- 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個較為合適。很多同學(xué)常犯的一個錯誤就是不去設(shè)置這個參數(shù),那么此時就會導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個HDFS block對應(yīng)一個task。通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個task),如果task數(shù)量偏少的話,就會導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下,無論你的Executor進(jìn)程有多少個,內(nèi)存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個,那么設(shè)置1000個task是可以的,此時可以充分地利用Spark集群的資源。
spark.storage.memoryFraction
- 參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說,默認(rèn)Executor 60%的內(nèi)存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時,可能數(shù)據(jù)就不會持久化,或者數(shù)據(jù)會寫入磁盤。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運行緩慢(通過spark web ui可以觀察到作業(yè)的gc耗時),意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。
spark.shuffle.memoryFraction
- 參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個task拉取到上個stage的task的輸出后,進(jìn)行聚合操作時能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說,Executor默認(rèn)只有20%的內(nèi)存用來進(jìn)行該操作。shuffle操作在進(jìn)行聚合時,如果發(fā)現(xiàn)使用的內(nèi)存超出了這個20%的限制,那么多余的數(shù)據(jù)就會溢寫到磁盤文件中去,此時就會極大地降低性能。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時內(nèi)存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。
Spark On Yarn執(zhí)行中executor內(nèi)存限制問題
解決Spark On Yarn執(zhí)行中executor內(nèi)存限制問題
集群版本 Spark 2.2.0 + Hadoop 3.0-CDH6.0.1
hive on saprk , 設(shè)置:
hive> set hive.execution.engine=spark;
hive> set spark.executor.memory=31.5g;
hive> set spark.executor.cores=11;
hive> set spark.serializer=org.apache.spark.serializer.KryoSerializer;
提示內(nèi)存不足
Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b)'
FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b
解決方案,修改Yarn
的配置文件:
1、
容器內(nèi)存yarn.nodemanager.resource.memory-mb
設(shè)置為 至少 :
的內(nèi)存,如上例可配置為 16gexecutor-memory(15g) + driver(512m)
2、
最大容器內(nèi)存yarn.scheduler.maximum-allocation-mb
設(shè)置為 至少 :
的內(nèi)存,如上例可配置為 16gexecutor-memory(15g) + driver(512m)
第一個參數(shù)為
的配置。NodeManager
`的配置 ,第二個參數(shù)為
`ResourceManager
字符串處理
1 字符串連接:
concat(str, str2, str3,...) 字符串連接
concat_ws(separator, str, str2, str3, ...) 將字符串用separator作為間隔連接起來
2 字符串截取
substr(s, 0, 1) 截取第一個字符
substr(s, -1) 截取最后一個字符
3 字符串urldecode
hive 中 join
mapjoin
的優(yōu)化在于,在mapreduce task
開始之前,創(chuàng)建一個local task
,小表以hshtable
的形式加載到內(nèi)存,然后序列化到磁盤,把內(nèi)存的hashtable
壓縮為tar
文件。然后把文件分發(fā)到 Hadoop Distributed Cache
,然后傳輸給每一個mapper
,mapper
在本地反序列化文件并加載進(jìn)內(nèi)存在做join
sql
select workflow,count(workflow) from (select guid, substr(workflow, -1) workflow from hm2.workflow_list) m right join hm2.helper helper on m.guid = helper.guid and helper.dt = "2018-10-21" group by workflow;
內(nèi)存溢出解決辦法:
set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;
Hive中Join的原理和機(jī)制
籠統(tǒng)的說,Hive中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。本文簡單介紹一下兩種join的原理和機(jī)制。
Hive Common Join
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉(zhuǎn)換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。
- Map階段
讀取源表的數(shù)據(jù),Map輸出時候以Join on條件中的列為key,如果Join有多個關(guān)聯(lián)鍵,則以這些關(guān)聯(lián)鍵的組合作為key;
Map輸出的value為join之后所關(guān)心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用于標(biāo)明此value對應(yīng)哪個表;
按照key進(jìn)行排序
-
Shuffle階段
根據(jù)key的值進(jìn)行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中 -
Reduce階段
根據(jù)key的值完成join操作,期間通過Tag來識別不同表中的數(shù)據(jù)。
以下面的HQL為例,圖解其過程:
SELECT
a.id,a.dept,b.age
FROM a join b
ON (a.id = b.id);
Hive Map Join
MapJoin通常用于一個很小的表和一個大表進(jìn)行join的場景,具體小表有多小,由參數(shù)hive.mapjoin.smalltable.filesize來決定,該參數(shù)表示小表的總大小,默認(rèn)值為25000000字節(jié),即25M。
Hive0.7之前,需要使用hint提示 /+ mapjoin(table) /才會執(zhí)行MapJoin,否則執(zhí)行Common Join,但在0.7版本之后,默認(rèn)自動會轉(zhuǎn)換Map Join,由參數(shù)hive.auto.convert.join來控制,默認(rèn)為true.
仍然以9.1中的HQL來說吧,假設(shè)a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執(zhí)行時候會自動轉(zhuǎn)化為MapJoin。
如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執(zhí)行的Task),負(fù)責(zé)掃描小表b的數(shù)據(jù),將其轉(zhuǎn)換成一個HashTable的數(shù)據(jù)結(jié)構(gòu),并寫入本地的文件中,之后將該文件加載到DistributeCache中,該HashTable的數(shù)據(jù)結(jié)構(gòu)可以抽象為:
key | value |
---|---|
1 | 26 |
2 | 34 |
- 接下來是Task B,該任務(wù)是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據(jù)a的每一條記錄去和DistributeCache中b表對應(yīng)的HashTable關(guān)聯(lián),并直接輸出結(jié)果。
- 由于MapJoin沒有Reduce,所以由Map直接輸出結(jié)果文件,有多少個Map Task,就有多少個結(jié)果文件。
轉(zhuǎn)義字符
hive> select split('a:1||b:2||c:3','\\|\\|') from hm2.test;
OK
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
其它轉(zhuǎn)義字符還有{
, [
insert table select from
insert into tbName select * from tbName2;
insert overwrite table tbName select * from tbName2;
insert overwrite例子
insert overwrite table hm2.helper partition(dt = '2018-06-22', hour = '09',msgtype = 'helper') select time,source,remote_addr,remote_user,body_bytes_sent,request_time,status,host,request_method,http_referrer,http_x_forwarded_for,http_user_agent,upstream_response_time,upstream_addr,guid,helperversion,osversion,ngx_timestamp,get_type,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[0] country,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[1] province,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[2] city from hm2.helper where dt = '2018-06-22' and hour = '09' and msgtype = 'helper';
插入分區(qū)表,不用指定分區(qū),可以自動識別
INSERT overwrite TABLE test.dis_helper PARTITION (dt,hour,msgtype) select `(num)?+.+` from (select *,row_number() over (partition by guid order by time asc) num from hm2.helper where dt ='2018-09-06'and hour between '00' and '23' and msgtype='helper') t where t.num=1;
這里把數(shù)據(jù)去重,插入分區(qū)表test.dis_helper中,自動根據(jù)dt,hour,msgtype字段的取值進(jìn)入分區(qū)表,并且`
(num)?+.+
表示除了
`num
``這個字段。
explain 查看執(zhí)行計劃
對于這么一個插敘sql
explain
select
workflow,count(workflow) cou from (select guid, split(split(workflow,"\\|\\|")[0], ":")[1] workflow
from
hm2.workflow_list) m
inner join
hm2.flash flash
on m.guid = flash.guid and flash.dt = "2018-11-04"
group by workflow order by cou;
可以打印出執(zhí)行計劃
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-3 depends on stages: Stage-2
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: workflow_list
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: guid is not null (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: guid (type: string), split(split(workflow, '\|\|')[0], ':')[1] (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
value expressions: _col1 (type: string)
TableScan
alias: flash
Statistics: Num rows: 489153811 Data size: 48915382339 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: guid is not null (type: boolean)
Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: guid (type: string)
sort order: +
Map-reduce partition columns: guid (type: string)
Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: string)
1 guid (type: string)
outputColumnNames: _col1
Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count(_col1)
keys: _col1 (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col1 (type: bigint)
sort order: +
Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: bigint)
outputColumnNames: _col0, _col1
Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
常用 sql 查詢
常用SQL查詢。
explode 一行變多行
explode
主要是將一行數(shù)據(jù)按照某個字段變?yōu)槎嘈小?/p>
select mt.impress,count(mt.guid) from (select (case when t.impressed = '' or t.impressed is null then 'null' else t.impressed end) impress,t.guid from (select guid,impressed from hm2.install lateral view explode(split(replace(replace(offer_impressed,'[',''),']',''), ',')) test_alias as impressed where dt='2018-12-17') t) mt group by mt.impress;
分組后取 top N
row_number() over(partition by)
表hm4.ffbrowser_domain_url_2019_07_17
數(shù)據(jù)如下:
www.baidu.com https://www.baidu.com/link?url=fCBMjU_THQzoAdo0RTAQLQIfVWIlPlgzpEM5Pk5qChKiahWFfMzFo6ckRjd9OFc7w8cj5h8esZXKBab5WqeLPgDYipewXUdz9LFPf-oxOfK&wd=&eqid=d2c80e8e0033424b000000045d2f14e9 1
www.baidu.com https://www.baidu.com/link?url=3jlP1HLS4aYSnN1L5CG3pPr0zXnqBpDdG8mlFQm47w1RcHFwHiBU0t8Hi0UrMD37lSJvQkGWQ3iBtNpc0AJhEei-v8MdGKgRnVqy62tuCA_&wd=&eqid=e3d5845b002790f7000000045d2f0af8 1
www.baidu.com http://www.baidu.com/ 1
www.baidu.com https://www.baidu.com/link?url=8S_ziMFwpClJww3C15iXu__wqMrMOxnYuDnZDpQWnbs1PTTqx_wwjIY7QsrFfaKT&wd=&eqid=eb8f38f70039cf51000000045d2e8716 1
www.baidu.com https://www.baidu.com/link?url=AZvluWbTjZjpaT5lnIpkB-gTIdyX_nZdtoLX_pkbM5i&wd=&eqid=8b09c549000038e7000000035930ca9a 1
www.baidu.com https://www.baidu.com/link?url=IjStquL7c4YwVDk1zQJFYkwBiGY20Kv2PQsXuTQTHH0BhAPLjUaz-XhLLp5Zfe3fE4hU_KNfEs6JxyESkwGlea&wd=&eqid=e7e404c9000012b1000000045d2ee845 1
www.baidu.com https://www.baidu.com/link?url=qRaLKHc_ZZIskkWF_f6azkmHqRlfgmuRQZcrzRovBC5MEBR5yTIG20FiR3O__8Jz&wd=&eqid=e13f05290018e7fb000000045d2eed7d 2
www.baidu.com https://www.baidu.com/s?tn=50000201_hao_pg&usm=3&wd=%E7%AB%AF%E7%81%AB%E9%94%85%E6%B3%BC%E5%A6%BB%E5%AD%90%E5%90%8C%E5%AD%A6&ie=utf-8&rsv_cq=%E4%BA%BA%E7%B1%BB%E7%99%BB%E6%9C%8850%E5%91%A8%E5%B9%B4&rsv_dl=0_right_fyb_pchot_20811_01&rsf=531e82477396136261c6275e8afa58b1_1_10_8&rqid=e3d5845b002790f7 1
www.baidu.com https://www.baidu.com/link?url=NHmzZVrcbQ1tf6JnR4MJlHXJZFy-4RMgKwjNeDvskMyl17vpdi_8XgVCdRvGFtU2WJpNpHQf4VbwIeQi5qDHskDTrDUK5KMUkrkfKcWYxhy&wd=&eqid=e3d5845b002790f7000000045d2f0af8 1
www.baidu.com https://www.baidu.com/s?wd=%E5%BE%AE%E5%8D%9A&ie=utf-8 1
sql查詢語句
select a.domain,a.url,a.cou from (select domain,url,cou,row_number() over(partition by domain order by cou desc) as n from hm4.ffbrowser_domain_url_2019_07_17)a where a.n <= 2;
結(jié)果
www.baidu.com https://www.baidu.com/ 69
www.baidu.com https://www.baidu.com/link?url=r6riiF-vxG9OX70KBVx86FuywJYXHu-TpTTSEst9ggK78xIjVvkI_QoS9tEDBAqq&wd=&eqid=ba409e160014f8c9000000045eec97 3
相似問題:每門課程成績的前N名
case when 用法
select reginlistlength, softname, sum(cou) from (select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname)m group by reginlistlength, softname;
select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname;
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫