gRPC的使用
作者:xcbeyond
瘋狂源自夢想,技術成就輝煌!微信公眾號:《程序猿技術大咖》號主,專注后端開發(fā)多年,擁有豐富的研發(fā)經(jīng)驗,樂于技術輸出、分享,現(xiàn)階段從事微服務架構項目的研發(fā)工作,涉及架構設計、技術選型、業(yè)務研發(fā)等工作。對于Java、微服務、數(shù)據(jù)庫、Docker有深入了解,并有大量的調優(yōu)經(jīng)驗。
1、概述
gRPC是由google開發(fā)的,是一款語言中立、平臺中立、開源的RPC(Remote Procedure Call,遠程過程調用)框架。
在gRPC里客戶端應用可以像調用本地對象一樣直接調用另一臺不同的機器上服務端應用的方法,使得您能夠更容易地創(chuàng)建分布式應用和服務。與許多 RPC框架類似,gRPC也是基于以下理念:定義一個服務,指定其能夠被遠程調用的方法(包含參數(shù)和返回類型)。在服務端實現(xiàn)這個接口,并運行一個 gRPC 服務器來處理客戶端調用。
2、特性
基于HTTP/2
HTTP/2 提供了連接多路復用、雙向流、服務器推送、請求優(yōu)先級、首部壓縮等機制??梢怨?jié)省帶寬、降低TCP鏈接次數(shù)、節(jié)省CPU,幫助移動設備延長電池壽命等。gRPC 的協(xié)議設計上使用了HTTP2 現(xiàn)有的語義,請求和響應的數(shù)據(jù)使用HTTP Body 發(fā)送,其他的控制信息則用Header 表示。
IDL使用ProtoBuf
gRPC使用ProtoBuf來定義服務,ProtoBuf是由Google開發(fā)的一種數(shù)據(jù)序列化協(xié)議(類似于XML、JSON、hessian)。ProtoBuf能夠將數(shù)據(jù)進行序列化,并廣泛應用在數(shù)據(jù)存儲、通信協(xié)議等方面。壓縮和傳輸效率高,語法簡單,表達力強。
多語言支持(C, C++, Python, PHP, Nodejs, C#, Objective-C、Golang、Java)
gRPC支持多種語言,并能夠基于語言自動生成客戶端和服務端功能庫。目前已提供了C版本grpc、Java版本grpc-java 和 Go版本grpc-go,其它語言的版本正在積極開發(fā)中,其中,grpc支持C、C++、Node.js、Python、Ruby、Objective-C、PHP和C#等語言,grpc-java已經(jīng)支持Android開發(fā)。
3、Java開發(fā)gRPC服務端和客戶端
3.1 定義接口
基于protobuf來聲明數(shù)據(jù)模型和RPC接口服務。
3.1.1 proto文件
例如:helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.xcbeyond.grpc.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "xcbeyond";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
3.1.2 生成java代碼
根據(jù)proto的命令可以轉換成對應的語言的代碼,生成java代碼,可以借助maven插件,在編譯時自動生成??梢栽趐om.xml中增加如下依賴:
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.19.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
然后只需要執(zhí)行“mvn
compile”指令即可,此后我們會在項目的target目錄下看到生成的classes文件,當然最終我們還是需要將service打成jar包發(fā)布的。maven仍然可以幫助我們做這些工作,由.proto生成classes是在compile階段,那么jar階段仍然是可以將classes打成jar,只需要借助maven-jar-plugin插件即可。
3.2 服務端開發(fā)
服務端代碼如下,運行這個類的 main 方法,就可以在 50010 端口啟動服務。
public class TestServer {
public static void main(String[] args) throws Exception{
ServerImpl server =
NettyServerBuilder.forPort(50010).addService(TestRpcServiceGrpc.bindService(new
TestServiceImpl())).build();
server.start();
}
}
//server端實現(xiàn)類,擴展原有接口
public class TestServiceImpl implements TestRpcServiceGrpc.TestRpcService {
@Override
public void sayHello(TestModel.TestRequest request, StreamObserver<TestModel.TestResponse> responseObserver) {
String result = request.getName() + request.getId();
TestModel.TestResponse response = TestModel.TestResponse.newBuilder().setMessage(result).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
3.3 客戶端開發(fā)
public class TestClient {
private final TestRpcServiceGrpc.TestRpcServiceBlockingStub client;
public TestClient(String host,int port) {
ManagedChannel channel = NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build();
client = TestRpcServiceGrpc.newBlockingStub(channel).withDeadlineAfter(60000, TimeUnit.MILLISECONDS);
}
public String sayHello(String name,Integer id) {
TestModel.TestRequest request = TestModel.TestRequest.newBuilder().setId(id).setName(name).build();
TestModel.TestResponse response = client.sayHello(request);
return response.getMessage();
}
}
4、原理解析
GRPC的Client與Server,均通過Netty
Channel作為數(shù)據(jù)通信,序列化、反序列化則使用Protobuf,每個請求都將被封裝成HTTP2的Stream,在整個生命周期中,客戶端Channel應該保持長連接,而不是每次調用重新創(chuàng)建Channel、響應結束后關閉Channel(即短連接、交互式的RPC),目的就是達到鏈接的復用,進而提高交互效率。
4.1 Server端
我們通常使用NettyServerBuilder,即IO處理模型基于Netty,將來可能會支持其他的IO模型。Netty Server的IO模型簡析:
1)創(chuàng)建ServerBootstrap,設定BossGroup與workerGroup線程池
2)注冊childHandler,用來處理客戶端鏈接中的請求成幀
3)bind到指定的port,即內部初始化ServerSocketChannel等,開始偵聽和接受客戶端鏈接。
4)BossGroup中的線程用于accept客戶端鏈接,并轉發(fā)(輪訓)給workerGroup中的線程。
5)workerGroup中的特定線程用于初始化客戶端鏈接,初始化pipeline和handler,并將其注冊到worker線程的selector上(每個worker線程持有一個selector,不共享)
6)selector上發(fā)生讀寫事件后,獲取事件所屬的鏈接句柄,然后執(zhí)行handler(inbound),同時進行拆封package,handler執(zhí)行完畢后,數(shù)據(jù)寫入通過,由outbound handler處理(封包)通過鏈接發(fā)出。 注意每個worker線程上的數(shù)據(jù)請求是隊列化的。
參見源碼:SingleThreadEventLoop、NioEventLoop。(請求隊列化)
GRPC而言,只是對Netty Server的簡單封裝,底層使用了PlaintextHandler、Http2ConnectionHandler的相關封裝等。具體Framer、Stream方式請參考Http2相關文檔。
1)bossEventLoopGroup:如果沒指定,默認為一個static共享的對象,即JVM內所有的NettyServer都使用同一個Group,默認線程池大小為1。
2)workerEventLoopGroup:如果沒指定,默認為一個static共享的對象,線程池大小為coreSize * 2。這兩個對象采用默認值并不會帶來問題;通常情況下,即使你的application中有多個GRPC Server,默認值也一樣能夠帶來收益。不合適的線程池大小,有可能會是性能受限。
3)channelType:默認為NioServerSocketChannel,通常我們采用默認值;當然你也可以開發(fā)自己的類。如果此值為NioServerSocketChannel,則開啟keepalive,同時設定SO_BACKLOG為128;BACKLOG就是系統(tǒng)底層已經(jīng)建立引入鏈接但是尚未被accept的Socket隊列的大小,在鏈接密集型(特別是短連接)時,如果隊列超過此值,新的創(chuàng)建鏈接請求將會被拒絕(有可能你在壓力測試時,會遇到這樣的問題),keepalive和BACKLOG特性目前無法直接修改。
[root@sh149 ~]# sysctl -a|grep tcp_keepalive
net.ipv4.tcp_keepalive_time = 60 ##單位:秒
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_intvl = 75 ##單位:秒
##可以在/etc/sysctl.conf查看和修改相關值
##tcp_keepalive_time:最后一個實際數(shù)據(jù)包發(fā)送完畢后,首個keepalive探測包發(fā)送的時間。
##如果首個keepalive包探測成功,那么鏈接會被標記為keepalive(首先TCP開啟了keepalive)
##此后此參數(shù)將不再生效,而是使用下述的2個參數(shù)繼續(xù)探測
##tcp_keepalive_intvl:此后,無論通道上是否發(fā)生數(shù)據(jù)交換,keepalive探測包發(fā)送的時間間隔
##tcp_keepalive_probes:在斷定鏈接失效之前,嘗試發(fā)送探測包的次數(shù);
##如果都失敗,則斷定鏈接已關閉。
對于Server端,我們需要關注上述keepalive的一些設置;如果Netty Client在空閑一段時間后,Server端會主動關閉鏈接,有可能Client仍然保持鏈接的句柄,將會導致RPC調用時發(fā)生異常。這也會導致GRPC客戶端調用時偶爾發(fā)生錯誤的原因之一。
4)followControlWindow:流量控制的窗口大小,單位:字節(jié),默認值為1M,HTTP2中的“Flow Control”特性;連接上,已經(jīng)發(fā)送尚未ACK的數(shù)據(jù)幀大小,比如window大小為100K,且winow已滿,每次向Client發(fā)送消息時,如果客戶端反饋ACK(攜帶此次ACK數(shù)據(jù)的大小),window將會減掉此大??;每次向window中添加亟待發(fā)送的數(shù)據(jù)時,window增加;如果window中的數(shù)據(jù)已達到限定值,它將不能繼續(xù)添加數(shù)據(jù),只能等待Client端ACK。
5)maxConcurrentCallPerConnection:每個connection允許的最大并發(fā)請求數(shù),默認值為Integer.MAX_VALUE;如果此連接上已經(jīng)接受但尚未響應的streams個數(shù)達到此值,新的請求將會被拒絕。為了避免TCP通道的過度擁堵,我們可以適度調整此值,以便Server端平穩(wěn)處理,畢竟buffer太多的streams會對server的內存造成巨大壓力。
6)maxMessageSize:每次調用允許發(fā)送的最大數(shù)據(jù)量,默認為100M。
7)maxHeaderListSize:每次調用允許發(fā)送的header的最大條數(shù),GRPC中默認為8192。
對于其他的比如SSL/TSL等,可以參考其他文檔。
GRPC Server端,還有一個最終要的方法:addService?!救缦挛膕ervice代理模式】
在此之前,我們需要介紹一下bindService方法,每個GRPC生成的service代碼中都有此方法,它以硬編碼的方式遍歷此service的方法列表,將每個方法的調用過程都與“被代理實例”綁定,這個模式有點類似于靜態(tài)代理,比如調用sayHello方法時,其實內部直接調用“被代理實例”的sayHello方法(參見MethodHandler.invoke方法,每個方法都有一個唯一的index,通過硬編碼方式執(zhí)行);bindService方法的最終目的是創(chuàng)建一個ServerServiceDefinition對象,這個對象內部位置一個map,key為此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封裝類(ServerMethodDefinition)。
源碼分析:
private static final int METHODID_SAY_HELLO = 0;
private static class MethodHandlers<Req, Resp> implements
... {
private final TestRpcService serviceImpl;//實際被代理實例
private final int methodId;
public MethodHandlers(TestRpcService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SAY_HELLO: //通過方法的index來判定具體需要代理那個方法
serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,
(io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);
break;
default:
throw new AssertionError();
}
}
....
}
public static io.grpc.ServerServiceDefinition bindService(
final TestRpcService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
.addMethod(
METHOD_SAY_HELLO,
asyncUnaryCall(
new MethodHandlers<
com.test.grpc.service.model.TestModel.TestRequest,
com.test.grpc.service.model.TestModel.TestResponse>(
serviceImpl, METHODID_SAY_HELLO)))
.build();
}
addService方法可以添加多個Service,即一個Netty Server可以為多個service服務,這并不違背設計模式和架構模式。addService方法將會把service保存在內部的一個map中,key為serviceName(即{package}.{service}),value就是上述bindService生成的對象。
那么究竟Server端是如何解析RPC過程的?Client在調用時會將調用的service名稱 + method信息保存在一個GRPC“保留”的header中,那么Server端即可通過獲取這個特定的header信息,就可以得知此stream需要請求的service、以及其method,那么接下來只需要從上述提到的map中找到service,然后找到此method,直接代理調用即可。執(zhí)行結果在Encoder之后發(fā)送給Client。(參見:NettyServerHandler)
因為是map存儲,所以我們需要在定義.proto文件時,盡可能的指定package信息,以避免因為service過多導致名稱可能重復的問題。
4.2 Client端
我們使用ManagedChannelBuilder來創(chuàng)建客戶端channel,ManagedChannelBuilder使用了provider機制,具體是創(chuàng)建了哪種channel有provider決定,可以參看META-INF下同類名的文件中的注冊信息。當前Channel有2種:NettyChannelBuilder與OkHttpChannelBuilder。本人的當前版本中為NettyChannelBuilder;我們可以直接使用NettyChannelBuilder來構建channel。如下描述則針對NettyChannelBuilder:
配置參數(shù)與NettyServerBuilder基本類似,再次不再贅言。默認情況下,Client端默認的eventLoopGroup線程池也是static的,全局共享的,默認線程個數(shù)為coreSize * 2。合理的線程池個數(shù)可以提高客戶端的吞吐能力。
ManagedChannel是客戶端最核心的類,它表示邏輯上的一個channel;底層持有一個物理的transport(TCP通道,參見NettyClientTransport),并負責維護此transport的活性;即在RPC調用的任何時機,如果檢測到底層transport處于關閉狀態(tài)(terminated),將會嘗試重建transport。(參見TransportSet.obtainActiveTransport())
通常情況下,我們不需要在RPC調用結束后就關閉Channel,Channel可以被一直重用,直到Client不再需要請求位置或者Channel無法真的異常中斷而無法繼續(xù)使用。當然,為了提高Client端application的整體并發(fā)能力,我們可以使用連接池模式,即創(chuàng)建多個ManagedChannel,然后使用輪訓、隨機等算法,在每次RPC請求時選擇一個Channel即可。(備注,連接池特性,目前GRPC尚未提供,需要額外的開發(fā))
每個Service客戶端,都生成了2種stub:BlockingStub和FutureStub;這兩個Stub內部調用過程幾乎一樣,唯一不同的是BlockingStub的方法直接返回Response Model,而FutureStub返回一個Future對象。BlockingStub內部也是基于Future機制,只是封裝了阻塞等待的過程:
try {
//也是基于Future
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);
//阻塞過程
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED.withCause(e).asRuntimeException();
}
}
return getUnchecked(responseFuture);
} catch (Throwable t) {
call.cancel();
throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}
創(chuàng)建一個Stub的成本是非常低的,我們可以在每次請求時都通過channel創(chuàng)建新的stub,這并不會帶來任何問題(只不過是創(chuàng)建了大量對象);其實更好的方式是,我們應該使用一個Stub發(fā)送多次請求,即Stub也是可以重用的;直到Stub上的狀態(tài)異常而無法使用。最常見的異常,就是“io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED”,即表示DEADLINE時間過期,我們可以為每個Stub配置deadline時間,那么如果此stub被使用的時長超過此值(不是空閑的時間),將不能再發(fā)送請求,此時我們應該創(chuàng)建新的Stub。很多人想盡辦法來使用“withDeadlineAfter”方法來實現(xiàn)一些奇怪的事情,此參數(shù)的主要目的就是表明:此stub只能被使用X時長,此后將不能再進行請求,應該被釋放。所以,它并不能實現(xiàn)類似于“keepAlive”的語義,即使我們需要keepAlive,也應該在Channel級別,而不是在一個Stub上。
如果你使用了連接池,那么其實連接池不應該關注DEADLINE的錯誤,只要Channel本身沒有terminated即可;就把這個問題交給調用者處理。如果你也對Stub使用了對象池,那么你就可能需要關注這個情況了,你不應該向調用者返回一個“DEADLINE”的stub,或者如果調用者發(fā)現(xiàn)了DEADLINE,你的對象池應該能夠移除它。
1)實例化ManagedChannel,此channel可以被任意多個Stub實例引用;如上文說述,我們可以通過創(chuàng)建Channel池,來提高application整體的吞吐能力。此Channel實例,不應該被shutdown,直到Client端停止服務;在任何時候,特別是創(chuàng)建Stub時,我們應該判定Channel的狀態(tài)。
synchronized (this) {
if (channel.isShutdown() || channel.isTerminated()) {
channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();
}
//new Stub
}
//或者
ManagedChannel channel = (ManagedChannel)client.getChannel();
if(channel.isShutdown() || channel.isTerminated()) {
client = createBlockStub();
}
client.sayHello(...)
因為Channel是可以多路復用,所以我們用Pool機制(比如commons-pool)也可以實現(xiàn)連接池,只是這種池并非完全符合GRPC/HTTP2的設計語義,因為GRPC允許一個Channel上連續(xù)發(fā)送對個Requests(然后一次性接收多個Responses),而不是“交互式”的Request-Response模式,當然這么使用并不會有任何問題。
2)對于批量調用的場景,我們可以使用FutureStub,對于普通的業(yè)務類型RPC,我們應該使用BlockingStub。
3)每個RPC方法的調用,比如sayHello,調用開始后,將會為每個調用請求創(chuàng)建一個ClientCall實例,其內部封裝了調用的方法、配置選項(headers)等。此后將會創(chuàng)建Stream對象,每個Stream都持有唯一的streamId,它是Transport用于分揀Response的憑證。最終調用的所有參數(shù)都會被封裝在Stream中。
4)檢測DEADLINE,是否已經(jīng)過期,如果過期,將使用FailingClientStream對象來模擬整個RPC過程,當然請求不會通過通道發(fā)出,直接經(jīng)過異常流處理過程。
5)然后獲取transport,如果此時檢測到transport已經(jīng)中斷,則重建transport。(自動重練機制,ClientCallImpl.start()方法)
6)發(fā)送請求參數(shù),即我們Request實例。一次RPC調用,數(shù)據(jù)是分多次發(fā)送,但是ClientCall在創(chuàng)建時已經(jīng)綁定到了指定的線程上,所以數(shù)據(jù)發(fā)送總是通過一個線程進行(不會亂序)。
7)將ClientCall實例置為halfClose,即半關閉,并不是將底層Channel或者Transport半關閉,只是邏輯上限定此ClientCall實例上將不能繼續(xù)發(fā)送任何stream信息,而是等待Response。
8)Netty底層IO將會對reponse數(shù)據(jù)流進行解包(Http2ConnectionDecoder),并根據(jù)streamId分揀Response,同時喚醒響應的ClientCalls阻塞。(參見ClientCalls,GrpcFuture)
9)如果是BlockingStub,則請求返回,如果響應中包含應用異常,則封裝后拋出;如果是網(wǎng)絡異常,則可能觸發(fā)Channel重建、Stream重置等。