Java并發(fā)編程(2)- FutureTask詳解與池化思想的設(shè)計(jì)和實(shí)戰(zhàn)一
作者:
修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。
在Java并發(fā)編程領(lǐng)域,FutureTask可以說是一個(gè)非常強(qiáng)大的利器,它通過實(shí)現(xiàn)RunnableFuture接口間接擁有了Runnable和Future接口的相關(guān)特性,既可以用于充當(dāng)線程執(zhí)行的任務(wù)(Runnable),也可以用于獲取線程異步執(zhí)行任務(wù)后返回的結(jié)果(Future);本文將通過剖析解讀FutureTask底層相關(guān)的核心源碼,并基于FutureTask自設(shè)計(jì)并實(shí)戰(zhàn)一款“池容器”,即池化思想的設(shè)計(jì)和實(shí)戰(zhàn);
寫在前面的話:debug最近又出了一本新書:《Spring
Boot企業(yè)級(jí)項(xiàng)目-入門到精通》 感興趣的小伙伴可以前往各大商城平臺(tái)(淘寶、天貓、當(dāng)當(dāng)、京東等)一睹為快!書籍的封面如下所示,后續(xù)debug會(huì)專門出篇文章專門介紹這本書(同時(shí)提供優(yōu)惠購(gòu)書渠道):
言歸正傳,在上篇文章中:Java并發(fā)編程(1): Callable、Future和FutureTask ,我們已經(jīng)介紹并實(shí)戰(zhàn)過了Java并發(fā)編程中Callable、Future以及FutureTask的相關(guān)基本概念以及API,本文將不再贅述;
值得一提的是,Future或者FutureTask需要通過線程池才能發(fā)揮出實(shí)際的功效,因此在實(shí)際應(yīng)用中它跟線程池又有著千絲萬縷的聯(lián)系,本文將從源碼的角度進(jìn)行剖析,通過解讀FutureTask底層相關(guān)的核心源碼,并基于FutureTask自設(shè)計(jì)并實(shí)戰(zhàn)一款“池容器”,即池化思想的設(shè)計(jì)和實(shí)戰(zhàn);
(1)在上篇文章中想必各位觀看老爺們已經(jīng)基本知道了Future、FutureTask需要結(jié)合線程池來使用,看下方代碼:
ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executor=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
FutureTask<Map<String,Object>> futureTask=new FutureTask<Map<String, Object>>(new ProductThread());
executor.execute(futureTask);
Map<String,Object> resMap=futureTask.get();
System.out.println("--子線程執(zhí)行任務(wù)后得到的結(jié)果:"+resMap);
簡(jiǎn)短解說:在上述該代碼中,ProductThread是一個(gè)實(shí)現(xiàn)了Callable接口的類,其中的call()方法便是真正要執(zhí)行的任務(wù)代碼邏輯,在此就不貼出來了(在上篇文章有源碼);然后通過它構(gòu)造一FutureTask,最后便是提交給線程池的execute()方法進(jìn)行執(zhí)行,執(zhí)行完成之后,通過futureTask的get()方法獲取線程異步執(zhí)行后返回的結(jié)果;
而本文我們將基于這一段“解說”進(jìn)行核心源碼的剖析。
(2)首先是new
FutureTask<Map<String, Object>>(new ProductThread()),即創(chuàng)建FutureTask,其底層源碼如下所示:
//創(chuàng)建任務(wù).等待被線程池中的線程執(zhí)行
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
在上述源碼中,值得一提的是,任務(wù)的運(yùn)行狀態(tài)變量state的設(shè)計(jì)個(gè)人覺得相當(dāng)巧妙,采用volatile關(guān)鍵字進(jìn)行定義,而volatile的作用想必有些小伙伴是比較熟悉的:
(1)保證線程之間可見性:即對(duì) 用volatile關(guān)鍵字修飾的變量 的可見性,即 “當(dāng)一個(gè)線程修改了這個(gè)變量的值時(shí),volatile 可以保證新值能立即同步到主內(nèi)存,以及每次使用前立即從主內(nèi)存刷新”
(2)禁止指令重排序:A.那么什么是“指令重排序”呢,我們寫的代碼最終都將轉(zhuǎn)化為相應(yīng)的指令交付給底層控制單元、計(jì)算單元執(zhí)行,即CPU執(zhí)行,重排序 則指的是CPU采用了允許將多條指令不按程序規(guī)定的順序分開發(fā)送給各相應(yīng)電路單元處理,這樣做的弊端在于多核處理器下各處理器會(huì)發(fā)生亂序執(zhí)行,從而導(dǎo)致我們所謂的 “并發(fā)安全”問題,而volatile關(guān)鍵詞就禁止了這種現(xiàn)象,即通過在本地代碼中插入許多內(nèi)存屏障指令來保證處理器不發(fā)生亂序執(zhí)行;
OK,回到線程待執(zhí)行任務(wù)的運(yùn)行狀態(tài)變量state,其定義和取值如下所示(總共有6個(gè)狀態(tài)的取值,其含義直接翻譯過來就行了):
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
從官方的源碼注釋中可以看出 一個(gè)任務(wù)從創(chuàng)建到完畢,可能經(jīng)歷的狀態(tài)變化為:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
(3)FutureTask任務(wù)定義好了之后,接下來就應(yīng)該是交給線程準(zhǔn)備執(zhí)行了,即:executor.execute(futureTask) ,其底層源碼如下所示:
public void execute(Runnable command){
//如果任務(wù)對(duì)象為null,拋異常
if (command == null)
throw new NullPointerException();
//先獲取當(dāng)前池中工作中的線程(活躍的、可用的線程數(shù))
//如果當(dāng)前池中的線程數(shù)小于核心線程數(shù),就會(huì)調(diào)用addWorker檢查運(yùn)行狀態(tài)和正在運(yùn)行的
//線程數(shù)量
//通過return操作可以用于防止錯(cuò)誤地添加線程、然后執(zhí)行當(dāng)前任務(wù)
//(因?yàn)殡S意的添加線程只會(huì)造成資源浪費(fèi))
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否則當(dāng) 池中的線程數(shù)大于核心線程數(shù)的時(shí)候 且 任務(wù)可以被加入任務(wù)隊(duì)列時(shí)
//我們需要來個(gè)雙重判斷,判斷是否真的需要添加一個(gè)新的線程來執(zhí)行這個(gè)任務(wù),
//因?yàn)榭赡芤呀?jīng)存在這樣的情況:線程執(zhí)行完畢任務(wù)后的那一刻可能處于空閑狀態(tài),
//這個(gè)時(shí)候該線程就可以直接復(fù)用;
//否則直接創(chuàng)建一個(gè)新的線程來執(zhí)行此任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//判斷池中是否有可用的線程,如果沒有 而且 也無法將當(dāng)前任務(wù)加入到任務(wù)隊(duì)列時(shí)
//則拒絕執(zhí)行當(dāng)前的任務(wù)(拒絕的策略取決于創(chuàng)建線程池時(shí)指定的策略)
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果不能創(chuàng)建新的線程去執(zhí)行新任務(wù)的話,就拒絕當(dāng)前任務(wù)
else if (!addWorker(command, false))
reject(command);
}
在上述源碼中,addWorker()方法很搶眼,因此不得不稍微介紹一番,因?yàn)樵摲椒ùa有點(diǎn)長(zhǎng),debug特意將其截成長(zhǎng)圖供各位看官老爺們閱讀,如下所示:
在上圖中有一小段代碼便是真正觸發(fā)“線程執(zhí)行任務(wù)”的時(shí)機(jī),即:
if (workerAdded) {
t.start();
workerStarted = true;
}
start()方法一調(diào)用,便最終會(huì)調(diào)用FutureTask中的run()方法,其底層源碼如下所示:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//將前面執(zhí)行 new FutureTask(callable) 代碼時(shí)傳入該構(gòu)造方法的任務(wù)對(duì)象
//callable引用 交給新的 c
//如果當(dāng)前任務(wù)處于 NEW,即創(chuàng)建的狀態(tài),則執(zhí)行callable原生定義的call()方法的
//代碼邏輯,其實(shí)就是 ProductThread 中的 call()方法的代碼邏輯
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
//執(zhí)行完成之后,將得到的結(jié)果通過 set() 方法設(shè)置到 FutureTask 中的
//私有變量 outCome中
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其中,if (ran) set(result); 表示將線程異步執(zhí)行完之后得到的結(jié)果通過 set() 方法設(shè)置到 FutureTask 中的私有變量 outCome中,其代碼如下所示:
//上述執(zhí)行完run()方法后,會(huì)得到一個(gè)結(jié)果V,將該結(jié)果通過 set(V) 方法可以設(shè)置
//到 FutureTask
//中的私有變量 outCome中
//其中設(shè)置回去的過程其實(shí)也是加了鎖,只不過是一個(gè)樂觀鎖,即通過cas機(jī)制來實(shí)現(xiàn)
//即“判斷當(dāng)前任務(wù)的舊狀態(tài)old=stateOffset是否真的為New,如果是,//則將其設(shè)置為COMPLETING,
//代表任務(wù)正在執(zhí)行中”,其他線程就執(zhí)行不成功
//然后設(shè)置完成后,將該任務(wù)的運(yùn)行狀態(tài)設(shè)置為NORMAL,即最終態(tài)為//“任務(wù)已執(zhí)行完成”
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
最后,便是通過futureTask的get()方法獲取線程異步執(zhí)行后返回的結(jié)果,即對(duì)應(yīng)的業(yè)務(wù)代碼為: Map<String,Object> resMap=futureTask.get(); 接下來一起研讀get()方法底層的源碼:
//獲取線程異步執(zhí)行后的結(jié)果,過程:先獲取當(dāng)前任務(wù)的運(yùn)行狀態(tài)
//如果是已完成,即Normal狀態(tài)時(shí),則直接report(s),即獲取結(jié)果
//如果是未完成(即處于運(yùn)行期間 : <= COMPLETING),
//則進(jìn)入等待的邏輯,即awaitDone()方法里面的代碼邏輯
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
而report()方法的底層源碼也很簡(jiǎn)潔,如下所示:
//其中report()方法的核心邏輯,就是通過判斷當(dāng)前任務(wù)的運(yùn)行狀態(tài)(已完成)
//從而將線程的執(zhí)行結(jié)果 result 返回:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
如果當(dāng)前任務(wù)仍然處于運(yùn)行中的狀態(tài)時(shí),則執(zhí)行 awaitDone() 方法進(jìn)入堵塞隊(duì)列等待獲取執(zhí)行結(jié)果的代碼邏輯,如下所示:
/**
* Awaits completion or aborts on interrupt or timeout.
* 等待任務(wù)執(zhí)行完成、或者任務(wù)被終止、或者任務(wù)被中斷、或者任務(wù)執(zhí)行超時(shí)
* @param timed true if use timed waits
* @param nanos time to wait, if timed
*
* 如果設(shè)置timed為true,則代表在獲取線程異步執(zhí)行的結(jié)果時(shí) 可以等待一定的時(shí)間nanos
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//這其實(shí)也是一個(gè)死循環(huán) - CAS自旋 + AQS 即同步隊(duì)列控制器 的原理
for (;;) {
//看看執(zhí)行任務(wù)的線程是不是被中斷interrupt,如果是的話做出:
//1.在等待隊(duì)列中移除這個(gè)調(diào)用get方法的線程結(jié)點(diǎn)WaitNode
//(怎么移除呢:很簡(jiǎn)單,只需要調(diào)整鏈表中的線程節(jié)點(diǎn)即可)
//2. 拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//表示任務(wù)執(zhí)行完畢:可能正常完成也可能拋異常,總之就是結(jié)束了
//就把這個(gè)waitNode的執(zhí)行線程thread指向null
if (q != null)
q.thread = null;
return s;
}
//如果state==COMPLETING,意味著基本完成但還沒保存結(jié)果,就yield,//表示線程掛起
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//q == null時(shí),即等待節(jié)點(diǎn)q為null,就創(chuàng)建等待節(jié)點(diǎn),//這個(gè)節(jié)點(diǎn)后面會(huì)被插入阻塞隊(duì)列
//第一次循環(huán)時(shí)一般會(huì)執(zhí)行到
else if (q == null)
q = new WaitNode();
//一般第二次循環(huán)時(shí)會(huì)執(zhí)行到
//大概的含義為:判斷queued,即是否入隊(duì)列成功,//這里是將創(chuàng)建的線程節(jié)點(diǎn)q加入隊(duì)列頭
//使用Unsafe的CAS方法,對(duì)waiters進(jìn)行賦值,//waiters也是一個(gè)WaitNode節(jié)點(diǎn),
//相當(dāng)于隊(duì)列頭,
//或者理解為隊(duì)列的頭指針,通過WaitNode可以遍歷整個(gè)阻塞隊(duì)列//(頭插法)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//設(shè)置了超時(shí)的處理機(jī)制:設(shè)置超時(shí)時(shí)間之后,//調(diào)用get()的線程最多阻塞nanos 納秒,
//就會(huì)從阻塞狀態(tài)醒過來。
//如果最終真的超時(shí)的話,就移除 調(diào)用get()方法的線程wait結(jié)點(diǎn),//并返回state
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//否則就進(jìn)入堵塞等待狀態(tài)(nanos納秒),等待被喚醒
LockSupport.parkNanos(this, nanos);
}
else
//沒有設(shè)置超時(shí)時(shí)間但任務(wù)又還沒執(zhí)行出結(jié)果,就直接進(jìn)入阻塞狀態(tài),
//等待被其他線程喚醒
LockSupport.park(this);
}
}
至此,對(duì)于FutureTask的核心源碼剖析我們已經(jīng)擼完了,當(dāng)然啦,還有像cancel()方法,即取消任務(wù)的執(zhí)行就留給各位看官老爺們的研讀了(很簡(jiǎn)單,通過CAS機(jī)制判斷任務(wù)的運(yùn)行狀態(tài) 以及 mayInterruptIfRunning 參數(shù)決定最終是否可以中斷該任務(wù)的執(zhí)行線程,同時(shí)也改變了任務(wù)的運(yùn)行狀態(tài): New-> INTERRUPTING->NORMAL ; New-> CANCELLED):
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
像finishCompletion()等方法的源碼解讀也是如此,就留給各位小伙伴細(xì)品了!本文我們就到這里吧!OK,打完收工,下期還有個(gè)高級(jí)案例實(shí)戰(zhàn),即“池化思想的設(shè)計(jì)與實(shí)戰(zhàn)”,我們下期再見?。?!
總結(jié):
(1)代碼下載:關(guān)注“程序員實(shí)戰(zhàn)基地”微信公眾號(hào)(掃描下圖微信公眾號(hào)即可),回復(fù)“100”,即可獲取代碼下載鏈接;歡迎關(guān)注debug的技術(shù)公眾號(hào)一起學(xué)習(xí)干貨技術(shù)吧!