手把手教你手寫一個(gè)RPC

文章目錄

介紹

SPI

如何使用SPI?

服務(wù)端

ZK注冊(cè)中心

客戶端

負(fù)載均衡

服務(wù)端處理數(shù)據(jù)

如何使用

結(jié)尾

在之前的文章中對(duì)實(shí)現(xiàn)Rpc以及dubbo中的SPI機(jī)制有了一些了解,為了更進(jìn)一步了解rpc調(diào)用的整個(gè)過程,在本篇中我們會(huì)自己實(shí)現(xiàn)簡單的SPI機(jī)制、實(shí)現(xiàn)zookeeper注冊(cè)中心、負(fù)載均衡等功能。源碼的鏈接我會(huì)放在文章的末尾,如果有需要的話可以clone下來,debug整個(gè)項(xiàng)目,這樣也可以對(duì)整個(gè)rpc調(diào)用過程更加清楚。OK,廢話不多說啦,接下來就是展示它的功能。

介紹

為了方便對(duì)功能進(jìn)行管理,這里進(jìn)行了多模塊劃分:



這個(gè)見名字就知道模塊是負(fù)責(zé)什么的了,比如api模塊就是專門負(fù)責(zé)存放接口的,cluster就是容錯(cuò)部分,這里就是負(fù)載均衡那塊啦。

在上一篇文章中,我們就知道了在dubbo中,SPI可以說是它的核心部分了,所以我們也先從SPI部分說起。

SPI

如何使用SPI?

這個(gè)與dubbo中的基本一樣,在META-INF下建一個(gè)extensions包,以接口名為文件名。在文件中,使用key-value的形式來設(shè)置它的實(shí)現(xiàn)類:



SPI的具體實(shí)現(xiàn)類為ExtensionLoader,他與dubbo基本上是一樣,可以說是dubbo中弱化版的實(shí)現(xiàn)。相信看了上一篇文章的話,再看這個(gè)SPI的實(shí)現(xiàn),簡直就是小菜一碟。



服務(wù)端

在dubbo中有xml形式、注解形式來提供服務(wù),這篇文章中以注解的方式來實(shí)現(xiàn)提供服務(wù)。所以我們要先定義一個(gè)服務(wù)端的注解,以便將這個(gè)注解添加到實(shí)現(xiàn)類上:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface Service {
    String interfaceName() default "";
    String version() default "";
    String group() default "";
}

將這個(gè)注解添加到實(shí)現(xiàn)類上就可以了,我們啟動(dòng)下服務(wù)端:

public class AnnotationProvider {
    public static void main(String[] args) throws IOException {
        new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        //啟動(dòng)netty
        NettyRpcServer nettyRpcServer=new NettyRpcServer();
        nettyRpcServer.start();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
    static public class ProviderComponentScan{
    }
}

僅僅將注解添加到實(shí)現(xiàn)類上是不行的,因?yàn)樗鼪]有進(jìn)入到spring容器,我們的spring也就無法處理它,所以接下來就是要spring掃描到這些類:

@Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        CustomScanner rpcServiceScanner=new CustomScanner(beanDefinitionRegistry, Service.class);
        CustomScanner springBeanScanner=new CustomScanner(beanDefinitionRegistry, Component.class);
        if (resourceLoader!=null){
            rpcServiceScanner.setResourceLoader(resourceLoader);
            springBeanScanner.setResourceLoader(resourceLoader);
        }
        String[] packageToScan=getPackageToScan(annotationMetadata);
        //其實(shí)就是掃描包下面有這些注解的類,將其加入到容器中后才可以使用。
        springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
        rpcServiceScanner.scan(packageToScan);

    }


    /**
     * 獲取到需要掃描的內(nèi)容
     * */
    private String[] getPackageToScan(AnnotationMetadata annotationMetadata) {
        String[] packageToScan=new String[0];
        //可見DubboComponentScanRegistrar的getPackagesToScan0方法
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                annotationMetadata.getAnnotationAttributes(annotationClass.getName()));

        if (attributes!=null){
            packageToScan=attributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
        }
        //說明是沒有掃描的
        if (packageToScan.length==0){
            packageToScan=new String[]{((StandardAnnotationMetadata)annotationMetadata).getIntrospectedClass().getPackage().getName()};
        }
        return packageToScan;
    }

