Java面試題~Java線程池實(shí)戰(zhàn)總結(jié)一之百萬數(shù)據(jù)的批量插入

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。



摘要

對于從事Java開發(fā)的小伙伴而言,“線程池”一詞應(yīng)當(dāng)不陌生,雖然在實(shí)際工作、項(xiàng)目實(shí)戰(zhàn)中可能很少用過,但是在工作閑暇或吹水之余還是會(huì)聽到他人在討論,更有甚者,在跳槽面試等場合更是屢見不鮮,已然成為一道“必面題”。從本文開始我們將開啟“Java線程池實(shí)戰(zhàn)總結(jié)”系列文章的分享,幫助各位小伙伴認(rèn)識(shí)、鞏固并實(shí)戰(zhàn)線程池的相關(guān)技術(shù)要點(diǎn)。

內(nèi)容

一、吹一波“線程池”

“線程池”,字如其名,是“線程”+“池”合并得來的,“線程”的含義自然不用多說,而“池”其實(shí)是一種“設(shè)計(jì)思想”,即池化設(shè)計(jì)技術(shù),這種思想簡而言之是為了能更好地重復(fù)利用資源,減少因資源重復(fù)創(chuàng)建而帶來的損耗,提高資源重復(fù)利用率。

“池化設(shè)計(jì)”帶來的產(chǎn)物有很多,目前在市面上我們經(jīng)常可以見到的有“數(shù)據(jù)庫連接池”、“Http連接池”、“Redis連接池”以及我們本文要介紹的“線程池”等等,都是對這種設(shè)計(jì)思想的應(yīng)用。

線程池提供了一種限制和管理“線程”資源的機(jī)制,池里面的每個(gè)線程所承擔(dān)的職責(zé)無非就是執(zhí)行項(xiàng)目、程序中待執(zhí)行的任務(wù),當(dāng)待執(zhí)行的任務(wù)數(shù)量大于1時(shí),線程池中預(yù)先分配好的多個(gè)線程便起到了作用,當(dāng)執(zhí)行完任務(wù)后,線程并不會(huì)立即被Kill掉,而是會(huì)保留在線程池一段時(shí)間,等待下次被重復(fù)利用。   

二、基于ThreadPoolExecutor實(shí)現(xiàn)百萬數(shù)據(jù)的批量插入

接下來,我們以實(shí)際項(xiàng)目中典型的業(yè)務(wù)場景“多線程批量插入隨機(jī)生成的200w條數(shù)據(jù)”為案例,初步認(rèn)識(shí)并了解Java中如何使用線程池 ThreadPoolExecutor 做一些事情。

1)首先,我們得先定義這些事情,也就是“任務(wù)” 是什么,特別是要讓我們的系統(tǒng)、線程知道待執(zhí)行的每個(gè)“任務(wù)”是什么,很顯然,我們的任務(wù)是“插入數(shù)據(jù)”,而這些數(shù)據(jù)將插入數(shù)據(jù)庫表 book中,該數(shù)據(jù)庫表的DDL定義如下所示:

CREATE TABLE `book` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '書名',
`author` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '作者',
`release_date` date DEFAULT NULL COMMENT '發(fā)布日期',
`is_active` tinyint(255) DEFAULT '1' COMMENT '是否有效',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='書籍信息表';

而這個(gè)任務(wù)其實(shí)可以通過 實(shí)現(xiàn) Runnable 接口或者 Callable 接口來定義,只有通過這種方式定義,我們的系統(tǒng)才認(rèn)識(shí),如下代碼所示:

class insertTask implements Callable<Boolean>{
@Override
public Boolean call() throws Exception {
Book b;
String msg;
for (int i=0;i<50000;i++){
b=new Book();
msg=snowflake.nextIdStr();
b.setName(msg);
b.setAuthor(msg+"-A");
b.setReleaseDate(new Date());
bookMapper.insertSelective(b);
}
return true;
}
}

在上述代碼中,我們定義了每個(gè)線程待執(zhí)行的“任務(wù)”:循環(huán)插入 5w 條數(shù)據(jù),其中,為了避免多線程并發(fā)生成相同的數(shù)據(jù),我們采用了雪花算法工具生成了“全局唯一ID”;而200w的數(shù)據(jù),需要在外部循環(huán)遍歷 40次,如果用傳統(tǒng)的“單一線程”進(jìn)行實(shí)現(xiàn),很顯然是行不通的(那得插到天昏地暗才能完成….

 

2)有了任務(wù),那么接下來就需要有“線程”去執(zhí)行,在這里我們需要定義線程執(zhí)行器 ThreadPoolExecutor 實(shí)例,講給這位大佬安排處理,即用于在線程池創(chuàng)建 N 個(gè)線程,并用于執(zhí)行執(zhí)行這 40個(gè) 任務(wù),其源碼如下所示:

@Autowired
private BookMapper bookMapper;

private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);

private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();

private ThreadPoolExecutor executor=new ThreadPoolExecutor(4,8,10, TimeUnit.SECONDS
,queue,policy);

private static final Snowflake snowflake=new Snowflake(3,2);

