手把手教你手寫一個(gè)RPC
文章目錄
介紹
SPI
如何使用SPI?
服務(wù)端
ZK注冊(cè)中心
客戶端
負(fù)載均衡
服務(wù)端處理數(shù)據(jù)
如何使用
結(jié)尾
在之前的文章中對(duì)實(shí)現(xiàn)Rpc以及dubbo中的SPI機(jī)制有了一些了解,為了更進(jìn)一步了解rpc調(diào)用的整個(gè)過程,在本篇中我們會(huì)自己實(shí)現(xiàn)簡單的SPI機(jī)制、實(shí)現(xiàn)zookeeper注冊(cè)中心、負(fù)載均衡等功能。源碼的鏈接我會(huì)放在文章的末尾,如果有需要的話可以clone下來,debug整個(gè)項(xiàng)目,這樣也可以對(duì)整個(gè)rpc調(diào)用過程更加清楚。OK,廢話不多說啦,接下來就是展示它的功能。
介紹
為了方便對(duì)功能進(jìn)行管理,這里進(jìn)行了多模塊劃分:
這個(gè)見名字就知道模塊是負(fù)責(zé)什么的了,比如api模塊就是專門負(fù)責(zé)存放接口的,cluster就是容錯(cuò)部分,這里就是負(fù)載均衡那塊啦。
在上一篇文章中,我們就知道了在dubbo中,SPI可以說是它的核心部分了,所以我們也先從SPI部分說起。
SPI
如何使用SPI?
這個(gè)與dubbo中的基本一樣,在META-INF下建一個(gè)extensions包,以接口名為文件名。在文件中,使用key-value的形式來設(shè)置它的實(shí)現(xiàn)類:
SPI的具體實(shí)現(xiàn)類為ExtensionLoader,他與dubbo基本上是一樣,可以說是dubbo中弱化版的實(shí)現(xiàn)。相信看了上一篇文章的話,再看這個(gè)SPI的實(shí)現(xiàn),簡直就是小菜一碟。
服務(wù)端
在dubbo中有xml形式、注解形式來提供服務(wù),這篇文章中以注解的方式來實(shí)現(xiàn)提供服務(wù)。所以我們要先定義一個(gè)服務(wù)端的注解,以便將這個(gè)注解添加到實(shí)現(xiàn)類上:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface Service {
String interfaceName() default "";
String version() default "";
String group() default "";
}
將這個(gè)注解添加到實(shí)現(xiàn)類上就可以了,我們啟動(dòng)下服務(wù)端:
public class AnnotationProvider {
public static void main(String[] args) throws IOException {
new AnnotationConfigApplicationContext(ProviderComponentScan.class);
//啟動(dòng)netty
NettyRpcServer nettyRpcServer=new NettyRpcServer();
nettyRpcServer.start();
}
@Configuration
@RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
static public class ProviderComponentScan{
}
}
僅僅將注解添加到實(shí)現(xiàn)類上是不行的,因?yàn)樗鼪]有進(jìn)入到spring容器,我們的spring也就無法處理它,所以接下來就是要spring掃描到這些類:
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
CustomScanner rpcServiceScanner=new CustomScanner(beanDefinitionRegistry, Service.class);
CustomScanner springBeanScanner=new CustomScanner(beanDefinitionRegistry, Component.class);
if (resourceLoader!=null){
rpcServiceScanner.setResourceLoader(resourceLoader);
springBeanScanner.setResourceLoader(resourceLoader);
}
String[] packageToScan=getPackageToScan(annotationMetadata);
//其實(shí)就是掃描包下面有這些注解的類,將其加入到容器中后才可以使用。
springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
rpcServiceScanner.scan(packageToScan);
}
/**
* 獲取到需要掃描的內(nèi)容
* */
private String[] getPackageToScan(AnnotationMetadata annotationMetadata) {
String[] packageToScan=new String[0];
//可見DubboComponentScanRegistrar的getPackagesToScan0方法
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
annotationMetadata.getAnnotationAttributes(annotationClass.getName()));
if (attributes!=null){
packageToScan=attributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
}
//說明是沒有掃描的
if (packageToScan.length==0){
packageToScan=new String[]{((StandardAnnotationMetadata)annotationMetadata).getIntrospectedClass().getPackage().getName()};
}
return packageToScan;
}
這一步就是讓spring掃描到@Service標(biāo)記的文件,關(guān)于這一部分dubbo中實(shí)現(xiàn)的更加深入,如果想了解更多,可以去看DubboComponentScanRegistrar的getPackagesToScan0。
這樣spring在掃描到文件中后,就可以在容器加載完畢時(shí)對(duì)其進(jìn)行一些處理。通過實(shí)現(xiàn)ApplicationListener來重寫onApplicationEvent方法:
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (Objects.isNull(event.getApplicationContext().getParent())){
ApplicationContext context = event.getApplicationContext();
//處理提供端/生產(chǎn)者
handlerProvider(context);
//處理消費(fèi)端
handlerConsumer(context);
}
}
@SneakyThrows
private void handlerProvider(ApplicationContext context) throws UnknownHostException {
Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
String host = InetAddress.getLocalHost().getHostAddress();
if (!beans.isEmpty()){
for (Object bean:beans.values()){
Service service = bean.getClass().getAnnotation(Service.class);
RpcService rpcServiceConfig=new RpcService(host,PORT,bean,service.version(),service.group());
serviceProvider.register(rpcServiceConfig);
}
}
}
因?yàn)槭欠?wù)端嘛,所以這個(gè)信息是必須要注冊(cè)到注冊(cè)中心的。dubbo是以zookeeper為默認(rèn)的注冊(cè)中心的,這里我們就也已zookeeper為注冊(cè)中心。
ZK注冊(cè)中心
使用zk為注冊(cè)中心,大部分功能可以直接使用org.apache.curator里面的方法:
@Override
public void register(RpcService rpcService) {
this.addService(rpcService);
InetSocketAddress address = new InetSocketAddress(rpcService.getHost(), rpcService.getPort());
String servicePath= RpcConstants.ZK_REGISTER_ROOT_PATH+"/"+rpcService.getServiceName()+rpcService.getGroup()+rpcService.getVersion()+address;
ZookeeperUtils.createPersistentNode(zkClient,servicePath);
}
public static void createPersistentNode(CuratorFramework zkClient, String path) {
try {
if (!(REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
}
REGISTERED_PATH_SET.add(path);
} catch (Exception e) {
log.error("create persistent node for path [{}] fail", path);
}
}
我們可以通過ZooInspector這個(gè)軟件來看zookeeper注冊(cè)的信息:
zookeeper有了服務(wù)信息,我們才可以為客戶端提供服務(wù)。
客戶端
服務(wù)端有自定義的注解,那么客戶端也要定義一個(gè)注解,這樣才能夠知道哪些地方需要服務(wù):
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface Reference {
String interfaceName() default "";
String version() default "";
String group() default "";
}
public class AnnotationConsumer {
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
annotationConfigApplicationContext.start();
final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
helloController.test();
System.in.read();
}
@Configuration
@RpcComponentScan(basePackages = {"com.xlfc.consumer"})
static public class ProviderComponentScan{
}
}
在上面的將服務(wù)端@Service注解的類添加進(jìn)spring掃描的地方,我們將客戶端@Reference也添加進(jìn)去了。所以只需要在spring加載完畢后對(duì)@Reference相關(guān)類進(jìn)行一些處理。
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (Objects.isNull(event.getApplicationContext().getParent())){
ApplicationContext context = event.getApplicationContext();
//處理提供端/生產(chǎn)者
handlerProvider(context);
//處理消費(fèi)端
handlerConsumer(context);
}
}
private void handlerConsumer(ApplicationContext context) {
Map<String, Object> beans = context.getBeansWithAnnotation(Component.class);
if (!beans.isEmpty()){
for (Object bean:beans.values()){
Class<?> targetClass = bean.getClass();
Field[] declaredFields =targetClass.getDeclaredFields();
for (Field declaredField:declaredFields){
Reference rpcReference = declaredField.getAnnotation(Reference.class);
if (rpcReference!=null){
RpcService rpcServiceConfig=new RpcService(rpcReference.version(),rpcReference.group());
RpcClientProxy rpcClientProxy=new RpcClientProxy(rpcClient,rpcServiceConfig);
Object clientProxy=rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
declaredField.set(bean,clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
}
}
我們獲得的是一個(gè)接口,如果才能調(diào)用對(duì)象的方法呢?沒錯(cuò),就是生成一個(gè)代理類,這樣就可以通過調(diào)用代理類來執(zhí)行它的方法:
public <T> T getProxy(Class<?> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz},new ConsumerInvocationHandler());
}
執(zhí)行代理類時(shí)的操作,通過使用CompletableFuture,保證最后一定可以獲得該方法的結(jié)果。當(dāng)然也可以使用countDownLatch來保證最后一定獲得執(zhí)行結(jié)果,兩種方式都可以,無非是實(shí)現(xiàn)過程不一樣:
private class ConsumerInvocationHandler implements InvocationHandler{
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest=new RpcRequest(method,args,rpcServiceConfig);
RpcResponse<Object> rpcResponse=null;
CompletableFuture<RpcResponse<Object>> completableFuture= (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse=completableFuture.get();
return rpcResponse.getData();
}
}
接下來就是要獲取注冊(cè)中心中的信息,然后根據(jù)負(fù)載均衡選出一個(gè)合適的ip,開啟netty進(jìn)行訪問:
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) throws Exception {
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
InetSocketAddress inetSocketAddress = serviceConsumer.getIpAndPort(rpcRequest);//獲取到ip與地址。
Channel channel=getChannel(inetSocketAddress);
if (channel.isActive()){
NettyClientHandler.COMPLETABLE_CLIENT.put(rpcRequest.getRequestId(),resultFuture);
RpcMessage rpcMessage=this.createRpcMessage(rpcRequest);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future->{
if (!future.isSuccess()){
future.channel().close();
resultFuture.completeExceptionally(future.cause());
}
});
}else{
throw new IllegalStateException();
}
return resultFuture;
}
負(fù)載均衡
在dubbo中負(fù)載均衡算法有很多,比如像隨機(jī)、權(quán)重輪詢、一致性hash等等,這里我們只是實(shí)現(xiàn)一個(gè)簡單的隨機(jī)。
將負(fù)載均衡作為一個(gè)SPI接口:
@SPI
public interface LoadBalance {
String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
}
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
if (serviceAddresses==null || serviceAddresses.isEmpty()){
return null;
}
if (serviceAddresses.size()==1){
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses,rpcRequest);
}
protected abstract String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) ;
}
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
Random random=new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
}
因?yàn)橛蠸PI機(jī)制,如果想擴(kuò)展負(fù)載均衡算法也是極為方便的。
有了接口名就可以以它為查詢條件,在zk中找到有哪些ip提供這個(gè)服務(wù),根據(jù)這些提供服務(wù)的地址呢,可以坐下負(fù)載均衡,得到最后要執(zhí)行的ip。
public InetSocketAddress getIpAndPort(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.getRpcServiceName();
CuratorFramework zkClient = ZookeeperUtils.getZkClient();
List<String> serviceUrlList = ZookeeperUtils.getChildrenNodes(zkClient,rpcServiceName);
if (serviceUrlList==null || serviceUrlList.size()==0){
throw new RpcException("未找到服務(wù),該服務(wù)為:"+rpcServiceName);
}
//做下負(fù)載均衡
LoadBalance random = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
// List<String> list=new ArrayList<>();
// for (int i = 0; i < 15; i++) {
// list.add();
// }
// String targetServiceUrl = list.get(0);
String targetServiceUrl = random.selectServiceAddress(serviceUrlList, rpcRequest);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host,port);
}
在netty傳遞數(shù)據(jù)之前,需要對(duì)數(shù)據(jù)進(jìn)行編碼、序列化等操作。序列化方式有很多,比如像java、hession2、kryo等,在實(shí)際效率中還是kryo的效率更高,因此我們就采用這個(gè)kryo來進(jìn)行序列化。
同樣為了方便擴(kuò)展,也是將它設(shè)置為SPI接口:
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new RpcException("序列化失敗");
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
throw new RpcException("反序列化失敗");
}
}
服務(wù)端處理數(shù)據(jù)
因?yàn)槭遣捎肗etty來作為通信框架,所以可以繼承ChannelInboundHandlerAdapter來重寫channelRead方法,這樣就可以獲取到傳遞的數(shù)據(jù)了:
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
} else {
RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
log.info("服務(wù)端接收一條新消息:請(qǐng)求id為"+rpcRequest.getRequestId()+",接口為"+rpcRequest.getInterfaceName()+",方法為"+rpcRequest.getMethodName());
Object result = this.handlerRequest(rpcRequest);
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
rpcResponse= RpcResponse.success(result, rpcRequest.getRequestId());
} else {
rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
}
rpcMessage.setData(rpcResponse);
}
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
通過反射來執(zhí)行對(duì)應(yīng)的方法:
public Object handlerRequest(RpcRequest invocation) {
Object service = serviceProvider.getService(invocation.getRpcServiceName());
Object result;
try {
Method method = service.getClass().getMethod(invocation.getMethodName(), invocation.getParamTypes());
result = method.invoke(service, invocation.getParameters());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
throw new RpcException(e.getMessage(), e);
}
return result;
}
至此就完成了一整條的RPC調(diào)用過程。
如何使用
那么下載完項(xiàng)目后如何啟動(dòng)?
在啟動(dòng)之前需要先做一件事,那就是啟動(dòng)zookeeper。在代碼中我設(shè)置zk的ip為本地,如果是使用云服務(wù)器的話,那么是需要修改下ip地址的。
首先要啟動(dòng)服務(wù)端的類,切記要先啟動(dòng)服務(wù)端的,因?yàn)槟悴惶峁┳?cè)中心數(shù)據(jù),人家客戶端怎么拉數(shù)據(jù)嘛;
public class AnnotationProvider {
public static void main(String[] args) throws IOException {
new AnnotationConfigApplicationContext(ProviderComponentScan.class);
//啟動(dòng)netty
NettyRpcServer nettyRpcServer=new NettyRpcServer();
nettyRpcServer.start();
}
@Configuration
@RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
static public class ProviderComponentScan{
}
}
等待服務(wù)端啟動(dòng)完畢后,然后再啟動(dòng)客戶端的類:
public class AnnotationConsumer {
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
annotationConfigApplicationContext.start();
final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
helloController.test();
System.in.read();
}
@Configuration
@RpcComponentScan(basePackages = {"com.xlfc.consumer"})
static public class ProviderComponentScan{
}
}
結(jié)尾
雖然RPC調(diào)用過程不復(fù)雜,但是實(shí)際上要寫出來的話,還是有些坑的,畢竟紙上得來終覺淺,絕知此事要躬行。為此也是參考了dubbo源碼以及github上其他人寫的的rpc例子。以上就是本文對(duì)于這個(gè)RPC的介紹了,下面附上它的gitee鏈接。
https://gitee.com/stylesmile/java_write_frame/tree/master/myRpc
netty實(shí)現(xiàn)tomcat(簡易版)
消息隊(duì)列技術(shù)點(diǎn)梳理(思維導(dǎo)圖版)
關(guān)于一個(gè)簡單接口的高并發(fā)測試與優(yōu)化記錄
Java實(shí)現(xiàn)線程間的通信的五種方式
手寫簡單版SpringMVC
使用java手寫一個(gè)簡單的web服務(wù)器
idea 插件激活(該方法為備用,建議優(yōu)先使用我們提供的正版激活)
作者:java知路
歡迎關(guān)注微信公眾號(hào) :java知路