這一步就是讓spring掃描到@Service標(biāo)記的文件,關(guān)于這一部分dubbo中實(shí)現(xiàn)的更加深入,如果想了解更多,可以去看DubboComponentScanRegistrar的getPackagesToScan0。

這樣spring在掃描到文件中后,就可以在容器加載完畢時(shí)對(duì)其進(jìn)行一些處理。通過實(shí)現(xiàn)ApplicationListener來重寫onApplicationEvent方法:

@SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())){
            ApplicationContext context = event.getApplicationContext();
            //處理提供端/生產(chǎn)者
            handlerProvider(context);
            //處理消費(fèi)端
            handlerConsumer(context);
        }
    }
    @SneakyThrows
    private void handlerProvider(ApplicationContext context) throws UnknownHostException {
        Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
        String host = InetAddress.getLocalHost().getHostAddress();
        if (!beans.isEmpty()){
            for (Object bean:beans.values()){
                Service service = bean.getClass().getAnnotation(Service.class);
                RpcService rpcServiceConfig=new RpcService(host,PORT,bean,service.version(),service.group());
                serviceProvider.register(rpcServiceConfig);
            }
        }
    }

因?yàn)槭欠?wù)端嘛,所以這個(gè)信息是必須要注冊(cè)到注冊(cè)中心的。dubbo是以zookeeper為默認(rèn)的注冊(cè)中心的,這里我們就也已zookeeper為注冊(cè)中心。






ZK注冊(cè)中心

使用zk為注冊(cè)中心,大部分功能可以直接使用org.apache.curator里面的方法:

@Override
    public void register(RpcService rpcService) {
        this.addService(rpcService);
        InetSocketAddress address = new InetSocketAddress(rpcService.getHost(), rpcService.getPort());
        String servicePath= RpcConstants.ZK_REGISTER_ROOT_PATH+"/"+rpcService.getServiceName()+rpcService.getGroup()+rpcService.getVersion()+address;
        ZookeeperUtils.createPersistentNode(zkClient,servicePath);
    }


public static void createPersistentNode(CuratorFramework zkClient, String path) {
        try {
            if (!(REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null)) {
                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            }
            REGISTERED_PATH_SET.add(path);
        } catch (Exception e) {
            log.error("create persistent node for path [{}] fail", path);
        }
    }
我們可以通過ZooInspector這個(gè)軟件來看zookeeper注冊(cè)的信息:



zookeeper有了服務(wù)信息,我們才可以為客戶端提供服務(wù)。

客戶端

服務(wù)端有自定義的注解,那么客戶端也要定義一個(gè)注解,這樣才能夠知道哪些地方需要服務(wù):

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface Reference {
    String interfaceName() default "";
    String version() default "";
    String group() default "";
}
public class AnnotationConsumer {
    public static void main(String[] args) throws IOException {
        AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        annotationConfigApplicationContext.start();
        final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
        helloController.test();
        System.in.read();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.consumer"})
    static public class ProviderComponentScan{
    }
}

在上面的將服務(wù)端@Service注解的類添加進(jìn)spring掃描的地方,我們將客戶端@Reference也添加進(jìn)去了。所以只需要在spring加載完畢后對(duì)@Reference相關(guān)類進(jìn)行一些處理。

@SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())){
            ApplicationContext context = event.getApplicationContext();
            //處理提供端/生產(chǎn)者
            handlerProvider(context);
            //處理消費(fèi)端
            handlerConsumer(context);
        }
    }

