Hive知識點

    Hive知識點

    2020-04-14 3523點熱度 0人點贊 0條評論

    hive知識點

    工作中hive常用知識點。

    Hive簡介

    hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,并提供簡單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運行。其優(yōu)點是學(xué)習(xí)成本低,可以通過類SQL語句快速實現(xiàn)簡單的MapReduce統(tǒng)計,不必開發(fā)專門的MapReduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計分析。

    創(chuàng)建hive表

    # 新建個數(shù)據(jù)庫test
    create database test;
    create external table if not exists test.test(id string, name string);

    這里創(chuàng)建了一個名為testhive外部表,外部表與普通表的區(qū)別:

    1. 在導(dǎo)入數(shù)據(jù)到外部表,數(shù)據(jù)并沒有移動到自己的數(shù)據(jù)倉庫目錄下,也就是說外部表中的數(shù)據(jù)并不是由它自己來管理的!而表則不一樣;
    2. 在刪除表的時候,Hive將會把屬于表的元數(shù)據(jù)和數(shù)據(jù)全部刪掉;而刪除外部表的時候,Hive僅僅刪除外部表的元數(shù)據(jù),數(shù)據(jù)是不會刪除的!那么,應(yīng)該如何選擇使用哪種表呢?在大多數(shù)情況沒有太多的區(qū)別,因此選擇只是個人喜好的問題。但是作為一個經(jīng)驗,如果所有處理都需要由Hive完成,那么你應(yīng)該創(chuàng)建表,否則使用外部表!

    創(chuàng)建分區(qū)表

    通過partition關(guān)鍵字指定分區(qū)字段,分區(qū)表方便hive快速查詢索引數(shù)據(jù)。

    create table if not exists test.test
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t';
    SQL

    這里指定了兩個分區(qū):dthour,對應(yīng)hdfs的2級目錄dt,hour

    [hadoop@qcloud-test-hadoop01 ~]$ hdfs dfs -ls -R /hive/warehouse/test.db/test
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02
    -rwxr-xr-x   2 hadoop supergroup         39 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02/test.txt
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03
    -rwxr-xr-x   2 hadoop supergroup         39 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03/test.txt

    刪除分區(qū)

    ALTER TABLE table_name DROP IF EXISTS PARTITION(year = 2015, month = 10, day = 1);

    創(chuàng)建orc存儲格式表

    hive創(chuàng)建orc格式表不能像textfile格式一樣直接load數(shù)據(jù)到表中,一般需要創(chuàng)建臨時textfile表,然后通過insert into 或者insert overwriteorc存儲格式表中。

    1) 臨時表testfile存儲格式

    create table if not exists test.test
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t';
    SQL

    test.txt

    001 keguang
    002 kg
    003 kk
    004 ikeguang

    load data local inpath '/home/hadoop/data/test.txt' into table test.test partition(dt = '2019-09-10', hour = '02');

    2) 導(dǎo)入數(shù)據(jù)到orc表

    create table if not exists test.test2
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t'
    stored as orc;

    insert select 導(dǎo)入數(shù)據(jù)

    insert overwrite table test.test2 partition(dt, hour) select `(dt|hour)?+.+`,dt,hour from test.test;

    這里(dt|hour)?+.+表示排除dt,hour兩個字段,由于動態(tài)分區(qū)partition(dt, hour)是按照select出來的最后2個字段作為分區(qū)字段的。其實這里select * 也是可以的,因為分區(qū)表查詢結(jié)果,最后兩個字段就是分區(qū)字段。

    select * from test.test2;
    
    結(jié)果
    001 keguang 2019-09-10  02
    002 kg  2019-09-10  02
    003 kk  2019-09-10  02
    004 ikeguang    2019-09-10  02

    所以說,非textfile存儲格式表導(dǎo)入數(shù)據(jù)步驟:

    1. 導(dǎo)入數(shù)據(jù)到textfile
    2. 查詢數(shù)據(jù)插入orc格式表

    select 排除字段

    選擇tableName表中除了name、id、pwd之外的所有字段

    set hive.support.quoted.identifiers=None;
    select `(name|id|pwd)?+.+` from tableName;

    UDF用法

    添加臨時函數(shù)

    add jar /home/hadoop/codejar/flash_format.jar;
    create temporary function gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel';
    SQL

    刪除臨時函數(shù)

    drop temporary function 數(shù)據(jù)庫名.函數(shù)名;
    SQL

    添加永久函數(shù)

    create function hm2.gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel' using jar 'hdfsJarPath'
    SQL

    ==注意==:1). 需要指定數(shù)據(jù)庫.函數(shù)名,即hm2.gamelabel,否則默認(rèn)在default數(shù)據(jù)庫下面:default.gamelabel;2). hdfsJarPath即該jar包需要上傳到hdfs目錄;

    刪除永久函數(shù):

    drop function 數(shù)據(jù)庫名.函數(shù)名字;
    SQL

    如果客戶端通過hiveserver2連接hive,為了正常使用自定義的永久udf,需要執(zhí)行reload function;

    查看函數(shù)用法

    查month 相關(guān)的函數(shù)

    show functions like '*month*'

    查看 add_months 函數(shù)的用法

    desc function add_months;

    查看 add_months 函數(shù)的詳細(xì)說明并舉例

    desc function extended add_months;

    UDAF用法

    關(guān)于UDAF開發(fā)注意點:

    1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的

    2.函數(shù)類需要繼承UDAF類,內(nèi)部類Evaluator實現(xiàn)UDAFEvaluator接口

    3.Evaluator需要實現(xiàn) init、iterate、terminatePartial、merge、terminate這幾個函數(shù)

    1)init函數(shù)類似于構(gòu)造函數(shù),用于UDAF的初始化
    
    2)iterate接收傳入的參數(shù),并進(jìn)行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean
    
    3)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回亂轉(zhuǎn)數(shù)據(jù),iterate和terminatePartial類似于hadoop的Combiner
    
    4)merge接收terminatePartial的返回結(jié)果,進(jìn)行數(shù)據(jù)merge操作,其返回類型為boolean
    
    5)terminate返回最終的聚集函數(shù)結(jié)果  

    hive hbase 關(guān)聯(lián)

    create 'flash_people','info','label'
    
    create external table if not exists hm2.flash_people(
    guid string comment "people guid",
    firsttime string comment "首次入庫時間",
    ip string comment "用戶ip",
    jstimestamp bigint comment "json時間戳"
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties("hbase.columns.mapping" = ":key,info:firsttime,info:ip,:timestamp")
    tblproperties("hbase.table.name" = "hm2:flash_people");

    hive -e 用法

    hive -e主要用來在命令行執(zhí)行sql

    hive -e 'set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp is regexp '\\d{8}' and remote_addr is not null and remote_addr != '';'
    set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp rlike'^\\d+$' and remote_addr is not null and remote_addr != '';
    Bash

    hive一些優(yōu)化參數(shù)

    set hive.auto.convert.join = false;
    set hive.ignore.mapjoin.hint=false;
    set hive.exec.parallel=true;
    set mapred.reduce.tasks = 60;
    SQL

    字段變更

    添加字段

    alter table hm2.helper add columns(country string, province string, city string);

    hive添加字段后,前面數(shù)據(jù)會有空值,就算將前面數(shù)據(jù)hdfs文件刪除,重新導(dǎo)入,仍然查詢出來是 NULL,這個問題有待解決。

    • 解決
    • 未解決

    添加 map 復(fù)雜類型字段

    alter table hm2.helper add columns(data_map map<string, string>);
    
    hive> desc formatted hm2.helper;
    OK
    # col_name              data_type               comment             
    
    time                    string                                      
    uuid                    string                                      
    country                 string                                      
    province                string                                      
    city                    string                                      
    data_map                map<string,string>                        
    
    # Partition Information      
    # col_name              data_type               comment             
    
    dt                      string                                      
    hour                    string                                      
    msgtype                 string                                      
    
    # Detailed Table Information         
    Database:               hm2                      
    Owner:                  hadoop                   
    CreateTime:             Wed Apr 24 10:12:30 CST 2019     
    LastAccessTime:         UNKNOWN                  
    Protect Mode:           None                     
    Retention:              0                        
    Location:               hdfs://nameser/hive/warehouse/hm2.db/helper  
    Table Type:             EXTERNAL_TABLE           
    Table Parameters:        
        EXTERNAL                TRUE                
        last_modified_by        hadoop              
        last_modified_time      1556072221          
        transient_lastDdlTime   1556072221          
    
    # Storage Information        
    SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
    InputFormat:            org.apache.hadoop.mapred.TextInputFormat     
    OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
    Compressed:             No                       
    Num Buckets:            -1                       
    Bucket Columns:         []                       
    Sort Columns:           []                       
    Storage Desc Params:         
        field.delim             \t                  
        serialization.format    \t                  
    Time taken: 0.105 seconds, Fetched: 59 row(s)

    仿照一張在創(chuàng)建表時定義了map類型字段的表的屬性描述

    Storage Desc Params:         
        colelction.delim        ,                   
        field.delim             \t                  
        mapkey.delim            :                   
        serialization.format    \t                  

    只需要將map屬性修改為:

    hive> alter table hm2.helper set serdeproperties('colelction.delim' = ',', 'mapkey.delim' = ':');
    OK
    Time taken: 0.132 seconds

    那么

    hive> desc formatted hm2.helper;
    OK
    # col_name              data_type               comment             
    
    time                    string                                      
    uuid                    string                                      
    country                 string                                      
    province                string                                      
    city                    string                                      
    data_map                map<string,string>                        
    
    # Partition Information      
    # col_name              data_type               comment             
    
    dt                      string                                      
    hour                    string                                      
    msgtype                 string                                      
    
    # Detailed Table Information         
    Database:               hm2                      
    Owner:                  hadoop                   
    CreateTime:             Wed Apr 24 10:12:30 CST 2019     
    LastAccessTime:         UNKNOWN                  
    Protect Mode:           None                     
    Retention:              0                        
    Location:               hdfs://nameser/hive/warehouse/hm2.db/helper  
    Table Type:             EXTERNAL_TABLE           
    Table Parameters:        
        EXTERNAL                TRUE                
        last_modified_by        hadoop              
        last_modified_time      1556072669          
        transient_lastDdlTime   1556072669          
    
    # Storage Information        
    SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
    InputFormat:            org.apache.hadoop.mapred.TextInputFormat     
    OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
    Compressed:             No                       
    Num Buckets:            -1                       
    Bucket Columns:         []                       
    Sort Columns:           []                       
    Storage Desc Params:         
        colelction.delim        ,                   
        field.delim             \t                  
        mapkey.delim            :                   
        serialization.format    \t                  
    Time taken: 0.079 seconds, Fetched: 61 row(s)

    即可

    刪除字段

    CREATE TABLE test (
    creatingTs BIGINT,
    a STRING,
    b BIGINT,
    c STRING,
    d STRING,
    e BIGINT,
    f BIGINT
    );

    如果需要刪除 column f 列,可以使用以下語句:

    ALTER TABLE test REPLACE COLUMNS (
    creatingTs BIGINT,
    a STRING,
    b BIGINT,
    c STRING,
    d STRING,
    e BIGINT
    );

    增加列:

    alter table of_test columns (judegment int)

    hive-1.2.1 支持insert,update,delete的配置

    hive-site.xml中添加配置

    <property>
        <name>hive.support.concurrency</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>
    <property>
        <name>hive.txn.manager</name>
        <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
        <name>hive.compactor.initiator.on</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.compactor.worker.threads</name>
        <value>1</value>
    </property>
    <property>
      <name>hive.enforce.bucketing</name>
      <value>true</value>
    </property>
    XML

    建表語句

    create external table if not exists hm2.history_helper
    (
    guid string,
    starttime string,
    endtime string,
    num int
    )
    clustered by(guid) into 50 buckets
    stored as orc TBLPROPERTIES ('transactional'='true');

    ==說明:建表語句必須帶有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,并且不能帶有sorted by子句。==

    這樣,這個表就可以就行insert,update,delete操作了。

    ==注意:== 上面規(guī)則在 hive-1.2.1 是可以的,在 hive-2.1.1 中需要將external關(guān)鍵字去掉,即高版本不支持外部表update了。

    hive表中的鎖

    場景:

    在執(zhí)行insert into或insert overwrite任務(wù)時,中途手動將程序停掉,會出現(xiàn)卡死情況(無法提交MapReduce),只能執(zhí)行查詢操作,而drop insert操作均不可操作,無論執(zhí)行多久,都會保持卡死狀態(tài)

    臨時解決辦法是……把表名換一個……

    根本原因是:hive表被鎖或者某個分區(qū)被鎖,需要解鎖

    show locks 表名:

    可以查看表被鎖的情況

    解鎖

    unlock table 表名;  -- 解鎖表
    unlock table 表名 partition(dt='2014-04-01');  -- 解鎖某個分區(qū)

    表鎖和分區(qū)鎖是兩個不同的鎖,對表解鎖,對分區(qū)是無效的,分區(qū)需要單獨解鎖

    高版本hive默認(rèn)插入數(shù)據(jù)時,不能查詢,因為有鎖

    hive> show locks;
    OK
    test@helper EXCLUSIVE

    解決辦法:關(guān)閉鎖機(jī)制

    set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager; // 這是默認(rèn)值
    set hive.support.concurrency=false; 默認(rèn)為true

    基本知識

    查看表結(jié)構(gòu)信息

     desc formatted table_name;
     desc table_name;

    導(dǎo)入數(shù)據(jù)到hive表

    load命令

    hive -e 'load data inpath "/data/MROutput/hm2_data_gamelabel_output2/part-r-*" into table hm2.game_label partition(dt="2018-10-11");'

    ==注意==:inpath 后接的hdfs路徑需要引號

    python中的hive命令字符串示例:

    cmd = 'hive -e \'load data inpath "%s/part-r-*" into table hm2.game_label partition(dt=%s);\''%(outpath, formatDate(day))

    orc格式表

    • hive創(chuàng)建orc格式表不能像textfile格式一樣直接load數(shù)據(jù)到表中,一般需要load創(chuàng)建臨時textfile表,然后通過insert into 或者insert overwrite到orc存儲格式表中。
    • 或者將現(xiàn)有orc文件cp到hive對應(yīng)表的目錄

      map,reduce知識






      什么情況下只有一個reduce?

    很多時候你會發(fā)現(xiàn)任務(wù)中不管數(shù)據(jù)量多大,不管你有沒有設(shè)置調(diào)整reduce個數(shù)的參數(shù),任務(wù)中一直都只有一個reduce任務(wù);其實只有一個reduce任務(wù)的情況,除了數(shù)據(jù)量小于hive.exec.reducers.bytes.per.reducer參數(shù)值的情況外,還有以下原因:

    1. 沒有g(shù)roup by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 寫成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
      這點非常常見,希望大家盡量改寫。
    2. 用了Order by
    3. 有笛卡爾積
      通常這些情況下,除了找辦法來變通和避免,我暫時沒有什么好的辦法,因為這些操作都是全局的,所以hadoop不得不用一個reduce去完成;
      同樣的,在設(shè)置reduce個數(shù)的時候也需要考慮這兩個原則:使大數(shù)據(jù)量利用合適的reduce數(shù);使單個reduce任務(wù)處理合適的數(shù)據(jù)量。

    hive 優(yōu)化

    1. hive mapreduce參數(shù)優(yōu)化
      設(shè)置map,reduce任務(wù)分配的資源
      set mapreduce.map.memory.mb = 4096 ;
      set mapreduce.reduce.memory.mb = 4096 ; 
      set mapreduce.map.java.opts=-Xmx3686m;
      set mapreduce.reduce.java.opts=-Xmx3428m;
    2. hive.exec.parallel參數(shù)控制在同一個sql中的不同的job是否可以同時運行,默認(rèn)為false.

    下面是對于該參數(shù)的測試過程:

    測試sql:

    select r1.a 
    from 
    (select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1 
    join 
    (select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2 
    on (r1.a=r2.b);
    • 當(dāng)參數(shù)為false的時候,三個job是順序的執(zhí)行
      set hive.exec.parallel=false;
    • 但是可以看出來其實兩個子查詢中的sql并無關(guān)系,可以并行的跑
      set hive.exec.parallel=true;
      1. 設(shè)置reduce個數(shù)
        set mapred.reduce.tasks = 15;

        ==總結(jié)==:
        在資源充足的時候hive.exec.parallel會讓那些存在并發(fā)job的sql運行得更快,但同時消耗更多的資源
        可以評估下hive.exec.parallel對我們的刷新任務(wù)是否有幫助.

    1. 參數(shù)設(shè)置

      set mapred.max.split.size=256000000;        -- 決定每個map處理的最大的文件大小,單位為B
    2. orc 小文件合并

      set hive.execution.engine = mr; # 不是必須的
      alter table hm3.hm3_format_log partition (dt="2019-09-17", hour="16", msgtype="web", action="image") concatenate;
      SQL

  1. msck repair修復(fù)大量分區(qū)

    set hive.msck.path.validation=ignore;
    msck repair table tbName;

hive on spark 知識

cdh 6.0.1 下通過設(shè)置:

set hive.execution.engine=spark;

也可以將默認(rèn)的application執(zhí)行引擎切換為spark;
apache hadoop 下配置 hive on spark

參數(shù)調(diào)優(yōu)

了解完了Spark作業(yè)運行的基本原理之后,對資源相關(guān)的參數(shù)就容易理解了。所謂的Spark資源參數(shù)調(diào)優(yōu),其實主要就是對Spark運行過程中各個使用資源的地方,通過調(diào)節(jié)各種參數(shù),來優(yōu)化資源使用的效率,從而提升Spark作業(yè)的執(zhí)行性能。以下參數(shù)就是Spark中主要的資源參數(shù),每個參數(shù)都對應(yīng)著作業(yè)運行原理中的某個部分。

num-executors/spark.executor.instances

  • 參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設(shè)置來在集群的各個工作節(jié)點上,啟動相應(yīng)數(shù)量的Executor進(jìn)程。這個參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會給你啟動少量的Executor進(jìn)程,此時你的Spark作業(yè)的運行速度是非常慢的。
  • 參數(shù)調(diào)優(yōu)建議:每個Spark作業(yè)的運行一般設(shè)置50~100個左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,大部分隊列可能無法給予充分的資源。

executor-memory/spark.executor.memory

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)。
  • 參數(shù)調(diào)優(yōu)建議:每個Executor進(jìn)程的內(nèi)存設(shè)置4G8G較為合適。但是這只是一個參考值,具體的設(shè)置還是得根據(jù)不同部門的資源隊列來定??梢钥纯醋约簣F(tuán)隊的資源隊列的最大內(nèi)存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊里其他人共享這個資源隊列,那么申請的內(nèi)存量最好不要超過資源隊列最大總內(nèi)存的1/31/2,避免你自己的Spark作業(yè)占用了隊列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無法運行。

executor-cores/spark.executor.cores

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的CPU core數(shù)量。這個參數(shù)決定了每個Executor進(jìn)程并行執(zhí)行task線程的能力。因為每個CPU core同一時間只能執(zhí)行一個task線程,因此每個Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。
  • 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個較為合適。同樣得根據(jù)不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來決定每個Executor進(jìn)程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運行。

driver-memory

  • 參數(shù)說明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。
  • 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會出現(xiàn)OOM內(nèi)存溢出的問題。

spark.default.parallelism

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認(rèn)task數(shù)量。這個參數(shù)極為重要,如果不設(shè)置可能會直接影響你的Spark作業(yè)性能。
  • 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個較為合適。很多同學(xué)常犯的一個錯誤就是不去設(shè)置這個參數(shù),那么此時就會導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個HDFS block對應(yīng)一個task。通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個task),如果task數(shù)量偏少的話,就會導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下,無論你的Executor進(jìn)程有多少個,內(nèi)存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個,那么設(shè)置1000個task是可以的,此時可以充分地利用Spark集群的資源。

spark.storage.memoryFraction

  • 參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說,默認(rèn)Executor 60%的內(nèi)存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時,可能數(shù)據(jù)就不會持久化,或者數(shù)據(jù)會寫入磁盤。
  • 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運行緩慢(通過spark web ui可以觀察到作業(yè)的gc耗時),意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。

spark.shuffle.memoryFraction

  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個task拉取到上個stage的task的輸出后,進(jìn)行聚合操作時能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說,Executor默認(rèn)只有20%的內(nèi)存用來進(jìn)行該操作。shuffle操作在進(jìn)行聚合時,如果發(fā)現(xiàn)使用的內(nèi)存超出了這個20%的限制,那么多余的數(shù)據(jù)就會溢寫到磁盤文件中去,此時就會極大地降低性能。
  • 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時內(nèi)存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。

原文鏈接


Spark On Yarn執(zhí)行中executor內(nèi)存限制問題

解決Spark On Yarn執(zhí)行中executor內(nèi)存限制問題

集群版本 Spark 2.2.0 + Hadoop 3.0-CDH6.0.1

hive on saprk , 設(shè)置:

hive> set hive.execution.engine=spark;
hive> set spark.executor.memory=31.5g;
hive> set spark.executor.cores=11;
hive> set spark.serializer=org.apache.spark.serializer.KryoSerializer;

提示內(nèi)存不足

Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b)'
FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b

