并發(fā)編程 14:CompletableFuture異步編程沒(méi)有那么難
以下文章來(lái)源于七哥聊編程 ,作者七哥
大家好,我是七哥,今天給大家分享一個(gè)非常強(qiáng)大的工具類(lèi):CompletableFuture,如果你平時(shí)也會(huì)遇到用多線(xiàn)程優(yōu)化業(yè)務(wù)邏輯的場(chǎng)景,那么今天這篇文章我建議你讀完,相信一定會(huì)讓你在重構(gòu)相關(guān)代碼時(shí)得心應(yīng)手,寫(xiě)出讓人稱(chēng)贊的好代碼,不過(guò)使用CompletableFuture的前提是JDK需要1.8以上哦~
那我們下面進(jìn)入今天的正文。
項(xiàng)目中串行調(diào)用的例子
并行調(diào)用的幾種實(shí)現(xiàn)
CompletableFuture解析
JDK8流程編程結(jié)合
總結(jié)
前言
在Java開(kāi)發(fā)的web項(xiàng)目中,我們經(jīng)常會(huì)遇到接口響應(yīng)耗時(shí)過(guò)長(zhǎng),或者定時(shí)任務(wù)處理過(guò)慢,那在Java中最常見(jiàn)的解決方法就是并行了,想必大家也都不陌生了。
今天的分享主要帶大家從一個(gè)實(shí)際的串行場(chǎng)景出發(fā),如何一步步優(yōu)化,同時(shí)也會(huì)分享在Java中實(shí)現(xiàn)并行處理的多種方式,以及它們之間的區(qū)別和優(yōu)缺點(diǎn),通過(guò)對(duì)比總結(jié)更加深入的了解并且使用Java中并發(fā)編程的相關(guān)技術(shù)。
一個(gè)串行調(diào)用的例子
現(xiàn)在我們有一個(gè)查詢(xún)carrier下所有Load的接口,它需要查詢(xún)Loads信息、Instruction信息、Stops信息、Actions信息后然后組裝數(shù)據(jù)。
private List<Load> getHydratedLoads(Optional<Pageable> pageable, String predicate, List<Object> params) {
// 1. 耗時(shí)3秒
List<Load> loads = executeQuery("查詢(xún)Loads列表");
// 2. 耗時(shí)4秒
List<Instruction> instructions = executeQuery("查詢(xún)instructions列表");
// 3. 耗時(shí)2秒
List<Stop> stops = executeQuery("查詢(xún)stops列表");
// 4. 耗時(shí)3秒
List<Action> actions = executeQuery("查詢(xún)actions列表");
Multimap<String, Instruction> instructionsByLoadId = index(instructions, i -> i.getLoad().getId());
Multimap<String, Stop> stopsByLoadId = index(stops, s -> s.getLoad().getId());
Multimap<String, Action> actionsByStopId = index(actions, a -> a.getStop().getId());
// 數(shù)據(jù)處理
handle(loads,instructions,stops,actions);
return loads;
}
這段代碼會(huì)有什么問(wèn)題?其實(shí)算是一段比較正常的代碼,但是在某一個(gè)carrier下數(shù)據(jù)量比較大時(shí),sql查詢(xún)是相對(duì)較慢的,那有沒(méi)有辦法優(yōu)化一下呢?
當(dāng)前這個(gè)請(qǐng)求耗時(shí)總計(jì)就是12s。上面實(shí)現(xiàn)中查詢(xún)Load、Instruction、Stop、Action 等信息是串行的,那串行的系統(tǒng)要做性能優(yōu)化很常見(jiàn)的就是利用多線(xiàn)程并行了。
這種相互之間沒(méi)有影響的任務(wù),利用并行處理后耗時(shí)就可以?xún)?yōu)化為4s。
并行調(diào)用實(shí)現(xiàn)的幾種方式
因?yàn)轫?xiàng)目中多線(xiàn)程都用線(xiàn)程池,所以Thread.join()這種方式就不演示了。
1. Future+Callable
Future接口在Java 5中被引入,設(shè)計(jì)初衷是對(duì)將來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果進(jìn)行建模。它建模了一種異步計(jì)算,返回一個(gè)執(zhí)行運(yùn)算結(jié)果的引用,當(dāng)運(yùn)算結(jié)束后,這個(gè)引用被返回給調(diào)用方。在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線(xiàn)程解放出來(lái),讓調(diào)用線(xiàn)程能繼續(xù)執(zhí)行其他有價(jià)值的工作,不再需要呆呆等待耗時(shí)的操作完成。
因?yàn)槲覀兌际切枰@取任務(wù)的返回值的,所以大家肯定想到是用 Future+Callable來(lái)做。
ThreadPoolExecutor提供了3個(gè)submit方法支持我們需要獲取任務(wù)執(zhí)行結(jié)果的需求。
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
簡(jiǎn)單介紹下這三個(gè)submit方法:
提交 Runnable 任務(wù) submit(Runnable task),這個(gè)方法入?yún)⑹荝unnable接口,它只有一個(gè)run()方法沒(méi)有返回值,所以它返回的 Future 只能用來(lái)判斷任務(wù)是否結(jié)束;
提交 Callable 任務(wù) submit(Callable task),它的入?yún)⑹荂allable接口,只有一個(gè)call()方法,是有返回值的,所以可以獲取任務(wù)執(zhí)行結(jié)果;
提交 Runnable 任務(wù)及結(jié)果引用 submit(Runnable task, T result),這個(gè)方法返回的Future,調(diào)用get()方法的返回值就是傳入的result對(duì)象,一般用法就是實(shí)現(xiàn)Runnable接口時(shí),聲明一個(gè)有參構(gòu)造函數(shù),將result傳進(jìn)去,result 相當(dāng)于主線(xiàn)程和子線(xiàn)程之間的橋梁,通過(guò)它主子線(xiàn)程可以共享數(shù)據(jù)。
這三個(gè)方法的返回值都是Future接口,F(xiàn)uture 提供了5個(gè)方法:
分別是取消任務(wù)的方法 cancel()、判斷任務(wù)是否已取消的方法 isCancelled()、判斷任務(wù)是否已結(jié)束的方法 isDone()以及2 個(gè)獲得任務(wù)執(zhí)行結(jié)果的 get() 和 get(timeout, unit),其中最后一個(gè) get(timeout, unit) 支持超時(shí)機(jī)制。
需要注意的是:這兩個(gè) get() 方法都是阻塞式的,如果被調(diào)用的時(shí)候,任務(wù)還沒(méi)有執(zhí)行完,那么調(diào)用 get() 方法的線(xiàn)程會(huì)阻塞,直到任務(wù)執(zhí)行完才會(huì)被喚醒。
2. FutureTask實(shí)現(xiàn)并行調(diào)用
我們?cè)俳榻B下FutureTask工具類(lèi),這是一個(gè)實(shí)實(shí)在在的工具類(lèi),有兩個(gè)構(gòu)造函數(shù),和上面類(lèi)似,一看就明白了。
FutureTask(Callable callable);
FutureTask(Runnable runnable, V result);
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
這個(gè)類(lèi)實(shí)現(xiàn)了 Runnable 和 Future 接口,可以理解就是將任務(wù)和結(jié)果結(jié)合起來(lái)了,變成一個(gè)可以有響應(yīng)結(jié)果的任務(wù)進(jìn)行提交,本質(zhì)上FutureTask里面封裝的還是一個(gè)Callable接口,它實(shí)現(xiàn)可以有返回值就是因?yàn)樗膔un方法里面調(diào)用了Callable的call()方法,將結(jié)果賦值給result,然后返回。
下面我們看下如何優(yōu)化我們上面的查詢(xún)接口,實(shí)現(xiàn)并行查詢(xún):
private List<Load> getHydratedLoadsUsingFutureTask() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask<List<Load>> queryLoadFutureTask = new FutureTask<>(() -> executeQuery("sql1"));
executorService.submit(queryLoadFutureTask);
FutureTask<List<Instruction>> queryInstructionFutureTask = new FutureTask<>(() -> executeQuery("sql2"));
executorService.submit(queryInstructionFutureTask);
FutureTask<List<Stop>> queryStopFutureTask = new FutureTask<>(() -> executeQuery("sql3"));
executorService.submit(queryStopFutureTask);
FutureTask<List<Action>> queryActionFutureTask = new FutureTask<>(() -> executeQuery("sql4"));
executorService.submit(queryActionFutureTask);
// 獲取結(jié)果
List<Load> loads = queryLoadFutureTask.get();
List<Instruction> instructions = queryInstructionFutureTask.get();
List<Stop> stops = queryStopFutureTask.get();
List<Action> actions = queryActionFutureTask.get();
// We got all the entities we need, so now let's fill in all of their references to each other.
handleData(loads, instructions, stops, actions);
return loads;
}
那你可能會(huì)想到,如果任務(wù)之間有依賴(lài)關(guān)系,比如當(dāng)前任務(wù)依賴(lài)前一個(gè)任務(wù)的執(zhí)行結(jié)果,該怎么處理呢?
這種問(wèn)題基本上也都可以用 Future 來(lái)解決,但是需要將對(duì)應(yīng)的 FutureTask傳入到當(dāng)前任務(wù)中,然后調(diào)用get()方法即可。
比如,我們創(chuàng)建了兩個(gè) FutureTask——ft1 和 ft2,ft1 需要等待 ft2 執(zhí)行完畢后才能做最后的數(shù)據(jù)處理,所以 ft1 內(nèi)部需要引用 ft2,并在執(zhí)行數(shù)據(jù)處理前,調(diào)用 ft2 的 get() 方法實(shí)現(xiàn)等待。
// 創(chuàng)建任務(wù)T2的FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 創(chuàng)建任務(wù)T1的FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 線(xiàn)程T1執(zhí)行任務(wù)ft1
Thread T1 = new Thread(ft1);
T1.start();
// 線(xiàn)程T2執(zhí)行任務(wù)ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待線(xiàn)程T1執(zhí)行結(jié)果
System.out.println(ft1.get());
// T1Task需要執(zhí)行的任務(wù):
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任務(wù)需要T2任務(wù)的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
// 獲取T2線(xiàn)程結(jié)果
String tf = ft2.get();
return "處理完的數(shù)據(jù)結(jié)果";
}
}
// T2Task需要執(zhí)行的任務(wù):
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
return "檢驗(yàn)&查詢(xún)數(shù)據(jù)";
}
}
通過(guò)這上面的的例子,我們明顯的發(fā)現(xiàn) Future 實(shí)現(xiàn)異步編程時(shí)的一些不足之處:
Future 對(duì)于結(jié)果的獲取很不方便,只能通過(guò) get() 方法阻塞或者輪詢(xún)的方式得到任務(wù)的結(jié)果。阻塞的方式顯然是效率低下的,輪詢(xún)的方式又十分耗費(fèi)CPU資源,如果前一個(gè)任務(wù)執(zhí)行比較耗時(shí)的話(huà),get() 方法會(huì)阻塞,形成排隊(duì)等待的情況。
將兩個(gè)異步計(jì)算合并為一個(gè),這兩個(gè)異步計(jì)算之間相互獨(dú)立,同時(shí)第二個(gè)又依賴(lài)于第一個(gè)的結(jié)果。
等待Future集合中的所有任務(wù)都完成。
僅等待Future集合中最快結(jié)束的任務(wù)完成(有可能因?yàn)樗鼈冊(cè)噲D通過(guò)不同的方式計(jì)算同一個(gè)值),并返回它的結(jié)果。
應(yīng)對(duì)Future的完成事件(即當(dāng)Future的完成事件發(fā)生時(shí)會(huì)收到通知,并能使用Future計(jì)算的結(jié)果進(jìn)行下一步的操作,不只是簡(jiǎn)單地阻塞等待操作的結(jié)果)。
我們很難表述Future結(jié)果之間的依賴(lài)性,從文字描述上這很簡(jiǎn)單。比如,下面文字描述的關(guān)系,如果用Future去實(shí)現(xiàn)時(shí)還是很復(fù)雜的。
比如:“當(dāng)長(zhǎng)時(shí)間計(jì)算任務(wù)完成時(shí),請(qǐng)將該計(jì)算的結(jié)果通知到另一個(gè)長(zhǎng)時(shí)間運(yùn)行的計(jì)算任務(wù),這兩個(gè)計(jì)算任務(wù)都完成后,將計(jì)算的結(jié)果與另一個(gè)查詢(xún)操作結(jié)果合并”
在JDK8中引入了CompletableFuture,對(duì)Future進(jìn)行了改進(jìn),可以在定義CompletableFuture時(shí)傳入回調(diào)對(duì)象,任務(wù)在完成或者異常時(shí),自動(dòng)回調(diào),再也不需要每次主動(dòng)通過(guò) Future 去詢(xún)問(wèn)結(jié)果了,我們接著往下看。
3. CompletableFuture
Java 在 1.8 版本提供了 CompletableFuture 來(lái)支持異步編程,CompletableFuture 類(lèi)實(shí)現(xiàn)了CompletionStage 和 Future 接口,提供了非常強(qiáng)大的 Future 的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過(guò) 完成時(shí)回調(diào) 的方式處理計(jì)算結(jié)果,并且提供了 轉(zhuǎn)換和組合 CompletableFuture 的方法。
Callable,有結(jié)果的同步行為,比如做飯,就能產(chǎn)出一盤(pán)菜;
Runnable,無(wú)結(jié)果的同步行為,比如吃飯,僅僅就是吃就完事了;
Future,異步封裝 Callable/Runnable,比如委托給你媳婦去做飯(其他線(xiàn)程);
CompletableFuture,封裝Future,使其擁有回調(diào)功能,比如讓你媳婦做好飯了,主動(dòng)告訴你做好了;
為了體會(huì)到 CompletableFuture 異步編程的優(yōu)勢(shì),我們還是先用 CompletableFuture 重新實(shí)現(xiàn)前面的程序。
public static List<Load> getHydratedLoadsUsingCompletableFuture()
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// 任務(wù)1:查詢(xún)loads列表
CompletableFuture<List<Load>> queryLoads = CompletableFuture.supplyAsync(() -> executeQuery("sql1"), executorService);
// 任務(wù)2:查詢(xún)instructions列表
CompletableFuture<List<Instruction>> queryInstructions = CompletableFuture.supplyAsync(() -> executeQuery("sql2"),
executorService);
// 任務(wù)3:查詢(xún)stops列表
CompletableFuture<List<Stop>> queryStops = CompletableFuture.supplyAsync(() -> executeQuery("sql3"), executorService);
// 任務(wù)4:查詢(xún)actions列表
CompletableFuture<List<Action>> queryActions = CompletableFuture.supplyAsync(() -> executeQuery("sql4"),
executorService);
// 任務(wù)1,2,3,4執(zhí)行完成后執(zhí)行數(shù)據(jù)組裝
CompletableFuture<Void> combineFuture = CompletableFuture.allOf(queryLoads,
queryInstructions,
queryStops,
queryActions)
.thenRun(() -> handleData(queryLoads.join(), queryInstructions.join(), queryStops.join(), queryActions.join()));
System.out.println(Thread.currentThread().getName() + ": 主線(xiàn)程執(zhí)行到這里了");
combineFuture.get();
System.out.println(String.format("""
queryLoads: %s ,queryInstructions: %s ,queryStops: %s ,queryActions: %s
""", queryLoads.isDone(), queryInstructions.isDone(), queryStops.isDone(), queryActions.isDone()));
return queryLoads.get();
} finally {
executorService.shutdown();
}
}
通過(guò)上面的代碼我們可以發(fā)現(xiàn) CompletableFuture 有以下優(yōu)勢(shì):
無(wú)需手工維護(hù)線(xiàn)程,省去了手工提交任務(wù)到線(xiàn)程池這一步;
語(yǔ)義更清晰,例如 CompletableFuture.allOf(f1,f2,f3,f4) 能夠清晰地表述“需要等指定的4個(gè)任務(wù)都完成才能執(zhí)行后續(xù)的任務(wù)”;
代碼更簡(jiǎn)練并且專(zhuān)注于業(yè)務(wù)邏輯,幾乎所有代碼都是業(yè)務(wù)邏輯相關(guān)的。
CompletableFuture 解析
1. CompletableFuture創(chuàng)建
CompletableFuture 提供了四個(gè)靜態(tài)方法來(lái)創(chuàng)建一個(gè)異步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
這四個(gè)方法區(qū)別在于:
runAsync 方法以 Runnable 函數(shù)式接口類(lèi)型為參數(shù),沒(méi)有返回結(jié)果,supplyAsync 方法以 Supplier 函數(shù)式接口類(lèi)型為參數(shù),返回結(jié)果類(lèi)型為U;
沒(méi)有指定 Executor 的方法會(huì)使用 ForkJoinPool.commonPool() 作為它的線(xiàn)程池執(zhí)行異步代碼。如果指定了線(xiàn)程池,則使用指定的線(xiàn)程池運(yùn)行。
ForkJoinPool是JDK7提供的,叫做分支/合并框架??梢酝ㄟ^(guò)將一個(gè)任務(wù)遞歸分成很多分子任務(wù),形成不同的流,進(jìn)行并行執(zhí)行,同時(shí)還伴隨著強(qiáng)大的工作竊取算法,極大的提高效率,這個(gè)不屬于今天我們討論的點(diǎn),感興趣的話(huà)可以后面再聊。
注意:如果所有 CompletableFuture 共享一個(gè)線(xiàn)程池,那么一旦有任務(wù)執(zhí)行一些很慢的 I/O 操作,就會(huì)導(dǎo)致線(xiàn)程池中所有線(xiàn)程都阻塞在 I/O 操作上,從而造成線(xiàn)程饑餓,進(jìn)而影響整個(gè)系統(tǒng)的性能。所以,建議你要根據(jù)不同的業(yè)務(wù)類(lèi)型創(chuàng)建不同的線(xiàn)程池,以避免互相干擾。
問(wèn)題:為什么supplyAsync方法接收一個(gè) Supplier 函數(shù)式接口類(lèi)型參數(shù)而不是一個(gè) Callable 類(lèi)型的參數(shù)呢?
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
看了接口定義,我們發(fā)現(xiàn)它們其實(shí)都是一個(gè)不接受任何參數(shù)類(lèi)型的函數(shù)式接口,在實(shí)踐中它們做的是相同的事情(定義一個(gè)業(yè)務(wù)邏輯去處理然后有返回值),但在原則上它們的目的是做不同的事情:
從語(yǔ)義上來(lái)看 Callable 是“返回結(jié)果的任務(wù)”,而 Supplier 是 “結(jié)果的供應(yīng)商”。可以理解為 Callable 引用了一個(gè)未執(zhí)行的工作單元,Supplier 引用了一個(gè)未知的值。側(cè)重點(diǎn)可能不一樣,如果關(guān)心的是提供一個(gè)什么值而不關(guān)心具體做了啥工作使用 Supplier 感覺(jué)更合適。例如,ExecutorService 與 Callable一起工作,因?yàn)樗闹饕康氖菆?zhí)行工作單元。CompletableFuture 使用 Supplier,因?yàn)樗魂P(guān)心提供的值,而不太關(guān)心可能需要做多少工作。
兩個(gè)接口定義之間的一個(gè)基本區(qū)別是,Callable允許從其實(shí)現(xiàn)中拋出檢查異常,而Supplier不允許。
2. 理解CompletionStage接口
通過(guò)接口的繼承關(guān)系,我們可以發(fā)現(xiàn)這里的異步操作到底什么時(shí)候結(jié)束、結(jié)果如何獲取,都可以通過(guò) Future接口來(lái)解決。
另外 CompletableFuture 類(lèi)還實(shí)現(xiàn)了 CompletionStage 接口,這個(gè)接口就比較關(guān)鍵了,之所以能實(shí)現(xiàn)響應(yīng)式編程,都是通過(guò)這個(gè)接口提供的方法。
下面介紹下 CompletionStage 接口,看字面意思可以理解為“完成動(dòng)作的一個(gè)階段”,官方注釋文檔:CompletionStage 是一個(gè)可能執(zhí)行異步計(jì)算的“階段”,這個(gè)階段會(huì)在另一個(gè) CompletionStage 完成時(shí)調(diào)用去執(zhí)行動(dòng)作或者計(jì)算,一個(gè) CompletionStage 會(huì)以正常完成或者中斷的形式“完成”,并且它的“完成”會(huì)觸發(fā)其他依賴(lài)的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此構(gòu)成了鏈?zhǔn)降恼{(diào)用。
這個(gè)看完還是有點(diǎn)懵逼的,不清楚什么是 CompletionStage?
在Java中什么是 CompletionStage ?
一個(gè)Function、Comsumer、Supplier 或者 Runnable 都會(huì)被描述為一個(gè)CompletionStage。
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
x -> square(x) 就是一個(gè) Function 類(lèi)型的 Stage,它返回了x。
x -> System.out.println(x) 就是一個(gè) Comsumer 類(lèi)型的Stage,用于接收上一個(gè)Stage的結(jié)果x。
() ->System.out.println() 就是一個(gè)Runnable類(lèi)型的Stage,既不消耗結(jié)果也不產(chǎn)生結(jié)果。
但是 CompletionStage 這里面一共有40多個(gè)方法,我們?cè)撊绾卫斫饽兀?br>
CompletionStage 接口可以清晰的描述任務(wù)之間的關(guān)系,可以分為 順序串行、并行、匯聚關(guān)系以及異常處理。
串行關(guān)系
CompletionStage 接口里面描述串行關(guān)系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 這四個(gè)系列的接口。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenApply 系列方法里參數(shù) fn 的類(lèi)型是接口 Function,使用該函數(shù)處理上一個(gè)CompletableFuture 調(diào)用的結(jié)果,并返回一個(gè)具有處理結(jié)果的 CompletionStage 對(duì)象,這個(gè)方法既能接收參數(shù)也支持返回值,可以理解為對(duì)于結(jié)果的轉(zhuǎn)換;
thenAccept 系列方法里參數(shù) action 的類(lèi)型是接口 Consumer,這個(gè)方法雖然支持參數(shù),但卻不支持回值,可以理解為對(duì)于結(jié)果的消費(fèi);
thenRun 系列方法里 action 的參數(shù)是 Runnable,所以 action 既不能接收參數(shù)也不支持返回值,也是對(duì)于結(jié)果的一種消費(fèi),和 thenAccept 區(qū)別在于 Runnable 并不使用前一步 CompletableFuture 計(jì)算的結(jié)果;
thenCompose 的參數(shù)為一個(gè)返回 CompletableFuture 實(shí)例的函數(shù),該函數(shù)的參數(shù)是先前計(jì)算步驟的結(jié)果,和 thenApply 執(zhí)行結(jié)果類(lèi)似,區(qū)別在于會(huì)生成一個(gè)新的 CompletableFuture 返回,也可以理解為對(duì)于結(jié)果的轉(zhuǎn)換;
thenApply() 和 thenCompose() 的區(qū)別?thenApply 轉(zhuǎn)換的是泛型中的類(lèi)型,是同一個(gè)CompletableFuture,thenCompose 用來(lái)連接兩個(gè)CompletableFuture,是生成一個(gè)新的 CompletableFuture。他們都是讓 CompletableFuture 可以對(duì)返回的結(jié)果進(jìn)行后續(xù)操作,就像 Stream 一樣進(jìn)行 map 和 flatMap 的轉(zhuǎn)換。
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> result1 = future.thenApply(param -> param + " World");
CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));
System.out.println(result1.get());
System.out.println(result2.get());
}
這些方法里面 Async 代表的是異步執(zhí)行 fn、consumer 或者 action。
CompletableFuture<String> f0 =
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//輸出結(jié)果
HELLO WORLD QQ
可以看一下 thenApply() 方法是如何使用的。首先通過(guò) supplyAsync() 啟動(dòng)一個(gè)異步流程,之后是兩個(gè)串行操作,整體看起來(lái)還是挺簡(jiǎn)單的。不過(guò),雖然這是一個(gè)異步流程,但任務(wù)①②③卻是串行執(zhí)行的,②依賴(lài)①的執(zhí)行結(jié)果,③依賴(lài)②的執(zhí)行結(jié)果。
CompletableFuture 中 thenApply 如何實(shí)現(xiàn)?
先看下靜態(tài)創(chuàng)建CompletableFuture的方法 supplyAsync;
//靜態(tài)方法,如果沒(méi)有傳入線(xiàn)程池,使用ForkJoinPool的common線(xiàn)程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
//新建CompletableFuture對(duì)象
CompletableFuture<U> d = new CompletableFuture<U>();
//構(gòu)造AsyncSupply對(duì)象,線(xiàn)程池提交AsyncSupply任務(wù)
e.execute(new AsyncSupply<U>(d, f));
//將CompletableFuture對(duì)象返回
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
//可以看到AsyncSupply是一個(gè)Runnable對(duì)象
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }
public void run() {
CompletableFuture<T> d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//CompletableFuture對(duì)象的result為空時(shí)
if (d.result == null) {
try {
//調(diào)用傳入的supplier的get方法,并將結(jié)果放入result字段
//注意:這是在線(xiàn)程池中提交的,所以是異步處理的
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//處理完當(dāng)前方法后,處理依賴(lài)它的棧頂方法,后面的回調(diào)方法入棧和這塊呼應(yīng)
d.postComplete();
}
}
}
final void postComplete() {
// 變量f存儲(chǔ)的是當(dāng)前已經(jīng)完成的CompletableFuture
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// CAS操作,將依賴(lài)此階段的棧頂元素取出,并且設(shè)置為下一個(gè)
if (STACK.compareAndSet(f, h, t = h.next)) {
if (t != null) {
if (f != this) {
//如果f不是this,將剛出棧的h入this的棧頂
pushStack(h);
continue;
}
// 將h剝離出來(lái),h.next=null,幫助gc
NEXT.compareAndSet(h, t, null); // try to detach
}
//調(diào)用tryFire
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
再看下異步處理完 supplyAsync 后的回調(diào)方法 thenApply 方法,看看它是如何實(shí)現(xiàn)回調(diào)的;
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
Object r;
// 如果當(dāng)前階段結(jié)果已經(jīng)返回,則直接運(yùn)行回調(diào)方法
if ((r = result) != null)
return uniApplyNow(r, e, f);
CompletableFuture<V> d = newIncompleteFuture();
// 構(gòu)造Completion放入等待棧的頂
unipush(new UniApply<T,V>(e, d, this, f));
return d;
}
private <V> CompletableFuture<V> uniApplyNow(
Object r, Executor e, Function<? super T,? extends V> f) {
Throwable x;
CompletableFuture<V> d = newIncompleteFuture();
// 如果依賴(lài)的方法異常中斷,則直接處理并返回異常
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.result = encodeThrowable(x, r);
return d;
}
r = null;
}
try {
// 執(zhí)行到這里說(shuō)明依賴(lài)的任務(wù)已經(jīng)有結(jié)果了,用它的結(jié)果當(dāng)作參數(shù)調(diào)用回調(diào)方法
// 注意這里都是線(xiàn)程池中的線(xiàn)程在執(zhí)行,所以是異步執(zhí)行
if (e != null) {
e.execute(new UniApply<T,V>(null, d, this, f));
} else {
@SuppressWarnings("unchecked") T t = (T) r;
d.result = d.encodeValue(f.apply(t));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
final void unipush(Completion c) {
if (c != null) {
// CAS自旋將回調(diào)方法壓入棧頂
while (!tryPushStack(c)) {
if (result != null) {
NEXT.set(c, null);
break;
}
}
// 可能在重試中完成,判斷result不為空就執(zhí)行
if (result != null)
c.tryFire(SYNC);
}
}
//再次嘗試判斷依賴(lài)方法是否處理完成,處理完成則調(diào)用目標(biāo)回調(diào)方法
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
Object r; Throwable x; Function<? super T,? extends V> f;
if ((a = src) == null || (r = a.result) == null
|| (d = dep) == null || (f = fn) == null)
return null;
tryComplete: if (d.result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (mode <= 0 && !claim())
return null;
else {
@SuppressWarnings("unchecked") T t = (T) r;
d.completeValue(f.apply(t));
}
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
src = null; dep = null; fn = null;
//成功處理完依賴(lài)方法和回調(diào)方法后進(jìn)行處理,可能喚醒其他的回調(diào)方法或者清理?xiàng)?br> return d.postFire(a, mode);
}
描述 AND 匯聚關(guān)系
CompletionStage 接口里面描述 AND 匯聚關(guān)系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,這些接口的區(qū)別是源自 fn、consumer、action 這三個(gè)核心參數(shù)不同。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
Async后綴的方法表示,前面的 CompletionStage 執(zhí)行完成,在執(zhí)行后續(xù)操作時(shí)會(huì)提交到線(xiàn)程池處理,否則就還是使用同一個(gè)處理線(xiàn)程完成CompletableFuture的所有任務(wù)。
這三種方法意思都是等兩個(gè) CompletionStage 都完成了計(jì)算才會(huì)執(zhí)行下一步的操作,區(qū)別在于參數(shù)接口類(lèi)型不一樣。
thenCombine 參數(shù)接口類(lèi)型為 BiFunction,可以拿到前一步兩個(gè) CompletionStage 的運(yùn)算結(jié)果,進(jìn)行下一步處理,同時(shí)有返回值(轉(zhuǎn)化操作);
thenAcceptBoth 參數(shù)接口類(lèi)型為 BiConsumer,也可以拿到前一步的運(yùn)算結(jié)果進(jìn)行下一步處理,但是無(wú)返回值(消費(fèi)操作);
runAfterBoth 參數(shù)接口類(lèi)型為 Runnable,即不能獲取到上一步的執(zhí)行結(jié)果,也無(wú)返回值(不關(guān)心運(yùn)行結(jié)果);
CompletableFuture 中 thenAcceptBoth 如何實(shí)現(xiàn)?talk is cheap?。?br>
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
private <U> CompletableFuture<Void> biAcceptStage(
Executor e, CompletionStage<U> o,
BiConsumer<? super T,? super U> f) {
CompletableFuture<U> b; Object r, s;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<Void> d = newIncompleteFuture();
// 如果兩個(gè)階段有任何一個(gè)沒(méi)有執(zhí)行完成,則將回調(diào)方法分別放到兩個(gè)互相依賴(lài)階段的棧頂
if ((r = result) == null || (s = b.result) == null)
bipush(b, new BiAccept<T,U>(e, d, this, b, f));
else if (e == null)
// 如果兩個(gè)依賴(lài)的階段都執(zhí)行完成則調(diào)用回調(diào)方法
d.biAccept(r, s, f, null);
else
try {
e.execute(new BiAccept<T,U>(null, d, this, b, f));
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
描述 OR 匯聚關(guān)系
OR的關(guān)系,表示誰(shuí)運(yùn)行快就用誰(shuí)的結(jié)果執(zhí)行下一步操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
同樣也是有Async后綴的表示,當(dāng)前面的 CompletionStage 執(zhí)行完成,在執(zhí)行后續(xù)操作時(shí)會(huì)提交到線(xiàn)程池處理。applyToEither、acceptEither、runAfterEither 三個(gè)方法的區(qū)別還是來(lái)自于不同的接口參數(shù)類(lèi)型:Function、Consumer、Runnable。
CompletableFuture 中 applyToEither 如何實(shí)現(xiàn)?
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(defaultExecutor(), other, fn);
}
private <U extends T,V> CompletableFuture<V> orApplyStage(
Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
Object r; CompletableFuture<? extends T> z;
// 這塊是重點(diǎn),有任何一個(gè)階段的結(jié)果不為空就直接執(zhí)行function
if ((r = (z = this).result) != null ||
(r = (z = b).result) != null)
return z.uniApplyNow(r, e, f);
CompletableFuture<V> d = newIncompleteFuture();
// 如果都為空則將回調(diào)方法分別push到被依賴(lài)的兩個(gè)階段的棧頂
orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
return d;
}
異常處理
在Java編程中,異常處理當(dāng)然是必不可少的一環(huán),那你可能會(huì)想到如果在使用 CompletableFuture 進(jìn)行異步鏈?zhǔn)骄幊虝r(shí),如果出現(xiàn)異常該怎么處理呢?
首先上面我們提到的 fn、consumer、action 它們的核心方法是不允許拋出可檢查異常的,但是卻無(wú)法限制它們拋出運(yùn)行時(shí)異常。在同步方法中,我們可以使用 try-catch{} 來(lái)捕獲并處理異常,但在異步編程里面異常該如何處理 ?CompletionStage 接口給我們提供的方案非常簡(jiǎn)單,比 try-catch{} 還要簡(jiǎn)單。
下面是相關(guān)的方法,使用這些方法進(jìn)行異常處理和串行操作是一樣的,都支持鏈?zhǔn)骄幊谭绞健?br>
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
參數(shù)的類(lèi)型是 BiConsumer<? super T,? super Throwable>,它可以處理正常的計(jì)算結(jié)果,或者異常情況,可以獲取到上一步的執(zhí)行結(jié)果作為參數(shù);
無(wú)論是否發(fā)生異常都會(huì)執(zhí)行 whenComplete() 中的回調(diào)函數(shù) action;
方法不以Async結(jié)尾,意味著Action使用相同的線(xiàn)程執(zhí)行,而Async可能會(huì)使用其它的線(xiàn)程去執(zhí)行;
這幾個(gè)方法都會(huì)返回 CompletableFuture,當(dāng)Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture 的計(jì)算結(jié)果或者返回異常。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 == 0) {
int i = 12 / 0;
}
System.out.println("執(zhí)行結(jié)束!");
});
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("執(zhí)行完成!");
}
});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("執(zhí)行失?。? + t.getMessage());
return null;
}
}).join();
handle 也是執(zhí)行任務(wù)完成時(shí)對(duì)結(jié)果的處理,whenComplete() 和 handle() 的區(qū)別在于 whenComplete() 不支持返回結(jié)果,而 handle() 是支持返回結(jié)果的。
當(dāng)上一個(gè)的 CompletableFuture 的值計(jì)算完成或者拋出異常的時(shí)候,會(huì)觸發(fā) handle 方法中定義的函數(shù),結(jié)果由 BiFunction 參數(shù)計(jì)算而得,因此這組方法兼有 whenComplete 和轉(zhuǎn)換的兩個(gè)功能。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
JDK8流式編程結(jié)合
public static List<String> exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List<String> loads = null;
try {
// 所有需要查詢(xún)遠(yuǎn)程服務(wù)的load列表
List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
List<CompletableFuture<String>> completableFutures = requestList.stream()
// 使用CompletableFuture以異步方式查詢(xún)數(shù)據(jù)
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.toList();
loads = completableFutures.stream().map(CompletableFuture::join).toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture異步方式查詢(xún)請(qǐng)求已完成:" + loads.size());
} finally {
executorService.shutdown();
}
return loads;
}
注意到了嗎?這里使用了兩個(gè)不同的Stream流水線(xiàn),是否可以在同一個(gè)處理流的流水線(xiàn)上一個(gè)接一個(gè)地放置多個(gè)map操作。
public static List<String> exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List<String> loads = null;
try {
// 所有需要查詢(xún)遠(yuǎn)程服務(wù)的load列表
List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
loads = requestList.stream()
// 使用CompletableFuture以異步方式查詢(xún)數(shù)據(jù)
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.map(CompletableFuture::join)
.toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture異步方式查詢(xún)請(qǐng)求已完成:" + loads.size());
} finally {
executorService.shutdown();
}
return loads;
}
這其實(shí)是有原因的。考慮流操作之間的延遲特性,如果你在單一流水線(xiàn)中處理流,不同的請(qǐng)求只能以同步、順序執(zhí)行的方式才會(huì)成功。因此,每個(gè)創(chuàng)建CompletableFuture對(duì)象只能在前一個(gè)操作結(jié)束之后執(zhí)行查詢(xún)指定服務(wù)請(qǐng)求的動(dòng)作、通知join方法返回結(jié)果。
再來(lái)看一個(gè)例子:
我們的系統(tǒng)提供的運(yùn)費(fèi)價(jià)格是以美元計(jì)價(jià)的,但是你希望以人民幣(RMB)的方式提供給你的客戶(hù)。你可以用異步的方式向計(jì)費(fèi)中心查詢(xún)指定Load的價(jià)格,同時(shí)從遠(yuǎn)程的匯率服務(wù)那里查到人民幣和美元之間的匯率。當(dāng)二者都結(jié)束時(shí),再將這兩個(gè)結(jié)果結(jié)合起來(lái),用返回的商品價(jià)格乘以當(dāng)時(shí)的匯率,得到以人民幣計(jì)價(jià)的商品價(jià)格。
public class MultiThreadTest {
@Test
public void test18() {
long start = System.nanoTime();
List<CompletableFuture<Double>> futures = loads.stream()
.map(laod ->
CompletableFuture
// 查商品價(jià)格操作和查兌換匯率操作同時(shí)進(jìn)行,當(dāng)兩者都完成時(shí)將結(jié)果進(jìn)行整合
.supplyAsync(() -> load.getPrice("load1"))
.thenCombine(CompletableFuture.supplyAsync(() -> RateService.getRate("RMB", "USD")), (price, rate) -> price * rate)
)
.collect(toList());
List<Double> usdPrices = futures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
}
通過(guò)上述例子,可以看到相對(duì)于采用Java 8之前提供的Future實(shí)現(xiàn),CompletableFuture版本實(shí)現(xiàn)所具備的巨大優(yōu)勢(shì)。CompletableFuture利用Lambda表達(dá)式以聲明式的API提供了一種機(jī)制,能夠用最有效的方式,非常容易地將多個(gè)以同步或異步方式執(zhí)行復(fù)雜操作的任務(wù)結(jié)合到一起。
為了更直觀(guān)地感受一下使用CompletableFuture在代碼可讀性上帶來(lái)的巨大提升,下面嘗試僅使用Java 7中提供的特性,重新實(shí)現(xiàn)上述例子的功能。
public class MultiThreadTest {
@Test
public void test19() throws ExecutionException, InterruptedException {
long start = System.nanoTime();
List<Future<Double>> usdFuturePrices = new ArrayList<>(shops.size());
for (Shop shop : shops) {
// 創(chuàng)建一個(gè)查詢(xún)?nèi)嗣駧诺矫涝D(zhuǎn)換匯率的Future
final Future<Double> usdFutureRate = executor.submit(new Callable<Double>() {
public Double call() {
return RateService.getRate("RMB", "USD");
}
});
// 在第二個(gè)Future中查詢(xún)指定商店中特定商品的價(jià)格
Future<Double> usdFuturePrice = executor.submit(new Callable<Double>() {
public Double call() throws ExecutionException, InterruptedException {
double rmbPrice = shop.getPrice("肥皂");
// 在查找價(jià)格操作的同一個(gè)Future中, 將價(jià)格和匯率做乘法計(jì)算出匯后價(jià)格
return rmbPrice * usdFutureRate.get();
}
});
usdFuturePrices.add(usdFuturePrice);
}
List<Double> usdPrices = new ArrayList<>(usdFuturePrices.size());
for (Future<Double> usdFuturePrice : usdFuturePrices) {
usdPrices.add(usdFuturePrice.get());
}
}
}
這里我們思考這樣一個(gè)問(wèn)題:并行使用流還是CompletableFuture?
對(duì)集合進(jìn)行并行計(jì)算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map這樣的操作開(kāi)展工作,要么枚舉出集合中的每一個(gè)元素,創(chuàng)建新的線(xiàn)程,在 CompletableFuture 內(nèi)對(duì)其進(jìn)行操作。后者提供了更多的靈活性,你可以調(diào)整線(xiàn)程池的大小,而這能幫助你確保整體的計(jì)算不會(huì)因?yàn)榫€(xiàn)程都在等待I/O而發(fā)生阻塞。同時(shí)也可以提供更多描述任務(wù)之間關(guān)系的接口,我們不需要為之編寫(xiě)更多的代碼。
這里對(duì)使用這些API的建議如下:
如果你進(jìn)行的是計(jì)算密集型的操作,并且沒(méi)有I/O,那么推薦使用Stream接口,因?yàn)閷?shí)現(xiàn)簡(jiǎn)單,同時(shí)效率也可能是最高的(如果所有的線(xiàn)程都是計(jì)算密集型的,那就沒(méi)有必要?jiǎng)?chuàng)建比處理器核數(shù)更多的線(xiàn)程)。
反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待),那么使用CompletableFuture靈活性更好。
總結(jié)
今天大家學(xué)到了哪些知識(shí)呢?
如何優(yōu)化接口性能?某些場(chǎng)景下可以使用多線(xiàn)程并行代替串行。
如何實(shí)現(xiàn)接口并行調(diào)用?通過(guò)今天的學(xué)習(xí)可以使用 Future+Callable、FutureTask、CompletableFuture。
詳細(xì)介紹了CompletableFuture的強(qiáng)大,掌握CompletableFuture提供的函數(shù)式編程的能力,以及與JDK8流式編程結(jié)合使用,使代碼更加美觀(guān)優(yōu)雅,寫(xiě)起來(lái)簡(jiǎn)潔和便利;
在接口設(shè)計(jì)時(shí)可以參考CompletableFuture的實(shí)現(xiàn),將兩個(gè)無(wú)關(guān)的接口能力組裝在一起以實(shí)現(xiàn)更加強(qiáng)大的功能;
不足之處:今天只是對(duì)于并發(fā)編程中的工具類(lèi)使用和相關(guān)原理做了分享,在實(shí)際開(kāi)發(fā)過(guò)程中可能需要考慮到更多的通用性,封裝通過(guò)調(diào)用模版方法,不要每一個(gè)地方都寫(xiě)一堆類(lèi)似的代碼。
通過(guò)今天的分享,希望大家可以在平時(shí)開(kāi)發(fā)工作中遇到合適的場(chǎng)景時(shí)嘗試使用 CompletableFuture 提供的API,優(yōu)化程序性能、提高開(kāi)發(fā)效率。
作者:七哥
公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱(chēng)