private void handlerConsumer(ApplicationContext context) {
        Map<String, Object> beans = context.getBeansWithAnnotation(Component.class);
        if (!beans.isEmpty()){
            for (Object bean:beans.values()){
                Class<?> targetClass = bean.getClass();
                Field[] declaredFields =targetClass.getDeclaredFields();
                for (Field declaredField:declaredFields){
                    Reference rpcReference = declaredField.getAnnotation(Reference.class);
                    if (rpcReference!=null){
                        RpcService rpcServiceConfig=new RpcService(rpcReference.version(),rpcReference.group());
                        RpcClientProxy rpcClientProxy=new RpcClientProxy(rpcClient,rpcServiceConfig);
                        Object clientProxy=rpcClientProxy.getProxy(declaredField.getType());
                        declaredField.setAccessible(true);
                        try {
                            declaredField.set(bean,clientProxy);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
我們獲得的是一個(gè)接口,如果才能調(diào)用對(duì)象的方法呢?沒錯(cuò),就是生成一個(gè)代理類,這樣就可以通過調(diào)用代理類來執(zhí)行它的方法:

public <T> T getProxy(Class<?> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz},new ConsumerInvocationHandler());
    }

執(zhí)行代理類時(shí)的操作,通過使用CompletableFuture,保證最后一定可以獲得該方法的結(jié)果。當(dāng)然也可以使用countDownLatch來保證最后一定獲得執(zhí)行結(jié)果,兩種方式都可以,無非是實(shí)現(xiàn)過程不一樣:

private class ConsumerInvocationHandler implements InvocationHandler{
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcRequest rpcRequest=new RpcRequest(method,args,rpcServiceConfig);
            RpcResponse<Object> rpcResponse=null;
            CompletableFuture<RpcResponse<Object>> completableFuture= (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
            rpcResponse=completableFuture.get();
            return rpcResponse.getData();
        }
    }
接下來就是要獲取注冊(cè)中心中的信息,然后根據(jù)負(fù)載均衡選出一個(gè)合適的ip,開啟netty進(jìn)行訪問:

@Override
    public Object sendRpcRequest(RpcRequest rpcRequest) throws Exception {
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
        InetSocketAddress inetSocketAddress = serviceConsumer.getIpAndPort(rpcRequest);//獲取到ip與地址。
        Channel channel=getChannel(inetSocketAddress);

        if (channel.isActive()){
            NettyClientHandler.COMPLETABLE_CLIENT.put(rpcRequest.getRequestId(),resultFuture);
            RpcMessage rpcMessage=this.createRpcMessage(rpcRequest);
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future->{
                if (!future.isSuccess()){
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                }
            });
        }else{
            throw new IllegalStateException();
        }
        return resultFuture;
    }
負(fù)載均衡

在dubbo中負(fù)載均衡算法有很多,比如像隨機(jī)、權(quán)重輪詢、一致性hash等等,這里我們只是實(shí)現(xiàn)一個(gè)簡單的隨機(jī)。






將負(fù)載均衡作為一個(gè)SPI接口:

@SPI
public interface LoadBalance {
    String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
}
public abstract class AbstractLoadBalance implements LoadBalance {
    @Override
    public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
        if (serviceAddresses==null || serviceAddresses.isEmpty()){
            return null;
        }
        if (serviceAddresses.size()==1){
            return serviceAddresses.get(0);
        }
        return doSelect(serviceAddresses,rpcRequest);
    }
    protected abstract String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) ;
}

public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
        Random random=new Random();
        return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
    }
}

因?yàn)橛蠸PI機(jī)制,如果想擴(kuò)展負(fù)載均衡算法也是極為方便的。

有了接口名就可以以它為查詢條件,在zk中找到有哪些ip提供這個(gè)服務(wù),根據(jù)這些提供服務(wù)的地址呢,可以坐下負(fù)載均衡,得到最后要執(zhí)行的ip。

 public InetSocketAddress getIpAndPort(RpcRequest rpcRequest) {
        String rpcServiceName = rpcRequest.getRpcServiceName();
        CuratorFramework zkClient = ZookeeperUtils.getZkClient();
        List<String> serviceUrlList = ZookeeperUtils.getChildrenNodes(zkClient,rpcServiceName);
        if (serviceUrlList==null || serviceUrlList.size()==0){
            throw new RpcException("未找到服務(wù),該服務(wù)為:"+rpcServiceName);
        }
        //做下負(fù)載均衡
        LoadBalance random = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
//        List<String> list=new ArrayList<>();
//        for (int i = 0; i < 15; i++) {
//            list.add();
//        }
//        String targetServiceUrl = list.get(0);
        String targetServiceUrl = random.selectServiceAddress(serviceUrlList, rpcRequest);
        String[] socketAddressArray = targetServiceUrl.split(":");
        String host = socketAddressArray[0];
        int port = Integer.parseInt(socketAddressArray[1]);
        return new InetSocketAddress(host,port);
    }