解決方案,修改Yarn的配置文件:

1、yarn.nodemanager.resource.memory-mb 容器內(nèi)存

設(shè)置為 至少 : executor-memory(15g) + driver(512m)的內(nèi)存,如上例可配置為 16g

2、yarn.scheduler.maximum-allocation-mb 最大容器內(nèi)存

設(shè)置為 至少 : executor-memory(15g) + driver(512m)的內(nèi)存,如上例可配置為 16g

第一個參數(shù)為NodeManager`的配置 ,第二個參數(shù)為 `ResourceManager的配置。

字符串處理

1 字符串連接:

concat(str, str2, str3,...) 字符串連接
concat_ws(separator, str, str2, str3, ...) 將字符串用separator作為間隔連接起來

2 字符串截取

substr(s, 0, 1) 截取第一個字符
substr(s, -1) 截取最后一個字符

3 字符串urldecode

reflect("java.net.URLDecoder", "decode", trim(originalfilename), "UTF-8")
SQL

hive 中 join

mapjoin的優(yōu)化在于,在mapreduce task開始之前,創(chuàng)建一個local task,小表以hshtable的形式加載到內(nèi)存,然后序列化到磁盤,把內(nèi)存的hashtable壓縮為tar文件。然后把文件分發(fā)到 Hadoop Distributed Cache,然后傳輸給每一個mappermapper在本地反序列化文件并加載進(jìn)內(nèi)存在做join

