SpringBoot系列(16):線程池-多線程Executors并發(fā)編程之批量查詢-插入數(shù)據(jù)


作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。

摘要:在上篇文章中Debug給大家分享介紹了“Java線程池-多線程的其中一種應(yīng)用場景~廣播式給所有有效用戶發(fā)送郵件(通知)”,本篇文章我們將繼續(xù)向前邁進(jìn),繼續(xù)介紹并實戰(zhàn)“線程池-多線程的應(yīng)用場景”,這一場景簡稱為“批量插入大量的數(shù)據(jù)”,同樣是采用Java中的Executors下的其中某種線程池進(jìn)行實戰(zhàn)實現(xiàn)!

內(nèi)容:“批量插入數(shù)據(jù)”這一業(yè)務(wù)場景在企業(yè)級應(yīng)用開發(fā)中還是比較常見的,顧明思議,即“將給定的大批量的數(shù)據(jù)插入到指定的數(shù)據(jù)庫中去”,本文我們將采用Java代碼、線程池-多線程的方式進(jìn)行實現(xiàn),感受一下開辟N個子線程去批量插入數(shù)據(jù)時的高效之處!

需要指出的是,“待插入的大批量的數(shù)據(jù)”的某個字段 來源于數(shù)據(jù)庫表“codes”,即字段item_id,在這一業(yè)務(wù)場景中我們將首先從該數(shù)據(jù)庫表codes中拉取出所有的item_id,然后復(fù)制給另外的數(shù)據(jù)庫表item_data中另外的字段code 中去,即實現(xiàn)所謂的“查詢-批量插入大批量的數(shù)據(jù)”。

其中,目標(biāo)數(shù)據(jù)庫表item_data的DDL定義如下所示:

CREATE TABLE `item_data` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`code` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '編碼',
`p_id` int(11) DEFAULT NULL COMMENT '編號',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='數(shù)據(jù)表';

值得一提的是,那些待拉取的數(shù)據(jù)來源于數(shù)據(jù)庫表codes中,在實戰(zhàn)實現(xiàn)“插入大批量的數(shù)據(jù)”之前,我們寫個Java單元測試或者其他的方式往codes表中模擬生成幾十萬甚至幾百萬數(shù)據(jù)記錄,即codes數(shù)據(jù)庫表中item_id的取值需要提前大批量生成并插入進(jìn)去,在這里,我們預(yù)先生成的數(shù)據(jù)量為40w。

接下來,我們便進(jìn)入實際的代碼實戰(zhàn)實現(xiàn)環(huán)節(jié)!

(1)首先,我們直接在ThreadController中寫一個請求方法,待會兒用于Postman發(fā)起“批量插入數(shù)據(jù)”的請求,其代碼如下所示:

    @RequestMapping(value = "all/insert/data",method = RequestMethod.GET)
public BaseResponse insertAllData(){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
threadService.insertDatas();

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}

(2)threadService.insertDatas(); 即為批量插入大批量數(shù)據(jù)的實際核心業(yè)務(wù)邏輯,其完整的核心業(yè)務(wù)代碼如下所示:  

    @Autowired
private DataMapper dataMapper;

@Autowired
private DataService dataService;

//TODO:批量拉取、插入數(shù)據(jù) - 40w
public void insertDatas() throws Exception{
//TODO:總線程數(shù) 10
final Integer threadSize=10;

//TODO:總數(shù)據(jù)量 40w
final Long total=dataMapper.getTotal();

if (total>0){
//TODO:每個線程將執(zhí)行插入操作的數(shù)據(jù)條目
Long pageSize=(total%threadSize==0)?total/threadSize:total/threadSize+1;

Set<String> datas;
ExecutorService executorService=Executors.newFixedThreadPool(threadSize);

List<ThreadInsertDataDto> list=Lists.newLinkedList();
for (Long i=1L;i<=threadSize;i++){
//TODO:將每個線程即將執(zhí)行的具體條目記錄拿出來
if (Objects.equals(i, threadSize)){
pageSize = total - (threadSize-1) * pageSize;
}
datas=dataService.pageLimitData(i,pageSize);

//TODO:構(gòu)造線程實例
list.add(new ThreadInsertDataDto(dataService,datas));
}
//TODO:多線程批量插入數(shù)據(jù)-邏輯
executorService.invokeAll(list);
}
}