在netty傳遞數(shù)據(jù)之前,需要對(duì)數(shù)據(jù)進(jìn)行編碼、序列化等操作。序列化方式有很多,比如像java、hession2、kryo等,在實(shí)際效率中還是kryo的效率更高,因此我們就采用這個(gè)kryo來進(jìn)行序列化。

同樣為了方便擴(kuò)展,也是將它設(shè)置為SPI接口:

 @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             Output output = new Output(byteArrayOutputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            throw new RpcException("序列化失敗");
        }
    }


    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             Input input = new Input(byteArrayInputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return clazz.cast(o);
        } catch (Exception e) {
            throw new RpcException("反序列化失敗");
        }
    }
服務(wù)端處理數(shù)據(jù)

因?yàn)槭遣捎肗etty來作為通信框架,所以可以繼承ChannelInboundHandlerAdapter來重寫channelRead方法,這樣就可以獲取到傳遞的數(shù)據(jù)了:

if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
                    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
                    rpcMessage.setData(RpcConstants.PONG);
                } else {
                    RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
                    log.info("服務(wù)端接收一條新消息:請(qǐng)求id為"+rpcRequest.getRequestId()+",接口為"+rpcRequest.getInterfaceName()+",方法為"+rpcRequest.getMethodName());
                    Object result = this.handlerRequest(rpcRequest);
                    rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                    if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                        rpcResponse= RpcResponse.success(result, rpcRequest.getRequestId());
                    } else {
                        rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                    }
                    rpcMessage.setData(rpcResponse);
                }
                ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

通過反射來執(zhí)行對(duì)應(yīng)的方法:

public Object handlerRequest(RpcRequest invocation) {
        Object service = serviceProvider.getService(invocation.getRpcServiceName());
        Object result;
        try {
            Method method = service.getClass().getMethod(invocation.getMethodName(), invocation.getParamTypes());
            result = method.invoke(service, invocation.getParameters());
        } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
            throw new RpcException(e.getMessage(), e);
        }
        return result;
    }

至此就完成了一整條的RPC調(diào)用過程。

如何使用

那么下載完項(xiàng)目后如何啟動(dòng)?

在啟動(dòng)之前需要先做一件事,那就是啟動(dòng)zookeeper。在代碼中我設(shè)置zk的ip為本地,如果是使用云服務(wù)器的話,那么是需要修改下ip地址的。

首先要啟動(dòng)服務(wù)端的類,切記要先啟動(dòng)服務(wù)端的,因?yàn)槟悴惶峁┳?cè)中心數(shù)據(jù),人家客戶端怎么拉數(shù)據(jù)嘛;

public class AnnotationProvider {
    public static void main(String[] args) throws IOException {
        new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        //啟動(dòng)netty
        NettyRpcServer nettyRpcServer=new NettyRpcServer();
        nettyRpcServer.start();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
    static public class ProviderComponentScan{
    }
}

等待服務(wù)端啟動(dòng)完畢后,然后再啟動(dòng)客戶端的類:

public class AnnotationConsumer {
    public static void main(String[] args) throws IOException {
        AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        annotationConfigApplicationContext.start();
        final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
        helloController.test();
        System.in.read();
    }

    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.consumer"})
    static public class ProviderComponentScan{
    }
}

結(jié)尾

雖然RPC調(diào)用過程不復(fù)雜,但是實(shí)際上要寫出來的話,還是有些坑的,畢竟紙上得來終覺淺,絕知此事要躬行。為此也是參考了dubbo源碼以及github上其他人寫的的rpc例子。以上就是本文對(duì)于這個(gè)RPC的介紹了,下面附上它的gitee鏈接。

https://gitee.com/stylesmile/java_write_frame/tree/master/myRpc

netty實(shí)現(xiàn)tomcat(簡易版)

消息隊(duì)列技術(shù)點(diǎn)梳理(思維導(dǎo)圖版)

關(guān)于一個(gè)簡單接口的高并發(fā)測試與優(yōu)化記錄

Java實(shí)現(xiàn)線程間的通信的五種方式

手寫簡單版SpringMVC

使用java手寫一個(gè)簡單的web服務(wù)器

idea 插件激活(該方法為備用,建議優(yōu)先使用我們提供的正版激活)

作者:java知路


歡迎關(guān)注微信公眾號(hào) :java知路