sql

select workflow,count(workflow) from (select guid, substr(workflow, -1) workflow from hm2.workflow_list) m right join hm2.helper helper on m.guid = helper.guid and helper.dt = "2018-10-21" group by workflow;

內(nèi)存溢出解決辦法:

set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;

Hive中Join的原理和機(jī)制

籠統(tǒng)的說,Hive中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。本文簡單介紹一下兩種join的原理和機(jī)制。

Hive Common Join

如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉(zhuǎn)換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。

  • Map階段

讀取源表的數(shù)據(jù),Map輸出時候以Join on條件中的列為key,如果Join有多個關(guān)聯(lián)鍵,則以這些關(guān)聯(lián)鍵的組合作為key;
Map輸出的value為join之后所關(guān)心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用于標(biāo)明此value對應(yīng)哪個表;
按照key進(jìn)行排序

  • Shuffle階段
    根據(jù)key的值進(jìn)行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中

  • Reduce階段
    根據(jù)key的值完成join操作,期間通過Tag來識別不同表中的數(shù)據(jù)。

以下面的HQL為例,圖解其過程:

SELECT 
 a.id,a.dept,b.age 
FROM a join b 
ON (a.id = b.id);

Hive Map Join

MapJoin通常用于一個很小的表和一個大表進(jìn)行join的場景,具體小表有多小,由參數(shù)hive.mapjoin.smalltable.filesize來決定,該參數(shù)表示小表的總大小,默認(rèn)值為25000000字節(jié),即25M。
Hive0.7之前,需要使用hint提示 /+ mapjoin(table) /才會執(zhí)行MapJoin,否則執(zhí)行Common Join,但在0.7版本之后,默認(rèn)自動會轉(zhuǎn)換Map Join,由參數(shù)hive.auto.convert.join來控制,默認(rèn)為true.
仍然以9.1中的HQL來說吧,假設(shè)a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執(zhí)行時候會自動轉(zhuǎn)化為MapJoin。

