sqoop用法之mysql與hive數(shù)據(jù)導(dǎo)入導(dǎo)出
本文目錄
一. Sqoop
介紹
二. Mysql
數(shù)據(jù)導(dǎo)入到 Hive
三. Hive
數(shù)據(jù)導(dǎo)入到Mysql
四. mysql
數(shù)據(jù)增量導(dǎo)入hive
- 1 基于遞增列
Append
導(dǎo)入- 1). 創(chuàng)建
hive
表 - 2). 創(chuàng)建
job
- 3) 執(zhí)行
job
- 1). 創(chuàng)建
- 2
Lastmodified
導(dǎo)入實戰(zhàn)- 1). 新建一張表
- 2). 初始化
hive
表:
一. Sqoop介紹
Sqoop
是一個用來將Hadoop
和關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)相互轉(zhuǎn)移的工具,可以將一個關(guān)系型數(shù)據(jù)庫(例如:MySQL、Oracle、Postgres
等)中的數(shù)據(jù)導(dǎo)進(jìn)到Hadoop
的HDFS
中,也可以將HDFS
的數(shù)據(jù)導(dǎo)進(jìn)到關(guān)系型數(shù)據(jù)庫中。對于某些NoSQL
數(shù)據(jù)庫它也提供了連接器。Sqoop
,類似于其他ETL
工具,使用元數(shù)據(jù)模型來判斷數(shù)據(jù)類型并在數(shù)據(jù)從數(shù)據(jù)源轉(zhuǎn)移到Hadoop
時確保類型安全的數(shù)據(jù)處理。Sqoop
專為大數(shù)據(jù)批量傳輸設(shè)計,能夠分割數(shù)據(jù)集并創(chuàng)建Hadoop
任務(wù)來處理每個區(qū)塊。
本文版本說明
hadoop
版本 :hadoop-2.7.2
hive版本
:hive-2.1.0
sqoop版本:sqoop-1.4.6
二. Mysql 數(shù)據(jù)導(dǎo)入到 Hive
1). 將mysql
的people_access_log
表導(dǎo)入到hive
表web.people_access_log
,并且hive
中的表不存在。
mysql
中表people_access_log
數(shù)據(jù)為:
將mysql
數(shù)據(jù)導(dǎo)入hive
的命令為:
該命令會啟用一個mapreduce
任務(wù),將mysql
數(shù)據(jù)導(dǎo)入到hive
表,并且指定了hive
表的分隔符為\t
,如果不指定則為默認(rèn)分隔符^A(ctrl+A)
。
參數(shù)說明
2). 也可以通過--query
條件查詢Mysql
數(shù)據(jù),將查詢結(jié)果導(dǎo)入到Hive
sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
三. Hive數(shù)據(jù)導(dǎo)入到Mysql
還是使用上面的hive
表web.people_access_log
,將其導(dǎo)入到mysql
中的people_access_log_out
表中.
注意:mysql
表people_access_log_out
需要提前建好,否則報錯:ErrorException: Table 'test.people_access_log_out' doesn't exist
。如果有id
自增列,hive
表也需要有,hive
表與mysql
表字段必須完全相同。
create table people_access_log_out like people_access_log;
執(zhí)行完一個mr
任務(wù)后,成功導(dǎo)入到mysql
表people_access_log_out
中.
四. mysql數(shù)據(jù)增量導(dǎo)入hive
實際中mysql
數(shù)據(jù)會不斷增加,這時候需要用sqoop
將數(shù)據(jù)增量導(dǎo)入hive
,然后進(jìn)行海量數(shù)據(jù)分析統(tǒng)計。增量數(shù)據(jù)導(dǎo)入分兩種,一是基于遞增列的增量數(shù)據(jù)導(dǎo)入(Append
方式)。二是基于時間列的增量數(shù)據(jù)導(dǎo)入(LastModified
方式)。有幾個核心參數(shù):
–check-column
:用來指定一些列,這些列在增量導(dǎo)入時用來檢查這些數(shù)據(jù)是否作為增量數(shù)據(jù)進(jìn)行導(dǎo)入,和關(guān)系型數(shù)據(jù)庫中的自增字段及時間戳類似.注意:這些被指定的列的類型不能使任意字符類型,如char、varchar等類型都是不可以的,同時–check-column
可以去指定多個列–incremental
:用來指定增量導(dǎo)入的模式,兩種模式分別為Append
和Lastmodified
–last-value
:指定上一次導(dǎo)入中檢查列指定字段最大值
1. 基于遞增列Append導(dǎo)入
接著前面的日志表,里面每行有一個唯一標(biāo)識自增列ID
,在關(guān)系型數(shù)據(jù)庫中以主鍵形式存在。之前已經(jīng)將id在0~6
之間的編號的訂單導(dǎo)入到Hadoop
中了(這里為HDFS
),現(xiàn)在一段時間后我們需要將近期產(chǎn)生的新的訂 單數(shù)據(jù)導(dǎo)入Hadoop
中(這里為HDFS
),以供后續(xù)數(shù)倉進(jìn)行分析。此時我們只需要指定–incremental
參數(shù)為append
,–last-value
參數(shù)為6
即可。表示只從id
大于6
后即7
開始導(dǎo)入。
1). 創(chuàng)建hive
表
首先我們需要創(chuàng)建一張與mysql
結(jié)構(gòu)相同的hive
表,假設(shè)指定字段分隔符為\t
,后面導(dǎo)入數(shù)據(jù)時候分隔符也需要保持一致。
2). 創(chuàng)建job
增量導(dǎo)入肯定是多次進(jìn)行的,可能每隔一個小時、一天等,所以需要創(chuàng)建計劃任務(wù),然后定時執(zhí)行即可。我們都知道hive
的數(shù)據(jù)是存在hdfs
上面的,我們創(chuàng)建sqoop job
的時候需要指定hive
的數(shù)據(jù)表對應(yīng)的hdfs
目錄,然后定時執(zhí)行這個job
即可。
當(dāng)前mysql
中數(shù)據(jù),hive
中數(shù)據(jù)與mysql
一樣也有6條:
增量導(dǎo)入有幾個參數(shù),保證下次同步的時候可以接著上次繼續(xù)同步.
這里通過sqoop job --create job_name
命令創(chuàng)建了一個名為mysql2hive_job
的sqoop job
。
3). 執(zhí)行job
創(chuàng)建好了job
,后面只需要定時周期執(zhí)行這個提前定義好的job
即可。我們先往mysql
里面插入2條數(shù)據(jù)。
這樣mysql
里面就會多了2條數(shù)據(jù)。此時hive
里面只有id
為1 ~ 6
的數(shù)據(jù),執(zhí)行同步job
使用以下命令。
sqoop job -exec mysql2hive_job
執(zhí)行完成后,發(fā)現(xiàn)剛才mysql
新加入的id
為7 ~ 8
的兩條數(shù)據(jù)已經(jīng)同步到hive
。
由于實際場景中,mysql
表中的數(shù)據(jù),比如訂單表等,通常是一致有數(shù)據(jù)進(jìn)入的,這時候只需要將sqoop job -exec mysql2hive_job
這個命令定時(比如說10分鐘頻率)執(zhí)行一次,就能將數(shù)據(jù)10分鐘同步一次到hive
數(shù)據(jù)倉庫。
2. Lastmodified
導(dǎo)入實戰(zhàn)
append
適合業(yè)務(wù)系統(tǒng)庫,一般業(yè)務(wù)系統(tǒng)表會通過自增ID作為主鍵標(biāo)識唯一性。Lastmodified
適合ETL
的數(shù)據(jù)根據(jù)時間戳字段導(dǎo)入,表示只導(dǎo)入比這個時間戳大,即比這個時間晚的數(shù)據(jù)。
1). 新建一張表
在mysql
中新建一張表people_access_log2
,并且初始化幾條數(shù)據(jù):
插入數(shù)據(jù):
mysql
里面的數(shù)據(jù)就是這樣:
2). 初始化hive
表:
初始化hive
數(shù)據(jù),將mysql
里面的6
條數(shù)據(jù)導(dǎo)入hive
中,并且可以自動幫助我們創(chuàng)建對應(yīng)hive
表,何樂而不為,否則我們需要自己手動創(chuàng)建,完成初始化工作。
可以看到執(zhí)行該命令后,啟動了二一個mapreduce
任務(wù),這樣6條數(shù)據(jù)就進(jìn)入hive
表web.people_access_log2
了:
3). 增量導(dǎo)入數(shù)據(jù):
我們再次插入一條數(shù)據(jù)進(jìn)入mysql
的people_access_log2
表:
insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');
此時,mysql
表里面已經(jīng)有7
條數(shù)據(jù)了,我們使用incremental
的方式進(jìn)行增量的導(dǎo)入到hive
:
2019-12-28 16:23:56
就是第6條數(shù)據(jù)的時間,這里需要指定。報錯了:
19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.
注意:可以看到--merge-key or --append is required when using --incremental lastmodified
意思是,這種基于時間導(dǎo)入模式,需要指定--merge-key
或者--append
參數(shù),表示根據(jù)時間戳導(dǎo)入,數(shù)據(jù)是直接在末尾追加(append)還是合并(merge),這里使用merge
方式,根據(jù)id
合并:
執(zhí)行該命令后,與直接導(dǎo)入不同,該命令啟動了2個mapreduce
任務(wù),這樣就把數(shù)據(jù)增量merge
導(dǎo)入hive
表了.
可以看到id=6
的數(shù)據(jù),有2條,它的時間剛好是--last-value
指定的時間,則會導(dǎo)入大于等于--last-value
指定時間的數(shù)據(jù),這點需要注意。
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