gRPC的使用
作者:xcbeyond
瘋狂源自夢(mèng)想,技術(shù)成就輝煌!微信公眾號(hào):《程序猿技術(shù)大咖》號(hào)主,專(zhuān)注后端開(kāi)發(fā)多年,擁有豐富的研發(fā)經(jīng)驗(yàn),樂(lè)于技術(shù)輸出、分享,現(xiàn)階段從事微服務(wù)架構(gòu)項(xiàng)目的研發(fā)工作,涉及架構(gòu)設(shè)計(jì)、技術(shù)選型、業(yè)務(wù)研發(fā)等工作。對(duì)于Java、微服務(wù)、數(shù)據(jù)庫(kù)、Docker有深入了解,并有大量的調(diào)優(yōu)經(jīng)驗(yàn)。
1、概述
gRPC是由google開(kāi)發(fā)的,是一款語(yǔ)言中立、平臺(tái)中立、開(kāi)源的RPC(Remote Procedure Call,遠(yuǎn)程過(guò)程調(diào)用)框架。
在gRPC里客戶(hù)端應(yīng)用可以像調(diào)用本地對(duì)象一樣直接調(diào)用另一臺(tái)不同的機(jī)器上服務(wù)端應(yīng)用的方法,使得您能夠更容易地創(chuàng)建分布式應(yīng)用和服務(wù)。與許多 RPC框架類(lèi)似,gRPC也是基于以下理念:定義一個(gè)服務(wù),指定其能夠被遠(yuǎn)程調(diào)用的方法(包含參數(shù)和返回類(lèi)型)。在服務(wù)端實(shí)現(xiàn)這個(gè)接口,并運(yùn)行一個(gè) gRPC 服務(wù)器來(lái)處理客戶(hù)端調(diào)用。
2、特性
基于HTTP/2
HTTP/2 提供了連接多路復(fù)用、雙向流、服務(wù)器推送、請(qǐng)求優(yōu)先級(jí)、首部壓縮等機(jī)制??梢怨?jié)省帶寬、降低TCP鏈接次數(shù)、節(jié)省CPU,幫助移動(dòng)設(shè)備延長(zhǎng)電池壽命等。gRPC 的協(xié)議設(shè)計(jì)上使用了HTTP2 現(xiàn)有的語(yǔ)義,請(qǐng)求和響應(yīng)的數(shù)據(jù)使用HTTP Body 發(fā)送,其他的控制信息則用Header 表示。
IDL使用ProtoBuf
gRPC使用ProtoBuf來(lái)定義服務(wù),ProtoBuf是由Google開(kāi)發(fā)的一種數(shù)據(jù)序列化協(xié)議(類(lèi)似于XML、JSON、hessian)。ProtoBuf能夠?qū)?shù)據(jù)進(jìn)行序列化,并廣泛應(yīng)用在數(shù)據(jù)存儲(chǔ)、通信協(xié)議等方面。壓縮和傳輸效率高,語(yǔ)法簡(jiǎn)單,表達(dá)力強(qiáng)。
多語(yǔ)言支持(C, C++, Python, PHP, Nodejs, C#, Objective-C、Golang、Java)
gRPC支持多種語(yǔ)言,并能夠基于語(yǔ)言自動(dòng)生成客戶(hù)端和服務(wù)端功能庫(kù)。目前已提供了C版本grpc、Java版本grpc-java 和 Go版本grpc-go,其它語(yǔ)言的版本正在積極開(kāi)發(fā)中,其中,grpc支持C、C++、Node.js、Python、Ruby、Objective-C、PHP和C#等語(yǔ)言,grpc-java已經(jīng)支持Android開(kāi)發(fā)。
3、Java開(kāi)發(fā)gRPC服務(wù)端和客戶(hù)端
3.1 定義接口
基于protobuf來(lái)聲明數(shù)據(jù)模型和RPC接口服務(wù)。
3.1.1 proto文件
例如:helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.xcbeyond.grpc.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "xcbeyond";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
3.1.2 生成java代碼
根據(jù)proto的命令可以轉(zhuǎn)換成對(duì)應(yīng)的語(yǔ)言的代碼,生成java代碼,可以借助maven插件,在編譯時(shí)自動(dòng)生成。可以在pom.xml中增加如下依賴(lài):
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.19.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
然后只需要執(zhí)行“mvn
compile”指令即可,此后我們會(huì)在項(xiàng)目的target目錄下看到生成的classes文件,當(dāng)然最終我們還是需要將service打成jar包發(fā)布的。maven仍然可以幫助我們做這些工作,由.proto生成classes是在compile階段,那么jar階段仍然是可以將classes打成jar,只需要借助maven-jar-plugin插件即可。
3.2 服務(wù)端開(kāi)發(fā)
服務(wù)端代碼如下,運(yùn)行這個(gè)類(lèi)的 main 方法,就可以在 50010 端口啟動(dòng)服務(wù)。
public class TestServer {
public static void main(String[] args) throws Exception{
ServerImpl server =
NettyServerBuilder.forPort(50010).addService(TestRpcServiceGrpc.bindService(new
TestServiceImpl())).build();
server.start();
}
}
//server端實(shí)現(xiàn)類(lèi),擴(kuò)展原有接口
public class TestServiceImpl implements TestRpcServiceGrpc.TestRpcService {
@Override
public void sayHello(TestModel.TestRequest request, StreamObserver<TestModel.TestResponse> responseObserver) {
String result = request.getName() + request.getId();
TestModel.TestResponse response = TestModel.TestResponse.newBuilder().setMessage(result).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
3.3 客戶(hù)端開(kāi)發(fā)
public class TestClient {
private final TestRpcServiceGrpc.TestRpcServiceBlockingStub client;
public TestClient(String host,int port) {
ManagedChannel channel = NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build();
client = TestRpcServiceGrpc.newBlockingStub(channel).withDeadlineAfter(60000, TimeUnit.MILLISECONDS);
}
public String sayHello(String name,Integer id) {
TestModel.TestRequest request = TestModel.TestRequest.newBuilder().setId(id).setName(name).build();
TestModel.TestResponse response = client.sayHello(request);
return response.getMessage();
}
}
4、原理解析
GRPC的Client與Server,均通過(guò)Netty
Channel作為數(shù)據(jù)通信,序列化、反序列化則使用Protobuf,每個(gè)請(qǐng)求都將被封裝成HTTP2的Stream,在整個(gè)生命周期中,客戶(hù)端Channel應(yīng)該保持長(zhǎng)連接,而不是每次調(diào)用重新創(chuàng)建Channel、響應(yīng)結(jié)束后關(guān)閉Channel(即短連接、交互式的RPC),目的就是達(dá)到鏈接的復(fù)用,進(jìn)而提高交互效率。
4.1 Server端
我們通常使用NettyServerBuilder,即IO處理模型基于Netty,將來(lái)可能會(huì)支持其他的IO模型。Netty Server的IO模型簡(jiǎn)析:
1)創(chuàng)建ServerBootstrap,設(shè)定BossGroup與workerGroup線程池
2)注冊(cè)childHandler,用來(lái)處理客戶(hù)端鏈接中的請(qǐng)求成幀
3)bind到指定的port,即內(nèi)部初始化ServerSocketChannel等,開(kāi)始偵聽(tīng)和接受客戶(hù)端鏈接。
4)BossGroup中的線程用于accept客戶(hù)端鏈接,并轉(zhuǎn)發(fā)(輪訓(xùn))給workerGroup中的線程。
5)workerGroup中的特定線程用于初始化客戶(hù)端鏈接,初始化pipeline和handler,并將其注冊(cè)到worker線程的selector上(每個(gè)worker線程持有一個(gè)selector,不共享)
6)selector上發(fā)生讀寫(xiě)事件后,獲取事件所屬的鏈接句柄,然后執(zhí)行handler(inbound),同時(shí)進(jìn)行拆封package,handler執(zhí)行完畢后,數(shù)據(jù)寫(xiě)入通過(guò),由outbound handler處理(封包)通過(guò)鏈接發(fā)出。 注意每個(gè)worker線程上的數(shù)據(jù)請(qǐng)求是隊(duì)列化的。
參見(jiàn)源碼:SingleThreadEventLoop、NioEventLoop。(請(qǐng)求隊(duì)列化)
GRPC而言,只是對(duì)Netty Server的簡(jiǎn)單封裝,底層使用了PlaintextHandler、Http2ConnectionHandler的相關(guān)封裝等。具體Framer、Stream方式請(qǐng)參考Http2相關(guān)文檔。
1)bossEventLoopGroup:如果沒(méi)指定,默認(rèn)為一個(gè)static共享的對(duì)象,即JVM內(nèi)所有的NettyServer都使用同一個(gè)Group,默認(rèn)線程池大小為1。
2)workerEventLoopGroup:如果沒(méi)指定,默認(rèn)為一個(gè)static共享的對(duì)象,線程池大小為coreSize * 2。這兩個(gè)對(duì)象采用默認(rèn)值并不會(huì)帶來(lái)問(wèn)題;通常情況下,即使你的application中有多個(gè)GRPC Server,默認(rèn)值也一樣能夠帶來(lái)收益。不合適的線程池大小,有可能會(huì)是性能受限。
3)channelType:默認(rèn)為NioServerSocketChannel,通常我們采用默認(rèn)值;當(dāng)然你也可以開(kāi)發(fā)自己的類(lèi)。如果此值為NioServerSocketChannel,則開(kāi)啟keepalive,同時(shí)設(shè)定SO_BACKLOG為128;BACKLOG就是系統(tǒng)底層已經(jīng)建立引入鏈接但是尚未被accept的Socket隊(duì)列的大小,在鏈接密集型(特別是短連接)時(shí),如果隊(duì)列超過(guò)此值,新的創(chuàng)建鏈接請(qǐng)求將會(huì)被拒絕(有可能你在壓力測(cè)試時(shí),會(huì)遇到這樣的問(wèn)題),keepalive和BACKLOG特性目前無(wú)法直接修改。
[root@sh149 ~]# sysctl -a|grep tcp_keepalive
net.ipv4.tcp_keepalive_time = 60 ##單位:秒
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75 ##單位:秒
##可以在/etc/sysctl.conf查看和修改相關(guān)值
##tcp_keepalive_time:最后一個(gè)實(shí)際數(shù)據(jù)包發(fā)送完畢后,首個(gè)keepalive探測(cè)包發(fā)送的時(shí)間。
##如果首個(gè)keepalive包探測(cè)成功,那么鏈接會(huì)被標(biāo)記為keepalive(首先TCP開(kāi)啟了keepalive)
##此后此參數(shù)將不再生效,而是使用下述的2個(gè)參數(shù)繼續(xù)探測(cè)
##tcp_keepalive_intvl:此后,無(wú)論通道上是否發(fā)生數(shù)據(jù)交換,keepalive探測(cè)包發(fā)送的時(shí)間間隔
##tcp_keepalive_probes:在斷定鏈接失效之前,嘗試發(fā)送探測(cè)包的次數(shù);
##如果都失敗,則斷定鏈接已關(guān)閉。
對(duì)于Server端,我們需要關(guān)注上述keepalive的一些設(shè)置;如果Netty Client在空閑一段時(shí)間后,Server端會(huì)主動(dòng)關(guān)閉鏈接,有可能Client仍然保持鏈接的句柄,將會(huì)導(dǎo)致RPC調(diào)用時(shí)發(fā)生異常。這也會(huì)導(dǎo)致GRPC客戶(hù)端調(diào)用時(shí)偶爾發(fā)生錯(cuò)誤的原因之一。
4)followControlWindow:流量控制的窗口大小,單位:字節(jié),默認(rèn)值為1M,HTTP2中的“Flow Control”特性;連接上,已經(jīng)發(fā)送尚未ACK的數(shù)據(jù)幀大小,比如window大小為100K,且winow已滿(mǎn),每次向Client發(fā)送消息時(shí),如果客戶(hù)端反饋ACK(攜帶此次ACK數(shù)據(jù)的大?。?,window將會(huì)減掉此大?。幻看蜗騱indow中添加亟待發(fā)送的數(shù)據(jù)時(shí),window增加;如果window中的數(shù)據(jù)已達(dá)到限定值,它將不能繼續(xù)添加數(shù)據(jù),只能等待Client端ACK。
5)maxConcurrentCallPerConnection:每個(gè)connection允許的最大并發(fā)請(qǐng)求數(shù),默認(rèn)值為Integer.MAX_VALUE;如果此連接上已經(jīng)接受但尚未響應(yīng)的streams個(gè)數(shù)達(dá)到此值,新的請(qǐng)求將會(huì)被拒絕。為了避免TCP通道的過(guò)度擁堵,我們可以適度調(diào)整此值,以便Server端平穩(wěn)處理,畢竟buffer太多的streams會(huì)對(duì)server的內(nèi)存造成巨大壓力。
6)maxMessageSize:每次調(diào)用允許發(fā)送的最大數(shù)據(jù)量,默認(rèn)為100M。
7)maxHeaderListSize:每次調(diào)用允許發(fā)送的header的最大條數(shù),GRPC中默認(rèn)為8192。
對(duì)于其他的比如SSL/TSL等,可以參考其他文檔。
GRPC Server端,還有一個(gè)最終要的方法:addService?!救缦挛膕ervice代理模式】
在此之前,我們需要介紹一下bindService方法,每個(gè)GRPC生成的service代碼中都有此方法,它以硬編碼的方式遍歷此service的方法列表,將每個(gè)方法的調(diào)用過(guò)程都與“被代理實(shí)例”綁定,這個(gè)模式有點(diǎn)類(lèi)似于靜態(tài)代理,比如調(diào)用sayHello方法時(shí),其實(shí)內(nèi)部直接調(diào)用“被代理實(shí)例”的sayHello方法(參見(jiàn)MethodHandler.invoke方法,每個(gè)方法都有一個(gè)唯一的index,通過(guò)硬編碼方式執(zhí)行);bindService方法的最終目的是創(chuàng)建一個(gè)ServerServiceDefinition對(duì)象,這個(gè)對(duì)象內(nèi)部位置一個(gè)map,key為此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封裝類(lèi)(ServerMethodDefinition)。
源碼分析:
private static final int METHODID_SAY_HELLO = 0;
private static class MethodHandlers<Req, Resp> implements
... {
private final TestRpcService serviceImpl;//實(shí)際被代理實(shí)例
private final int methodId;
public MethodHandlers(TestRpcService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SAY_HELLO: //通過(guò)方法的index來(lái)判定具體需要代理那個(gè)方法
serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,
(io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);
break;
default:
throw new AssertionError();
}
}
....
}
public static io.grpc.ServerServiceDefinition bindService(
final TestRpcService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
.addMethod(
METHOD_SAY_HELLO,
asyncUnaryCall(
new MethodHandlers<
com.test.grpc.service.model.TestModel.TestRequest,
com.test.grpc.service.model.TestModel.TestResponse>(
serviceImpl, METHODID_SAY_HELLO)))
.build();
}
addService方法可以添加多個(gè)Service,即一個(gè)Netty Server可以為多個(gè)service服務(wù),這并不違背設(shè)計(jì)模式和架構(gòu)模式。addService方法將會(huì)把service保存在內(nèi)部的一個(gè)map中,key為serviceName(即{package}.{service}),value就是上述bindService生成的對(duì)象。
那么究竟Server端是如何解析RPC過(guò)程的?Client在調(diào)用時(shí)會(huì)將調(diào)用的service名稱(chēng) + method信息保存在一個(gè)GRPC“保留”的header中,那么Server端即可通過(guò)獲取這個(gè)特定的header信息,就可以得知此stream需要請(qǐng)求的service、以及其method,那么接下來(lái)只需要從上述提到的map中找到service,然后找到此method,直接代理調(diào)用即可。執(zhí)行結(jié)果在Encoder之后發(fā)送給Client。(參見(jiàn):NettyServerHandler)
因?yàn)槭莔ap存儲(chǔ),所以我們需要在定義.proto文件時(shí),盡可能的指定package信息,以避免因?yàn)閟ervice過(guò)多導(dǎo)致名稱(chēng)可能重復(fù)的問(wèn)題。
4.2 Client端
我們使用ManagedChannelBuilder來(lái)創(chuàng)建客戶(hù)端channel,ManagedChannelBuilder使用了provider機(jī)制,具體是創(chuàng)建了哪種channel有provider決定,可以參看META-INF下同類(lèi)名的文件中的注冊(cè)信息。當(dāng)前Channel有2種:NettyChannelBuilder與OkHttpChannelBuilder。本人的當(dāng)前版本中為NettyChannelBuilder;我們可以直接使用NettyChannelBuilder來(lái)構(gòu)建channel。如下描述則針對(duì)NettyChannelBuilder:
配置參數(shù)與NettyServerBuilder基本類(lèi)似,再次不再贅言。默認(rèn)情況下,Client端默認(rèn)的eventLoopGroup線程池也是static的,全局共享的,默認(rèn)線程個(gè)數(shù)為coreSize * 2。合理的線程池個(gè)數(shù)可以提高客戶(hù)端的吞吐能力。
ManagedChannel是客戶(hù)端最核心的類(lèi),它表示邏輯上的一個(gè)channel;底層持有一個(gè)物理的transport(TCP通道,參見(jiàn)NettyClientTransport),并負(fù)責(zé)維護(hù)此transport的活性;即在RPC調(diào)用的任何時(shí)機(jī),如果檢測(cè)到底層transport處于關(guān)閉狀態(tài)(terminated),將會(huì)嘗試重建transport。(參見(jiàn)TransportSet.obtainActiveTransport())
通常情況下,我們不需要在RPC調(diào)用結(jié)束后就關(guān)閉Channel,Channel可以被一直重用,直到Client不再需要請(qǐng)求位置或者Channel無(wú)法真的異常中斷而無(wú)法繼續(xù)使用。當(dāng)然,為了提高Client端application的整體并發(fā)能力,我們可以使用連接池模式,即創(chuàng)建多個(gè)ManagedChannel,然后使用輪訓(xùn)、隨機(jī)等算法,在每次RPC請(qǐng)求時(shí)選擇一個(gè)Channel即可。(備注,連接池特性,目前GRPC尚未提供,需要額外的開(kāi)發(fā))
每個(gè)Service客戶(hù)端,都生成了2種stub:BlockingStub和FutureStub;這兩個(gè)Stub內(nèi)部調(diào)用過(guò)程幾乎一樣,唯一不同的是BlockingStub的方法直接返回Response Model,而FutureStub返回一個(gè)Future對(duì)象。BlockingStub內(nèi)部也是基于Future機(jī)制,只是封裝了阻塞等待的過(guò)程:
try {
//也是基于Future
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);
//阻塞過(guò)程
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED.withCause(e).asRuntimeException();
}
}
return getUnchecked(responseFuture);
} catch (Throwable t) {
call.cancel();
throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}
創(chuàng)建一個(gè)Stub的成本是非常低的,我們可以在每次請(qǐng)求時(shí)都通過(guò)channel創(chuàng)建新的stub,這并不會(huì)帶來(lái)任何問(wèn)題(只不過(guò)是創(chuàng)建了大量對(duì)象);其實(shí)更好的方式是,我們應(yīng)該使用一個(gè)Stub發(fā)送多次請(qǐng)求,即Stub也是可以重用的;直到Stub上的狀態(tài)異常而無(wú)法使用。最常見(jiàn)的異常,就是“io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED”,即表示DEADLINE時(shí)間過(guò)期,我們可以為每個(gè)Stub配置deadline時(shí)間,那么如果此stub被使用的時(shí)長(zhǎng)超過(guò)此值(不是空閑的時(shí)間),將不能再發(fā)送請(qǐng)求,此時(shí)我們應(yīng)該創(chuàng)建新的Stub。很多人想盡辦法來(lái)使用“withDeadlineAfter”方法來(lái)實(shí)現(xiàn)一些奇怪的事情,此參數(shù)的主要目的就是表明:此stub只能被使用X時(shí)長(zhǎng),此后將不能再進(jìn)行請(qǐng)求,應(yīng)該被釋放。所以,它并不能實(shí)現(xiàn)類(lèi)似于“keepAlive”的語(yǔ)義,即使我們需要keepAlive,也應(yīng)該在Channel級(jí)別,而不是在一個(gè)Stub上。
如果你使用了連接池,那么其實(shí)連接池不應(yīng)該關(guān)注DEADLINE的錯(cuò)誤,只要Channel本身沒(méi)有terminated即可;就把這個(gè)問(wèn)題交給調(diào)用者處理。如果你也對(duì)Stub使用了對(duì)象池,那么你就可能需要關(guān)注這個(gè)情況了,你不應(yīng)該向調(diào)用者返回一個(gè)“DEADLINE”的stub,或者如果調(diào)用者發(fā)現(xiàn)了DEADLINE,你的對(duì)象池應(yīng)該能夠移除它。
1)實(shí)例化ManagedChannel,此channel可以被任意多個(gè)Stub實(shí)例引用;如上文說(shuō)述,我們可以通過(guò)創(chuàng)建Channel池,來(lái)提高application整體的吞吐能力。此Channel實(shí)例,不應(yīng)該被shutdown,直到Client端停止服務(wù);在任何時(shí)候,特別是創(chuàng)建Stub時(shí),我們應(yīng)該判定Channel的狀態(tài)。
synchronized (this) {
if (channel.isShutdown() || channel.isTerminated()) {
channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();
}
//new Stub
}
//或者
ManagedChannel channel = (ManagedChannel)client.getChannel();
if(channel.isShutdown() || channel.isTerminated()) {
client = createBlockStub();
}
client.sayHello(...)
因?yàn)镃hannel是可以多路復(fù)用,所以我們用Pool機(jī)制(比如commons-pool)也可以實(shí)現(xiàn)連接池,只是這種池并非完全符合GRPC/HTTP2的設(shè)計(jì)語(yǔ)義,因?yàn)镚RPC允許一個(gè)Channel上連續(xù)發(fā)送對(duì)個(gè)Requests(然后一次性接收多個(gè)Responses),而不是“交互式”的Request-Response模式,當(dāng)然這么使用并不會(huì)有任何問(wèn)題。
2)對(duì)于批量調(diào)用的場(chǎng)景,我們可以使用FutureStub,對(duì)于普通的業(yè)務(wù)類(lèi)型RPC,我們應(yīng)該使用BlockingStub。
3)每個(gè)RPC方法的調(diào)用,比如sayHello,調(diào)用開(kāi)始后,將會(huì)為每個(gè)調(diào)用請(qǐng)求創(chuàng)建一個(gè)ClientCall實(shí)例,其內(nèi)部封裝了調(diào)用的方法、配置選項(xiàng)(headers)等。此后將會(huì)創(chuàng)建Stream對(duì)象,每個(gè)Stream都持有唯一的streamId,它是Transport用于分揀Response的憑證。最終調(diào)用的所有參數(shù)都會(huì)被封裝在Stream中。
4)檢測(cè)DEADLINE,是否已經(jīng)過(guò)期,如果過(guò)期,將使用FailingClientStream對(duì)象來(lái)模擬整個(gè)RPC過(guò)程,當(dāng)然請(qǐng)求不會(huì)通過(guò)通道發(fā)出,直接經(jīng)過(guò)異常流處理過(guò)程。
5)然后獲取transport,如果此時(shí)檢測(cè)到transport已經(jīng)中斷,則重建transport。(自動(dòng)重練機(jī)制,ClientCallImpl.start()方法)
6)發(fā)送請(qǐng)求參數(shù),即我們Request實(shí)例。一次RPC調(diào)用,數(shù)據(jù)是分多次發(fā)送,但是ClientCall在創(chuàng)建時(shí)已經(jīng)綁定到了指定的線程上,所以數(shù)據(jù)發(fā)送總是通過(guò)一個(gè)線程進(jìn)行(不會(huì)亂序)。
7)將ClientCall實(shí)例置為halfClose,即半關(guān)閉,并不是將底層Channel或者Transport半關(guān)閉,只是邏輯上限定此ClientCall實(shí)例上將不能繼續(xù)發(fā)送任何stream信息,而是等待Response。
8)Netty底層IO將會(huì)對(duì)reponse數(shù)據(jù)流進(jìn)行解包(Http2ConnectionDecoder),并根據(jù)streamId分揀Response,同時(shí)喚醒響應(yīng)的ClientCalls阻塞。(參見(jiàn)ClientCalls,GrpcFuture)
9)如果是BlockingStub,則請(qǐng)求返回,如果響應(yīng)中包含應(yīng)用異常,則封裝后拋出;如果是網(wǎng)絡(luò)異常,則可能觸發(fā)Channel重建、Stream重置等。