如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執(zhí)行的Task),負(fù)責(zé)掃描小表b的數(shù)據(jù),將其轉(zhuǎn)換成一個HashTable的數(shù)據(jù)結(jié)構(gòu),并寫入本地的文件中,之后將該文件加載到DistributeCache中,該HashTable的數(shù)據(jù)結(jié)構(gòu)可以抽象為:

key value
1 26
2 34
  • 接下來是Task B,該任務(wù)是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據(jù)a的每一條記錄去和DistributeCache中b表對應(yīng)的HashTable關(guān)聯(lián),并直接輸出結(jié)果。
  • 由于MapJoin沒有Reduce,所以由Map直接輸出結(jié)果文件,有多少個Map Task,就有多少個結(jié)果文件。

原文鏈接

轉(zhuǎn)義字符

hive> select split('a:1||b:2||c:3','\\|\\|') from hm2.test;
OK
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]

其它轉(zhuǎn)義字符還有{, [

insert table select from

insert into tbName select * from tbName2;
insert overwrite table tbName select * from tbName2;

insert overwrite例子

insert overwrite table hm2.helper partition(dt = '2018-06-22', hour = '09',msgtype = 'helper') select time,source,remote_addr,remote_user,body_bytes_sent,request_time,status,host,request_method,http_referrer,http_x_forwarded_for,http_user_agent,upstream_response_time,upstream_addr,guid,helperversion,osversion,ngx_timestamp,get_type,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[0] country,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[1] province,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[2] city from hm2.helper where dt = '2018-06-22' and hour = '09' and msgtype = 'helper';

插入分區(qū)表,不用指定分區(qū),可以自動識別

INSERT overwrite TABLE test.dis_helper PARTITION (dt,hour,msgtype) select `(num)?+.+` from (select *,row_number() over (partition by guid order by time asc) num from hm2.helper where dt ='2018-09-06'and hour between '00' and '23' and msgtype='helper') t where t.num=1;

這里把數(shù)據(jù)去重,插入分區(qū)表test.dis_helper中,自動根據(jù)dt,hour,msgtype字段的取值進(jìn)入分區(qū)表,并且`(num)?+.+表示除了`num``這個字段。

explain 查看執(zhí)行計劃

對于這么一個插敘sql

explain
select
workflow,count(workflow) cou from (select guid, split(split(workflow,"\\|\\|")[0], ":")[1] workflow
from 
hm2.workflow_list) m
inner join
hm2.flash flash
on m.guid = flash.guid and flash.dt = "2018-11-04"
group by workflow order by cou;

可以打印出執(zhí)行計劃

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-3 depends on stages: Stage-2
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: workflow_list
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: guid is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: guid (type: string), split(split(workflow, '\|\|')[0], ':')[1] (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                  value expressions: _col1 (type: string)
          TableScan
            alias: flash
            Statistics: Num rows: 489153811 Data size: 48915382339 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: guid is not null (type: boolean)
              Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: guid (type: string)
                sort order: +
                Map-reduce partition columns: guid (type: string)
                Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col0 (type: string)
            1 guid (type: string)
          outputColumnNames: _col1
          Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
          Group By Operator
            aggregations: count(_col1)
            keys: _col1 (type: string)
            mode: hash
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string)
              sort order: +
              Map-reduce partition columns: _col0 (type: string)
              Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col1 (type: bigint)
              sort order: +
              Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string)
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: bigint)
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

