Java并發(fā)編程(1)- Callable、Future和FutureTask

作者: 修羅debug
版權聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。




擼過JavaSE(即Java基礎技術棧)的小伙伴都知道,實現多線程有兩種方式,一種是繼承Thread,extends Thread 然后實現其中的run()方法;另外一種是實現Runnable接口,即implements Runnable,然后實現其中的run()方法;仔細觀察這兩種方式,會發(fā)現這兩者都不能返回線程異步執(zhí)行完的結果,但在實際項目開發(fā)中卻偶爾需要獲取其中的返回結果,咋辦嘞?于是乎CallableFuture就排上用場了,本文我們將對其做一番詳盡的介紹!

  還是先介紹下多線程的傳統(tǒng)實現方式吧,如下代碼所示:

public class ThreadUtil {
public static void main(String[] args) throws Exception{
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("---子線程正在執(zhí)行---"+Thread.currentThread().getName());
Map<String,Object> dataMap=Maps.newHashMap();
dataMap.put("id",10010);
dataMap.put("name","steadyjack");
dataMap.put("nickName","多隆");
System.out.println("---子線程執(zhí)行后得到的結果:"+dataMap);
}
});
try {
thread.start();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("---主線程正在執(zhí)行---"+Thread.currentThread().getName());
}
}

  在上述代碼中,我們首先通過一個實現Runnable接口的匿名實現類創(chuàng)建了一個線程對象實例,即thread,并編寫實現了其中run()方法的代碼邏輯(而這就是該線程要執(zhí)行的任務),主要是構造一個私有變量Map<String,Object>,并將相關的數據塞入進去。

  點擊運行該代碼后,顯而易見可以預測其運行結果:   


  從上述編寫的代碼以及運行結果來看,會發(fā)現如果我們想獲取得到dataMap的內容是很困難的,因為run()方法的返回值為void;當然啦,也不是完全沒有辦法,在上面的條件下,如果想要獲取到dataMap并做進一步的操作的話,則可以將dataMap定義為全局的共享變量,或者使用線程通信的方式來達到效果,如下所示為通過共享全局變量的方式:   

public class ThreadUtil {
private static final Map<String,Object> dataMap=Maps.newHashMap();

………
}

  之后就可以在該類的其他地方使用了!

  但這種方式有個很明顯的弊端,那就是多線程共享、并發(fā)訪問可能會出現安全性問題,即如果開啟10個線程,每個線程需要對dataMap里頭的key,即id 1,在高并發(fā)的情況下其最終的運行效果很可能不一定是 10020 (因為初始值為10010,每個線程加1次,10個線程下來就是加10次,理想情況下為10020),如下圖所示:


  但有時候我們在項目里頭既要用到異步(為了解耦)、也想要獲取異步執(zhí)行的結果,可以說是“魚和熊掌皆想兼得”:


  于是乎這個重任就落到了CallableFuture身上了,這是JDK1.5版本開始就已經提供了,可以通過它們實現在任務異步執(zhí)行完畢之后得到任務的執(zhí)行結果。

  看到這里,可能有些小伙伴會發(fā)問:為什么通過Callable就可以獲取到線程異步執(zhí)行的結果呢?這一切還得回歸到源碼身上,如下所示為Callable的定義:   

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

  會發(fā)現它跟Runnable一樣都是接口,但區(qū)別在于創(chuàng)建Callable時可以傳入一個泛型V,而這個泛型類型V會發(fā)現真是call()方法執(zhí)行后返回的結果(call()方法的作用類似于run()方法,反正都是指一個線程要執(zhí)行的任務),OK,到此謎底就解開了!

  那么怎么使用Callable呢?在Java里面可以通過調用ExecutorService類里面的相關API來使用Callable,如下圖所示:


  仔細觀察上圖,會發(fā)現如果想要獲取線程執(zhí)行Callable類型任務后的結果時,需要通過Future進行獲取,那么Future為何物呢?

  Future,也是一個接口,可以對具體的Runnable或者Callable任務的執(zhí)行結果進行取消、查詢是否完成、獲取結果,必要時可以通過get()方法獲取執(zhí)行結果,該方法會阻塞直到任務返回結果,如下圖所示:


  從上圖中可以得知,在Future接口中聲明了5個方法,下面依次解釋每個方法的作用:

1cancel():該方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false;方法里的參數mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務,如果設置true,則表示可以取消正在執(zhí)行過程中的任務;如果任務已經完成,則無論mayInterruptIfRunningtrue還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執(zhí)行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執(zhí)行,則無論mayInterruptIfRunningtrue還是false,肯定返回true。

