sqoop用法之mysql與hive數(shù)據(jù)導(dǎo)入導(dǎo)出

本文目錄

一. Sqoop介紹
二. Mysql 數(shù)據(jù)導(dǎo)入到 Hive
三. Hive數(shù)據(jù)導(dǎo)入到Mysql
四. mysql數(shù)據(jù)增量導(dǎo)入hive

  • 1 基于遞增列Append導(dǎo)入
    • 1). 創(chuàng)建hive
    • 2). 創(chuàng)建job
    • 3) 執(zhí)行job
  • 2 Lastmodified 導(dǎo)入實戰(zhàn)
    • 1). 新建一張表
    • 2). 初始化hive表:

一. Sqoop介紹

Sqoop是一個用來將Hadoop和關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)相互轉(zhuǎn)移的工具,可以將一個關(guān)系型數(shù)據(jù)庫(例如:MySQL、Oracle、Postgres等)中的數(shù)據(jù)導(dǎo)進(jìn)到HadoopHDFS中,也可以將HDFS的數(shù)據(jù)導(dǎo)進(jìn)到關(guān)系型數(shù)據(jù)庫中。對于某些NoSQL數(shù)據(jù)庫它也提供了連接器。Sqoop,類似于其他ETL工具,使用元數(shù)據(jù)模型來判斷數(shù)據(jù)類型并在數(shù)據(jù)從數(shù)據(jù)源轉(zhuǎn)移到Hadoop時確保類型安全的數(shù)據(jù)處理。Sqoop專為大數(shù)據(jù)批量傳輸設(shè)計,能夠分割數(shù)據(jù)集并創(chuàng)建Hadoop任務(wù)來處理每個區(qū)塊。

本文版本說明

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6

二. Mysql 數(shù)據(jù)導(dǎo)入到 Hive

1). 將mysqlpeople_access_log表導(dǎo)入到hiveweb.people_access_log,并且hive中的表不存在。
mysql中表people_access_log數(shù)據(jù)為:

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'
Java

mysql數(shù)據(jù)導(dǎo)入hive的命令為:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log
Bash

該命令會啟用一個mapreduce任務(wù),將mysql數(shù)據(jù)導(dǎo)入到hive表,并且指定了hive表的分隔符為\t,如果不指定則為默認(rèn)分隔符^A(ctrl+A)。

參數(shù)說明


2). 也可以通過--query條件查詢Mysql數(shù)據(jù),將查詢結(jié)果導(dǎo)入到Hive

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1

三. Hive數(shù)據(jù)導(dǎo)入到Mysql

還是使用上面的hiveweb.people_access_log,將其導(dǎo)入到mysql中的people_access_log_out表中.

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1
Bash

注意:mysqlpeople_access_log_out需要提前建好,否則報錯:ErrorException: Table 'test.people_access_log_out' doesn't exist。如果有id自增列,hive表也需要有,hive表與mysql表字段必須完全相同。

create table people_access_log_out like people_access_log;

執(zhí)行完一個mr任務(wù)后,成功導(dǎo)入到mysqlpeople_access_log_out中.

四. mysql數(shù)據(jù)增量導(dǎo)入hive

實際中mysql數(shù)據(jù)會不斷增加,這時候需要用sqoop將數(shù)據(jù)增量導(dǎo)入hive,然后進(jìn)行海量數(shù)據(jù)分析統(tǒng)計。增量數(shù)據(jù)導(dǎo)入分兩種,一是基于遞增列的增量數(shù)據(jù)導(dǎo)入(Append方式)。二是基于時間列的增量數(shù)據(jù)導(dǎo)入(LastModified方式)。有幾個核心參數(shù):

  • –check-column:用來指定一些列,這些列在增量導(dǎo)入時用來檢查這些數(shù)據(jù)是否作為增量數(shù)據(jù)進(jìn)行導(dǎo)入,和關(guān)系型數(shù)據(jù)庫中的自增字段及時間戳類似.注意:這些被指定的列的類型不能使任意字符類型,如char、varchar等類型都是不可以的,同時–check-column可以去指定多個列
  • –incremental:用來指定增量導(dǎo)入的模式,兩種模式分別為AppendLastmodified
  • –last-value:指定上一次導(dǎo)入中檢查列指定字段最大值

1. 基于遞增列Append導(dǎo)入

接著前面的日志表,里面每行有一個唯一標(biāo)識自增列ID,在關(guān)系型數(shù)據(jù)庫中以主鍵形式存在。之前已經(jīng)將id在0~6之間的編號的訂單導(dǎo)入到Hadoop中了(這里為HDFS),現(xiàn)在一段時間后我們需要將近期產(chǎn)生的新的訂 單數(shù)據(jù)導(dǎo)入Hadoop中(這里為HDFS),以供后續(xù)數(shù)倉進(jìn)行分析。此時我們只需要指定–incremental 參數(shù)為append,–last-value參數(shù)為6即可。表示只從id大于6后即7開始導(dǎo)入。

1). 創(chuàng)建hive

