Java并發(fā)編程(1)- Callable、Future和FutureTask

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。




擼過(guò)JavaSE(即Java基礎(chǔ)技術(shù)棧)的小伙伴都知道,實(shí)現(xiàn)多線程有兩種方式,一種是繼承Thread,extends Thread 然后實(shí)現(xiàn)其中的run()方法;另外一種是實(shí)現(xiàn)Runnable接口,即implements Runnable,然后實(shí)現(xiàn)其中的run()方法;仔細(xì)觀察這兩種方式,會(huì)發(fā)現(xiàn)這兩者都不能返回線程異步執(zhí)行完的結(jié)果,但在實(shí)際項(xiàng)目開(kāi)發(fā)中卻偶爾需要獲取其中的返回結(jié)果,咋辦嘞?于是乎CallableFuture就排上用場(chǎng)了,本文我們將對(duì)其做一番詳盡的介紹!

  還是先介紹下多線程的傳統(tǒng)實(shí)現(xiàn)方式吧,如下代碼所示:

public class ThreadUtil {
public static void main(String[] args) throws Exception{
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("---子線程正在執(zhí)行---"+Thread.currentThread().getName());
Map<String,Object> dataMap=Maps.newHashMap();
dataMap.put("id",10010);
dataMap.put("name","steadyjack");
dataMap.put("nickName","多隆");
System.out.println("---子線程執(zhí)行后得到的結(jié)果:"+dataMap);
}
});
try {
thread.start();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("---主線程正在執(zhí)行---"+Thread.currentThread().getName());
}
}

  在上述代碼中,我們首先通過(guò)一個(gè)實(shí)現(xiàn)Runnable接口的匿名實(shí)現(xiàn)類創(chuàng)建了一個(gè)線程對(duì)象實(shí)例,即thread,并編寫(xiě)實(shí)現(xiàn)了其中run()方法的代碼邏輯(而這就是該線程要執(zhí)行的任務(wù)),主要是構(gòu)造一個(gè)私有變量Map<String,Object>,并將相關(guān)的數(shù)據(jù)塞入進(jìn)去。

  點(diǎn)擊運(yùn)行該代碼后,顯而易見(jiàn)可以預(yù)測(cè)其運(yùn)行結(jié)果:   


  從上述編寫(xiě)的代碼以及運(yùn)行結(jié)果來(lái)看,會(huì)發(fā)現(xiàn)如果我們想獲取得到dataMap的內(nèi)容是很困難的,因?yàn)?span lang="EN-US">run()方法的返回值為void;當(dāng)然啦,也不是完全沒(méi)有辦法,在上面的條件下,如果想要獲取到dataMap并做進(jìn)一步的操作的話,則可以將dataMap定義為全局的共享變量,或者使用線程通信的方式來(lái)達(dá)到效果,如下所示為通過(guò)共享全局變量的方式:   