常用 sql 查詢

常用SQL查詢。

explode 一行變多行

explode主要是將一行數(shù)據(jù)按照某個字段變?yōu)槎嘈小?/p>

select mt.impress,count(mt.guid) from (select (case when t.impressed = '' or t.impressed is null then 'null' else t.impressed end) impress,t.guid from (select guid,impressed from hm2.install lateral view explode(split(replace(replace(offer_impressed,'[',''),']',''), ',')) test_alias as impressed where dt='2018-12-17') t) mt group by mt.impress;

分組后取 top N

row_number() over(partition by)

hm4.ffbrowser_domain_url_2019_07_17數(shù)據(jù)如下:

www.baidu.com   https://www.baidu.com/link?url=fCBMjU_THQzoAdo0RTAQLQIfVWIlPlgzpEM5Pk5qChKiahWFfMzFo6ckRjd9OFc7w8cj5h8esZXKBab5WqeLPgDYipewXUdz9LFPf-oxOfK&wd=&eqid=d2c80e8e0033424b000000045d2f14e9    1
www.baidu.com   https://www.baidu.com/link?url=3jlP1HLS4aYSnN1L5CG3pPr0zXnqBpDdG8mlFQm47w1RcHFwHiBU0t8Hi0UrMD37lSJvQkGWQ3iBtNpc0AJhEei-v8MdGKgRnVqy62tuCA_&wd=&eqid=e3d5845b002790f7000000045d2f0af8    1
www.baidu.com   http://www.baidu.com/   1
www.baidu.com   https://www.baidu.com/link?url=8S_ziMFwpClJww3C15iXu__wqMrMOxnYuDnZDpQWnbs1PTTqx_wwjIY7QsrFfaKT&wd=&eqid=eb8f38f70039cf51000000045d2e8716   1
www.baidu.com   https://www.baidu.com/link?url=AZvluWbTjZjpaT5lnIpkB-gTIdyX_nZdtoLX_pkbM5i&wd=&eqid=8b09c549000038e7000000035930ca9a    1
www.baidu.com   https://www.baidu.com/link?url=IjStquL7c4YwVDk1zQJFYkwBiGY20Kv2PQsXuTQTHH0BhAPLjUaz-XhLLp5Zfe3fE4hU_KNfEs6JxyESkwGlea&wd=&eqid=e7e404c9000012b1000000045d2ee845 1
www.baidu.com   https://www.baidu.com/link?url=qRaLKHc_ZZIskkWF_f6azkmHqRlfgmuRQZcrzRovBC5MEBR5yTIG20FiR3O__8Jz&wd=&eqid=e13f05290018e7fb000000045d2eed7d   2
www.baidu.com   https://www.baidu.com/s?tn=50000201_hao_pg&usm=3&wd=%E7%AB%AF%E7%81%AB%E9%94%85%E6%B3%BC%E5%A6%BB%E5%AD%90%E5%90%8C%E5%AD%A6&ie=utf-8&rsv_cq=%E4%BA%BA%E7%B1%BB%E7%99%BB%E6%9C%8850%E5%91%A8%E5%B9%B4&rsv_dl=0_right_fyb_pchot_20811_01&rsf=531e82477396136261c6275e8afa58b1_1_10_8&rqid=e3d5845b002790f7   1
www.baidu.com   https://www.baidu.com/link?url=NHmzZVrcbQ1tf6JnR4MJlHXJZFy-4RMgKwjNeDvskMyl17vpdi_8XgVCdRvGFtU2WJpNpHQf4VbwIeQi5qDHskDTrDUK5KMUkrkfKcWYxhy&wd=&eqid=e3d5845b002790f7000000045d2f0af8    1
www.baidu.com   https://www.baidu.com/s?wd=%E5%BE%AE%E5%8D%9A&ie=utf-8  1

sql查詢語句

select a.domain,a.url,a.cou from (select domain,url,cou,row_number() over(partition by domain order by cou desc) as n from hm4.ffbrowser_domain_url_2019_07_17)a where a.n <= 2;

結(jié)果

www.baidu.com   https://www.baidu.com/  69
www.baidu.com   https://www.baidu.com/link?url=r6riiF-vxG9OX70KBVx86FuywJYXHu-TpTTSEst9ggK78xIjVvkI_QoS9tEDBAqq&wd=&eqid=ba409e160014f8c9000000045eec97 3

相似問題:每門課程成績的前N名

case when 用法

select reginlistlength, softname, sum(cou) from (select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname)m group by reginlistlength, softname;
select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname;









作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