首先我們需要創(chuàng)建一張與mysql結(jié)構(gòu)相同的hive表,假設(shè)指定字段分隔符為\t,后面導(dǎo)入數(shù)據(jù)時候分隔符也需要保持一致。

2). 創(chuàng)建job

增量導(dǎo)入肯定是多次進(jìn)行的,可能每隔一個小時、一天等,所以需要創(chuàng)建計劃任務(wù),然后定時執(zhí)行即可。我們都知道hive的數(shù)據(jù)是存在hdfs上面的,我們創(chuàng)建sqoop job的時候需要指定hive的數(shù)據(jù)表對應(yīng)的hdfs目錄,然后定時執(zhí)行這個job即可。

當(dāng)前mysql中數(shù)據(jù),hive中數(shù)據(jù)與mysql一樣也有6條:


增量導(dǎo)入有幾個參數(shù),保證下次同步的時候可以接著上次繼續(xù)同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1
Bash

這里通過sqoop job --create job_name命令創(chuàng)建了一個名為mysql2hive_jobsqoop job。

3). 執(zhí)行job

創(chuàng)建好了job,后面只需要定時周期執(zhí)行這個提前定義好的job即可。我們先往mysql里面插入2條數(shù)據(jù)。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');
SQL

這樣mysql里面就會多了2條數(shù)據(jù)。此時hive里面只有id1 ~ 6的數(shù)據(jù),執(zhí)行同步job使用以下命令。

sqoop job -exec mysql2hive_job

執(zhí)行完成后,發(fā)現(xiàn)剛才mysql新加入的id7 ~ 8的兩條數(shù)據(jù)已經(jīng)同步到hive。

hive> select * from web.people_access_log;
OK
1   15110101010 1577003281739   112.168.1.2 https://www.baidu.com
2   15110101011 1577003281749   112.16.1.23 https://www.baidu.com
3   15110101012 1577003281759   193.168.1.2 https://www.taobao.com
4   15110101013 1577003281769   112.18.1.2  https://www.baidu.com
5   15110101014 1577003281779   112.168.10.2    https://www.baidu.com
6   15110101015 1577003281789   11.168.1.2  https://www.taobao.com
7   15110101016 1577003281790   112.168.1.3 https://www.qq.com
8   15110101017 1577003281791   112.1.1.3   https://www.microsoft.com
SQL

由于實際場景中,mysql表中的數(shù)據(jù),比如訂單表等,通常是一致有數(shù)據(jù)進(jìn)入的,這時候只需要將sqoop job -exec mysql2hive_job這個命令定時(比如說10分鐘頻率)執(zhí)行一次,就能將數(shù)據(jù)10分鐘同步一次到hive數(shù)據(jù)倉庫。

2. Lastmodified 導(dǎo)入實戰(zhàn)

append適合業(yè)務(wù)系統(tǒng)庫,一般業(yè)務(wù)系統(tǒng)表會通過自增ID作為主鍵標(biāo)識唯一性。Lastmodified適合ETL的數(shù)據(jù)根據(jù)時間戳字段導(dǎo)入,表示只導(dǎo)入比這個時間戳大,即比這個時間晚的數(shù)據(jù)。

1). 新建一張表

mysql中新建一張表people_access_log2,并且初始化幾條數(shù)據(jù):

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用戶id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '訪客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
SQL

插入數(shù)據(jù):

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');
SQL

mysql里面的數(shù)據(jù)就是這樣:


2). 初始化hive表:

初始化hive數(shù)據(jù),將mysql里面的6條數(shù)據(jù)導(dǎo)入hive中,并且可以自動幫助我們創(chuàng)建對應(yīng)hive表,何樂而不為,否則我們需要自己手動創(chuàng)建,完成初始化工作。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2
Bash

可以看到執(zhí)行該命令后,啟動了二一個mapreduce任務(wù),這樣6條數(shù)據(jù)就進(jìn)入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)
SQL

3). 增量導(dǎo)入數(shù)據(jù):

我們再次插入一條數(shù)據(jù)進(jìn)入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此時,mysql表里面已經(jīng)有7條數(shù)據(jù)了,我們使用incremental的方式進(jìn)行增量的導(dǎo)入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
Bash

2019-12-28 16:23:56就是第6條數(shù)據(jù)的時間,這里需要指定。報錯了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:可以看到--merge-key or --append is required when using --incremental lastmodified意思是,這種基于時間導(dǎo)入模式,需要指定--merge-key或者--append參數(shù),表示根據(jù)時間戳導(dǎo)入,數(shù)據(jù)是直接在末尾追加(append)還是合并(merge),這里使用merge方式,根據(jù)id合并:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id
Bash

執(zhí)行該命令后,與直接導(dǎo)入不同,該命令啟動了2個mapreduce任務(wù),這樣就把數(shù)據(jù)增量merge導(dǎo)入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
7   15110101016 2019-12-28 16:28:24.0   112.168.12.45   https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)
SQL

可以看到id=6的數(shù)據(jù),有2條,它的時間剛好是--last-value指定的時間,則會導(dǎo)入大于等于--last-value指定時間的數(shù)據(jù),這點需要注意。










作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