@Test
public void testBatchInsert() throws Exception{
List<insertTask> tasks= Lists.newLinkedList();
for (int j=0;j<40;j++){
tasks.add(new insertTask());
}
executor.invokeAll(tasks);
}

點(diǎn)擊運(yùn)行該單元測試方法,之后需要等待一定的時(shí)間,最終你會(huì)在IDEA控制臺(tái)看到同時(shí)會(huì)有好幾個(gè)線程并行插入數(shù)據(jù),在數(shù)據(jù)庫表中也最終可以看到其成功插入了 200w 條數(shù)據(jù),如下圖所示:   




三、說道說道ThreadPoolExecutor

1)對于 ThreadPoolExecutor,可以說在實(shí)際項(xiàng)目中算是很常見的了,其頂層實(shí)現(xiàn)類為Executor,繼承關(guān)系如下所示(通過看源碼就可以一眼得到):


2)主線程首先需要?jiǎng)?chuàng)建實(shí)現(xiàn) Runnable 或者 Callable 接口的任務(wù)對象,并把創(chuàng)建完成的 Runnable/Callable接口的對象實(shí)例直接交給 ExecutorService 執(zhí)行,即主要通過 execute()或者submit()執(zhí)行或者invokeAll()。

 

3)接下來分析一下創(chuàng)建 ThreadPoolExecutor 的源碼,ThreadPoolExecutor 類中提供了4個(gè)構(gòu)造方法,我們只需要看最長的那個(gè),因?yàn)槠溆嗳齻€(gè)都是在這個(gè)構(gòu)造方法的基礎(chǔ)上產(chǎn)生的,如圖所示:   


4)下面重點(diǎn)介紹一下其中的幾個(gè)參數(shù):

A. corePoolSize:核心線程數(shù)定義了最小可以同時(shí)運(yùn)行的線程數(shù)量


B. maximumPoolSize:當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線程數(shù)量將變?yōu)樽畲缶€程數(shù)


C. workQueue:當(dāng)新任務(wù)到來是,系統(tǒng)會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)是否已達(dá)到核心線程數(shù),如果達(dá)到的話,新任務(wù)就會(huì)被存放在隊(duì)列中


D. keepAliveTime:當(dāng)線程池中的線程數(shù)量大于 corePoolSize 的時(shí)候,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過了 keepAliveTime才會(huì)被回收銷毀


E. unitkeepAliveTime 參數(shù)的時(shí)間單位


F. threadFactoryexecutor 創(chuàng)建新線程的時(shí)候會(huì)用到


G. handler:飽和策略,下面會(huì)單獨(dú)介紹

 

下面這張圖或許可以加深你對線程池中各個(gè)參數(shù)的相互關(guān)系的理解(圖片來源:《Java 性能調(diào)優(yōu)實(shí)戰(zhàn)》)



5ThreadPoolExecutor 飽和策略定義:如果當(dāng)前同時(shí)正在運(yùn)行的線程數(shù)量達(dá)到了最大線程數(shù)量并且隊(duì)列也已經(jīng)被放滿了任務(wù)時(shí),ThreadPoolTaskExecutor 將定義一些策略:

 

AThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來拒絕新任務(wù)的處理。

BThreadPoolExecutor.CallerRunsPolicy:調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù),也就是直接在調(diào)用execute方法的線程中運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。因此這種策略會(huì)降低對于新任務(wù)提交速度,影響程序的整體性能。如果您的應(yīng)用程序可以承受此延遲并且你要求任何一個(gè)任務(wù)請求都要被執(zhí)行的話,你可以選擇這個(gè)策略。

CThreadPoolExecutor.DiscardPolicy: 不處理新任務(wù),直接丟棄掉。

DThreadPoolExecutor.DiscardOldestPolicy: 此策略將丟棄最早的未處理的任務(wù)請求。

 

附帶一提:當(dāng)我們不指定 RejectedExecutionHandler 飽和策略的話來配置線程池的時(shí)候默認(rèn)使用的是 ThreadPoolExecutor.AbortPolicy


四、實(shí)際項(xiàng)目中的使用建議

1在《阿里巴巴 Java 開發(fā)手冊》中強(qiáng)制線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor 構(gòu)造函數(shù)的方式,這樣的處理方式是為了讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)

 

2Executors 返回線程池對象的弊端如下:

AFixedThreadPool SingleThreadExecutor:當(dāng)允許請求的隊(duì)列長度設(shè)置為 Integer.MAX_VALUE 時(shí),很有可能系統(tǒng)會(huì)堆積大量的請求,從而導(dǎo)致 OOM;

BCachedThreadPool ScheduledThreadPool:當(dāng)允許創(chuàng)建的線程數(shù)量設(shè)置為 Integer.MAX_VALUE時(shí),則很有可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM

 

總結(jié)

本文我們先初步認(rèn)識(shí)一下Java中經(jīng)常使用的線程池ThreadPoolExecutor,下一小節(jié)我們將接著本節(jié)的內(nèi)容,分析分析 ThreadPoolExecutor 的原理。