public class ThreadUtil {
private static final Map<String,Object> dataMap=Maps.newHashMap();

………
}

  之后就可以在該類的其他地方使用了!

  但這種方式有個(gè)很明顯的弊端,那就是多線程共享、并發(fā)訪問(wèn)可能會(huì)出現(xiàn)安全性問(wèn)題,即如果開(kāi)啟10個(gè)線程,每個(gè)線程需要對(duì)dataMap里頭的key,即id 1,在高并發(fā)的情況下其最終的運(yùn)行效果很可能不一定是 10020 (因?yàn)槌跏贾禐?span lang="EN-US">10010,每個(gè)線程加1次,10個(gè)線程下來(lái)就是加10次,理想情況下為10020),如下圖所示:


  但有時(shí)候我們?cè)陧?xiàng)目里頭既要用到異步(為了解耦)、也想要獲取異步執(zhí)行的結(jié)果,可以說(shuō)是“魚(yú)和熊掌皆想兼得”:


  于是乎這個(gè)重任就落到了CallableFuture身上了,這是JDK1.5版本開(kāi)始就已經(jīng)提供了,可以通過(guò)它們實(shí)現(xiàn)在任務(wù)異步執(zhí)行完畢之后得到任務(wù)的執(zhí)行結(jié)果。

  看到這里,可能有些小伙伴會(huì)發(fā)問(wèn):為什么通過(guò)Callable就可以獲取到線程異步執(zhí)行的結(jié)果呢?這一切還得回歸到源碼身上,如下所示為Callable的定義:   

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

  會(huì)發(fā)現(xiàn)它跟Runnable一樣都是接口,但區(qū)別在于創(chuàng)建Callable時(shí)可以傳入一個(gè)泛型V,而這個(gè)泛型類型V會(huì)發(fā)現(xiàn)真是call()方法執(zhí)行后返回的結(jié)果(call()方法的作用類似于run()方法,反正都是指一個(gè)線程要執(zhí)行的任務(wù)),OK,到此謎底就解開(kāi)了!

  那么怎么使用Callable呢?在Java里面可以通過(guò)調(diào)用ExecutorService類里面的相關(guān)API來(lái)使用Callable,如下圖所示:


  仔細(xì)觀察上圖,會(huì)發(fā)現(xiàn)如果想要獲取線程執(zhí)行Callable類型任務(wù)后的結(jié)果時(shí),需要通過(guò)Future進(jìn)行獲取,那么Future為何物呢?

  Future,也是一個(gè)接口,可以對(duì)具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果,必要時(shí)可以通過(guò)get()方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果,如下圖所示:


  從上圖中可以得知,在Future接口中聲明了5個(gè)方法,下面依次解釋每個(gè)方法的作用:

1cancel():該方法用來(lái)取消任務(wù),如果取消任務(wù)成功則返回true,如果取消任務(wù)失敗則返回false;方法里的參數(shù)mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒(méi)有執(zhí)行完畢的任務(wù),如果設(shè)置true,則表示可以取消正在執(zhí)行過(guò)程中的任務(wù);如果任務(wù)已經(jīng)完成,則無(wú)論mayInterruptIfRunningtrue還是false,此方法肯定返回false,即如果取消已經(jīng)完成的任務(wù)會(huì)返回false;如果任務(wù)正在執(zhí)行,若mayInterruptIfRunning設(shè)置為true,則返回true,若mayInterruptIfRunning設(shè)置為false,則返回false;如果任務(wù)還沒(méi)有執(zhí)行,則無(wú)論mayInterruptIfRunningtrue還是false,肯定返回true。

2isCancelled():該方法表示任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回 true;

3isDone():該方法表示任務(wù)是否已經(jīng)完成,若任務(wù)已經(jīng)完成,則返回true;

4get():該方法用來(lái)獲取線程的執(zhí)行結(jié)果,這個(gè)方法會(huì)產(chǎn)生阻塞,會(huì)一直等到任務(wù)執(zhí)行完畢才返回;

5get(long timeout, TimeUnit unit):用來(lái)獲取執(zhí)行結(jié)果,如果在指定時(shí)間內(nèi),還沒(méi)獲取到結(jié)果,就直接返回null。

  綜上所述,Future提供了三種功能:判斷任務(wù)是否完成;可以中斷任務(wù);可以獲取任務(wù)的執(zhí)行結(jié) 果;

  實(shí)踐是檢驗(yàn)整理的唯一標(biāo)準(zhǔn),我們還是需要編寫(xiě)一定的代碼進(jìn)行驗(yàn)證,如下代碼所示我們先定義一個(gè)線程實(shí)現(xiàn)類:   

public class ProductThread implements Callable<Map<String,Object>>{
private Map<String,Object> dataMap;

@Override
public Map<String, Object> call() throws Exception {
System.out.println("---子線程在執(zhí)行任務(wù)---");
Thread.sleep(3000);

dataMap=Maps.newHashMap();
dataMap.put("id",10010);
dataMap.put("name","steadyjack");
dataMap.put("nickName","多隆");
return dataMap;
}
}

  然后通過(guò)ExecutorService調(diào)用相應(yīng)的API執(zhí)行該任務(wù),如下代碼所示:   