在該核心業(yè)務(wù)邏輯中,我們首先是開辟了10個線程(大伙兒要根據(jù)實際的機(jī)器配置此參數(shù)哈,我的機(jī)子是8核16線程的),根據(jù)這10個線程,按照“待拉取的總數(shù)據(jù)條目/10”得到的“數(shù)據(jù)量pageSize”即為每個線程要去數(shù)據(jù)庫表codes中拉取的數(shù)據(jù)量!

每個線程拉取到相應(yīng)的數(shù)據(jù)條目之后,即可構(gòu)造對應(yīng)的線程實例ThreadInsertDataDto,該類實例本質(zhì)上就是一個“線程實例”,其中實現(xiàn)的run方法即為核心的“將數(shù)據(jù)插入到指定的數(shù)據(jù)表中”!

(3)ThreadInsertDataDto的完整代碼如下所示:

public class ThreadInsertDataDto implements Callable<Boolean>{

private DataService dataService;
private Set<String> set;

public ThreadInsertDataDto(DataService dataService, Set<String> set) {
this.dataService = dataService;
this.set = set;
}

//TODO:實際的插入數(shù)據(jù)到數(shù)據(jù)庫表的真正邏輯
@Override
public Boolean call() throws Exception {
if (dataService!=null){
dataService.insertBatchData(set);
}
return true;
}
}

而dataService.insertBatchData(set); 就是具體的“插入大批量的數(shù)據(jù)到數(shù)據(jù)庫表的實際代碼實現(xiàn)”,其完整的源代碼如下所示:  

    @Autowired
private ItemDataMapper itemDataMapper;


//TODO:批量插入數(shù)據(jù)
@Async("taskDataExecutor")
public void insertBatchData(Set<String> set){
log.info("----開始批量插入數(shù)據(jù)----");

List<ItemData> list= Lists.newLinkedList();
//TODO:真正的插入數(shù)據(jù)的業(yè)務(wù)邏輯
set.forEach(s -> {
ItemData data=new ItemData(null,s,1);
list.add(data);
});
//這是一個批量插入的方法
itemDataMapper.insertBatch(list);
}

其中,itemDataMapper.insertBatch(list); 即為Mybatis批量插入的方法,其完整的源代碼如下所示:  

<!--批量插入-->
<insert id="insertBatch">
insert into item_data (id, code, p_id)
values

<foreach collection="datas" item="data" separator="," >
(null,#{data.code},#{data.pId})
</foreach>

</insert>

至此,我們已經(jīng)擼完“采用線程池-多線程的方式實現(xiàn)批量插入大批量數(shù)據(jù)”的業(yè)務(wù)場景,下面我們將整個項目跑起來,并采用Postman發(fā)起請求,你會驚訝的發(fā)現(xiàn)40w的數(shù)據(jù),采用10個線程去批量插入時,5秒的時間都不到就可以完成了(其實是1s而已 哈哈)!

如下圖所示:




當(dāng)然啦,大家也可以將待插入的數(shù)據(jù)量上調(diào)到100w、甚至是1000w,那樣測出來的效果才能更加令自己興奮!

好了,本篇文章我們就介紹到這里了,其他相關(guān)的技術(shù),感興趣的小伙伴可以關(guān)注底部Debug的技術(shù)公眾號,或者加Debug的微信,拉你進(jìn)“微信版”的真正技術(shù)交流群!一起學(xué)習(xí)、共同成長!

補(bǔ)充:

1、本文涉及到的相關(guān)的源代碼可以到此地址,check出來進(jìn)行查看學(xué)習(xí):

https://gitee.com/steadyjack/SpringBootTechnology

2、目前Debug已將本文所涉及的內(nèi)容整理錄制成視頻教程,感興趣的小伙伴可以前往觀看學(xué)習(xí):https://www.fightjava.com/web/index/course/detail/5

3、關(guān)注一下Debug的技術(shù)微信公眾號,最新的技術(shù)文章、課程以及技術(shù)專欄將會第一時間在公眾號發(fā)布哦!