2isCancelled():該方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true;

3isDone():該方法表示任務是否已經完成,若任務已經完成,則返回true;

4get():該方法用來獲取線程的執(zhí)行結果,這個方法會產生阻塞,會一直等到任務執(zhí)行完畢才返回;

5get(long timeout, TimeUnit unit):用來獲取執(zhí)行結果,如果在指定時間內,還沒獲取到結果,就直接返回null

  綜上所述,Future提供了三種功能:判斷任務是否完成;可以中斷任務;可以獲取任務的執(zhí)行結 果;

  實踐是檢驗整理的唯一標準,我們還是需要編寫一定的代碼進行驗證,如下代碼所示我們先定義一個線程實現類:   

public class ProductThread implements Callable<Map<String,Object>>{
private Map<String,Object> dataMap;

@Override
public Map<String, Object> call() throws Exception {
System.out.println("---子線程在執(zhí)行任務---");
Thread.sleep(3000);

dataMap=Maps.newHashMap();
dataMap.put("id",10010);
dataMap.put("name","steadyjack");
dataMap.put("nickName","多隆");
return dataMap;
}
}

  然后通過ExecutorService調用相應的API執(zhí)行該任務,如下代碼所示:   

public class ThreadUtil {
private static final Map<String,Object> dataMap=Maps.newHashMap();

public static void main(String[] args) throws Exception{
ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executorService=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
Future<Map<String,Object>> future=executorService.submit(new ProductThread());
executorService.shutdown();
System.out.println("---主線程在執(zhí)行任務---");

Map<String,Object> map=future.get();
System.out.println("子線程執(zhí)行結果:"+map);
}
}

  點擊運行代碼后即可得到最終的運行效果,如下圖所示:   


  從該運行結果中可以得知,當調用executorService.submit()方法時,主線程main會開啟一個異步的子線程去執(zhí)行ProductThread中的任務,然后繼續(xù)往后面的代碼走,即繼續(xù)執(zhí)行executorService.shutdown()System.out.println("---主線程在執(zhí)行任務---");等代碼;最后是通過future.get()獲取線程最終異步執(zhí)行返回的結果。

  至此我們已經通過Callable + Future組合實現兩個目的:

1)開啟異步的線程執(zhí)行相應的任務(解耦;提高系統(tǒng)吞吐量,提高資源利用率)

2)可以獲取到異步執(zhí)行后的結果

  有了這兩大利器,其實在實際項目開發(fā)中就已經夠用了,但JDK的開源者很用心,還提供了另外一大利器FutureTask,那么為啥要有這玩意呢?很簡單,因為Future是一個接口,所以無法直接通過new來創(chuàng)建對象,因此就有了FutureTask;我們先來看下它的定義吧:   

public class FutureTask<V> implements RunnableFuture<V> {}

  它實現了RunnableFuture接口,而RunnableFuture接口的定義如下所示:   

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

  從中可以看出RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實現了RunnableFuture接口,所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值,想想都覺得確實挺牛逼:


  話不多說,直接上代碼吧:   

ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executorService=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
FutureTask<Map<String,Object>> futureTask=new FutureTask<Map<String, Object>>(new ProductThread());
executorService.execute(futureTask);
Map<String,Object> resMap=futureTask.get();
executorService.shutdown();
System.out.println("---主線程正在執(zhí)行任務---"+Thread.currentThread().getName());

System.out.println("--子線程執(zhí)行任務后得到的結果:"+resMap);

  運行結果如下圖所示:


  從上述該源代碼中可以看出:
1futureTask可以作為Runnable被執(zhí)行:executorService.execute(futureTask);

2)也可以當做Future獲取線程異步執(zhí)行后的結果:futureTask.get();

總結:

1)代碼下載:關注“程序員實戰(zhàn)基地”微信公眾號(掃描下圖微信公眾號即可),回復“100”,即可獲取代碼下載鏈接;至此,我們已經介紹完了Callable、Future以及FutureTask相關基礎特性,下一篇我們將重點再詳細地介紹下FutureTask更層次的東西,歡迎關注debug的技術公眾號一起學習干貨技術吧!


我是debug,一個相信技術改變生活、技術成就夢想 的攻城獅;如果本文對你有幫助,請關注公眾號,并動動手指點贊、收藏以及轉發(fā),你的三連可是debug分享的動力哦