public class ThreadUtil {
private static final Map<String,Object> dataMap=Maps.newHashMap();

public static void main(String[] args) throws Exception{
ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executorService=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
Future<Map<String,Object>> future=executorService.submit(new ProductThread());
executorService.shutdown();
System.out.println("---主線程在執(zhí)行任務(wù)---");

Map<String,Object> map=future.get();
System.out.println("子線程執(zhí)行結(jié)果:"+map);
}
}

  點(diǎn)擊運(yùn)行代碼后即可得到最終的運(yùn)行效果,如下圖所示:   


  從該運(yùn)行結(jié)果中可以得知,當(dāng)調(diào)用executorService.submit()方法時(shí),主線程main會(huì)開(kāi)啟一個(gè)異步的子線程去執(zhí)行ProductThread中的任務(wù),然后繼續(xù)往后面的代碼走,即繼續(xù)執(zhí)行executorService.shutdown()System.out.println("---主線程在執(zhí)行任務(wù)---");等代碼;最后是通過(guò)future.get()獲取線程最終異步執(zhí)行返回的結(jié)果。

  至此我們已經(jīng)通過(guò)Callable + Future組合實(shí)現(xiàn)兩個(gè)目的:

1)開(kāi)啟異步的線程執(zhí)行相應(yīng)的任務(wù)(解耦;提高系統(tǒng)吞吐量,提高資源利用率)

2)可以獲取到異步執(zhí)行后的結(jié)果

  有了這兩大利器,其實(shí)在實(shí)際項(xiàng)目開(kāi)發(fā)中就已經(jīng)夠用了,但JDK的開(kāi)源者很用心,還提供了另外一大利器FutureTask,那么為啥要有這玩意呢?很簡(jiǎn)單,因?yàn)?span lang="EN-US">Future是一個(gè)接口,所以無(wú)法直接通過(guò)new來(lái)創(chuàng)建對(duì)象,因此就有了FutureTask;我們先來(lái)看下它的定義吧:   

public class FutureTask<V> implements RunnableFuture<V> {}

  它實(shí)現(xiàn)了RunnableFuture接口,而RunnableFuture接口的定義如下所示:   

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

  從中可以看出RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實(shí)現(xiàn)了RunnableFuture接口,所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值,想想都覺(jué)得確實(shí)挺牛逼:


  話不多說(shuō),直接上代碼吧:   

ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executorService=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
FutureTask<Map<String,Object>> futureTask=new FutureTask<Map<String, Object>>(new ProductThread());
executorService.execute(futureTask);
Map<String,Object> resMap=futureTask.get();
executorService.shutdown();
System.out.println("---主線程正在執(zhí)行任務(wù)---"+Thread.currentThread().getName());

System.out.println("--子線程執(zhí)行任務(wù)后得到的結(jié)果:"+resMap);

  運(yùn)行結(jié)果如下圖所示:


  從上述該源代碼中可以看出:
1futureTask可以作為Runnable被執(zhí)行:executorService.execute(futureTask);

2)也可以當(dāng)做Future獲取線程異步執(zhí)行后的結(jié)果:futureTask.get();

總結(jié):

1)代碼下載:關(guān)注“程序員實(shí)戰(zhàn)基地”微信公眾號(hào)(掃描下圖微信公眾號(hào)即可),回復(fù)“100”,即可獲取代碼下載鏈接;至此,我們已經(jīng)介紹完了Callable、Future以及FutureTask相關(guān)基礎(chǔ)特性,下一篇我們將重點(diǎn)再詳細(xì)地介紹下FutureTask更層次的東西,歡迎關(guān)注debug的技術(shù)公眾號(hào)一起學(xué)習(xí)干貨技術(shù)吧!


我是debug,一個(gè)相信技術(shù)改變生活、技術(shù)成就夢(mèng)想 的攻城獅;如果本文對(duì)你有幫助,請(qǐng)關(guān)注公眾號(hào),并動(dòng)動(dòng)手指點(diǎn)贊、收藏以及轉(zhuǎn)發(fā),你的三連可是debug分享的動(dòng)力哦