Nacos Source Code Series: What Happens on the Server Side

Preface

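In the previous installment we walked through the overall flow of client-side service registration. The client registers an instance by calling NamingService.registerInstance, and the registration is finally sent to the nacos/v1/ns/instance endpoint. This time we look at registration on the server side, starting from that endpoint and following what the server does with the request.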

Service Registration

Above is the Nacos architecture diagram from the official website. From it we can infer that the endpoint we are looking for lives in the Naming Service, and the project structure does contain a naming module, which implements service registration. Requests are handled by controllers, and one of them is a class called InstanceController, so instance registration must go through it. The request mapping of InstanceController matches the path of our POST request, as shown below:

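So we start with the source code that receives the request and handles registration. Among the RESTful endpoints, the only method that accepts a POST on this path is InstanceController.register. It receives the request, parses the submitted information, converts it into an Instance, and then calls getInstanceOperator().registerInstance, which is the core of service registration.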

    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        // Read namespaceId from the request; defaults to "public" if absent
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // Service name, in the format "group@@serviceName"
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // Rebuild an Instance object from the request parameters
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        // [Core] register the service instance
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
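
The first lines of register handle the raw request parameters. They fall back to the public namespace and check that serviceName follows the group@@serviceName format. Here is a minimal sketch of that step. GroupedServiceName and fromParams are hypothetical names, not Nacos API, and the sketch reads from a plain Map instead of an HttpServletRequest. The real code uses WebUtils and NamingUtils.checkServiceNameFormat.

```java
import java.util.Map;

// Hypothetical helper, not Nacos API: mimics the namespace default and the
// "group@@serviceName" format check done at the start of register().
final class GroupedServiceName {
    static final String SEPARATOR = "@@";             // separator between group and service name
    static final String DEFAULT_NAMESPACE = "public";  // used when namespaceId is absent

    final String namespace;
    final String group;
    final String name;

    private GroupedServiceName(String namespace, String group, String name) {
        this.namespace = namespace;
        this.group = group;
        this.name = name;
    }

    static GroupedServiceName fromParams(Map<String, String> params) {
        // Optional parameter: fall back to the default namespace
        String namespace = params.getOrDefault("namespaceId", DEFAULT_NAMESPACE);
        // Required parameter: reject a missing serviceName
        String serviceName = params.get("serviceName");
        if (serviceName == null || serviceName.isEmpty()) {
            throw new IllegalArgumentException("Param 'serviceName' is required");
        }
        // Format check: must be group@@serviceName with non-empty group and name
        int idx = serviceName.indexOf(SEPARATOR);
        if (idx <= 0 || idx + SEPARATOR.length() >= serviceName.length()) {
            throw new IllegalArgumentException("serviceName must be 'group@@serviceName': " + serviceName);
        }
        String group = serviceName.substring(0, idx);
        String name = serviceName.substring(idx + SEPARATOR.length());
        return new GroupedServiceName(namespace, group, name);
    }
}
```

For example, {serviceName=DEFAULT_GROUP@@order-service} yields namespace public, group DEFAULT_GROUP and name order-service. A bare order-service is rejected.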

Let's first look at the core call:

    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

getInstanceOperator() decides whether to use the gRPC-based implementation. The gRPC features are used by default, so the operator we get back is instanceServiceV2:

    private InstanceOperator getInstanceOperator() {
        return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
    }

instanceServiceV2 is an InstanceOperatorClientImpl, so that is the class whose registerInstance method we step into next:

    @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) {
        // Whether the instance is ephemeral
        boolean ephemeral = instance.isEphemeral();
        // Build the clientId from the instance's ip:port and the ephemeral flag
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        // Create the client for this clientId if it does not exist yet
        createIpPortClientIfAbsent(clientId);
        // Get the Service (namespace + grouped service name)
        Service service = getService(namespaceId, serviceName, ephemeral);
        // Register the instance
        clientOperationService.registerInstance(service, instance, clientId);
    }
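
The clientId built here ties together everything that follows. In the Nacos 2.x source we have read, IpPortBasedClient.getClientId appends a # and the ephemeral flag to the ip:port address, which gives IDs such as 10.0.0.1:8080#true. IDs of gRPC connections contain no #. Treat that exact format as an assumption and check it against your version. The ClientIds class below is a hypothetical helper that only reproduces the scheme.

```java
// Hypothetical sketch of the clientId scheme assumed for IP/port-based clients:
// "<ip>:<port>" + "#" + ephemeral flag, e.g. "10.0.0.1:8080#true".
final class ClientIds {
    static final String ID_DELIMITER = "#";

    static String ipPortClientId(String ip, int port, boolean ephemeral) {
        return ip + ":" + port + ID_DELIMITER + ephemeral;
    }

    // gRPC connection IDs are assumed not to contain the delimiter
    static boolean isConnectionBased(String clientId) {
        return !clientId.contains(ID_DELIMITER);
    }

    // Reads the ephemeral flag back from an IP/port-based clientId
    static boolean isEphemeral(String clientId) {
        return clientId.endsWith(ID_DELIMITER + "true");
    }
}
```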

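Nacos 2.0 introduced a Client model that manages all data related to one client machine. When a client publishes a service, every service it publishes and all of its subscriber information are recorded in a single Client object, which corresponds to that client's connection. Index updates are then triggered through the event mechanism. A Client manages one client's instance registrations (Publish) and service subscriptions (Subscribe), which makes it easy to quickly aggregate the set of services that need to be pushed. One gRPC long-lived connection corresponds to one Client, and every Client has its own unique clientId.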

package com.alibaba.nacos.naming.core.v2.client;
public interface Client {

    // Client ID
    String getClientId();
    // Whether this is an ephemeral client
    boolean isEphemeral();
    // Set the client's last-updated time
    void setLastUpdatedTime();
    // Get the client's last-updated time
    long getLastUpdatedTime();
    // Register (publish) a service instance
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    // Remove a service instance
    InstancePublishInfo removeServiceInstance(Service service);
    // Query a service instance
    InstancePublishInfo getInstancePublishInfo(Service service);
    // All services published by this client
    Collection<Service> getAllPublishedService();
    // Subscribe to a service
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    // Unsubscribe
    boolean removeServiceSubscriber(Service service);
    // Query a subscription
    Subscriber getSubscriber(Service service);
    // All services this client subscribes to
    Collection<Service> getAllSubscribeService();
    // Generate the client data to sync to other cluster nodes
    ClientSyncData generateSyncData();
    // Whether the client has expired
    boolean isExpire(long currentTime);
    // Release resources
    void release();

}
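
The Client model needs very little state. Here is a hypothetical, heavily simplified in-memory version to show that. It is not the Nacos implementation, and it differs in several ways:

- Services and instances are plain strings.
- Subscriptions are a set of service names.
- setLastUpdatedTime and isExpire take explicit timestamps and an assumed timeout, so the behavior is easy to test.
- addServiceInstance returns whether the registration is new, rather than always returning true.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified in-memory Client (not the Nacos implementation).
final class SimpleClient {
    private final String clientId;
    private final boolean ephemeral;
    // Published instances: service -> instance address (one entry per service per client)
    private final Map<String, String> publishers = new ConcurrentHashMap<>();
    // Subscribed services
    private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
    private volatile long lastUpdatedTime;

    SimpleClient(String clientId, boolean ephemeral, long createdAtMillis) {
        this.clientId = clientId;
        this.ephemeral = ephemeral;
        this.lastUpdatedTime = createdAtMillis;
    }

    String getClientId() { return clientId; }
    boolean isEphemeral() { return ephemeral; }
    void setLastUpdatedTime(long nowMillis) { lastUpdatedTime = nowMillis; }
    long getLastUpdatedTime() { return lastUpdatedTime; }

    // Publish; true only if this client had no instance for the service before
    boolean addServiceInstance(String service, String instance) {
        return publishers.put(service, instance) == null;
    }

    String removeServiceInstance(String service) { return publishers.remove(service); }
    String getInstancePublishInfo(String service) { return publishers.get(service); }
    Collection<String> getAllPublishedService() { return publishers.keySet(); }

    boolean addServiceSubscriber(String service) { return subscriptions.add(service); }
    boolean removeServiceSubscriber(String service) { return subscriptions.remove(service); }
    Collection<String> getAllSubscribeService() { return subscriptions; }

    // Expired if not updated for longer than timeoutMillis (the timeout is an assumption)
    boolean isExpire(long currentTime, long timeoutMillis) {
        return currentTime - lastUpdatedTime > timeoutMillis;
    }

    // Drop all state, roughly what release() is for
    void release() {
        publishers.clear();
        subscriptions.clear();
    }
}
```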

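With the Client model in mind, let's return to clientOperationService.registerInstance(service, instance, clientId); and find its concrete implementation. For an ephemeral instance, the call ends up in:

EphemeralClientOperationServiceImpl.registerInstance()

This method does the actual registration work, so let's go through it in detail: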

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        // Make sure a singleton Service exists (see Service's equals and hashCode)
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // Reject the request if the service is persistent, i.e. not ephemeral
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        // Look up the Client by clientId; this mapping is stored when the client connects
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        // Convert the client-side Instance model into the server-side InstancePublishInfo model
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // Store the instance in the Client
        client.addServiceInstance(singleton, instanceInfo);
        // Update the last-updated time
        client.setLastUpdatedTime();
        // Publish events that index the service-client relationship and the instance metadata
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
  1. ServiceManager.getInstance().getSingleton(): getSingleton manages Service singletons. Service overrides equals and hashCode, so the Service itself serves as the map key.
public class ServiceManager {

    // Singleton Services, keyed by the Service itself (see Service.equals and hashCode)
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    // All Services under each namespace
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    // Store the singleton Service in the map and return it
    public Service getSingleton(Service service) {
        singletonRepository.putIfAbsent(service, service);
        Service result = singletonRepository.get(service);
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
}
  2. Service's equals and hashCode: on the server, namespace + group + name identifies exactly one singleton Service.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Service)) {
            return false;
        }
        Service service = (Service) o;
        return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
    }
    
    @Override
    public int hashCode() {
        return Objects.hash(namespace, group, name);
    }
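  3. clientManager.getClient(): clientManager is a delegate that picks the concrete ClientManager based on the format of the clientId. For clients connected over a gRPC long-lived connection, it picks ConnectionBasedClientManager, which manages the mapping between each connection's clientId and its Client model. IP/port-based IDs, like the one built in the HTTP path above, are handled by EphemeralIpPortClientManager instead.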
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    // Map from clientId to its Client
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();

    // Look up a Client by clientId
    @Override
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }
}
  4. client.addServiceInstance(): implemented in the abstract class AbstractClient, which stores the current client's registry, i.e. the relationship between each Service and its instance.
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

    // Associate the Service with the published instance
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            // First registration for this service: increment the instance-count metric
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
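  5. ClientOperationEvent.ClientRegisterServiceEvent: this event triggers a listener in ClientServiceIndexesManager. The listener records the relationship between the Service and the client, so that for a given service the matching clients can be looked up quickly. Those clients are then used to build the final instance list.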


    // Service -> clientIds that publish (register instances of) it
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // Service -> clientIds that subscribe to it
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }

    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

    private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

    private void addSubscriberIndexes(Service service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }

    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }
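
The sketch below is a hypothetical miniature of the registration path that combines the five steps. It has three parts:

- a Service whose identity is namespace + group + name;
- a getSingleton that keeps the first Service object stored for each key;
- a publisher index from each Service to its clientIds.

The event-driven parts (ClientRegisterServiceEvent, ServiceChangedEvent) are collapsed into direct method calls. So it shows the data structures, not Nacos' asynchronous flow.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical miniature of the registration data structures (not Nacos API).
final class MiniRegistry {
    // Identity is namespace + group + name, mirroring Service.equals/hashCode above
    static final class Service {
        final String namespace;
        final String group;
        final String name;

        Service(String namespace, String group, String name) {
            this.namespace = namespace;
            this.group = group;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Service)) {
                return false;
            }
            Service s = (Service) o;
            return namespace.equals(s.namespace) && group.equals(s.group) && name.equals(s.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, group, name);
        }
    }

    // Like ServiceManager.singletonRepository
    private final ConcurrentMap<Service, Service> singletonRepository = new ConcurrentHashMap<>();
    // Like ClientServiceIndexesManager.publisherIndexes
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    // Returns the first Service object stored for this key (same effect as putIfAbsent + get)
    Service getSingleton(Service service) {
        Service previous = singletonRepository.putIfAbsent(service, service);
        return previous == null ? service : previous;
    }

    // registerInstance + addPublisherIndexes collapsed into one synchronous call
    Service register(Service service, String clientId) {
        Service singleton = getSingleton(service);
        publisherIndexes.computeIfAbsent(singleton, k -> ConcurrentHashMap.<String>newKeySet()).add(clientId);
        return singleton;
    }

    // Like removePublisherIndexes
    void deregister(Service service, String clientId) {
        Set<String> ids = publisherIndexes.get(service);
        if (ids != null) {
            ids.remove(clientId);
        }
    }

    // ClientIds currently publishing the service (empty if none)
    Set<String> publishers(Service service) {
        return publisherIndexes.getOrDefault(service, Set.of());
    }
}
```

Registering two equal but distinct Service objects from two clients yields one singleton Service with two clientIds in its index.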

Request flow diagram:

Server-Side Health Checks

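As a registry, Nacos does more than service registration and discovery: it also checks service availability. In 1.x, ephemeral instances use the Distro protocol, and the client keeps its healthy status by sending heartbeats to the registry. Persistent instances are stored through the Raft protocol.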

  1. Two detection mechanisms:
  • Client-side active reporting
  • Server-side active probing

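Client-side active reporting: you check in with your manager yourself, and if you stop checking in (miss a clock-in), you are considered unhealthy.

Server-side active probing: your manager notices that you have missed clock-ins and comes to check on you.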

We cannot choose the health-check mechanism ourselves. It is tied to the type of the Nacos service instance, of which there are two:

  • Ephemeral instances: the client actively reports
  • Persistent instances: the server actively probes

Client-Side Active Reporting

An ephemeral instance reports its health by sending a heartbeat every 5 seconds. If the server has received no heartbeat for more than 15 seconds, it marks the instance as unhealthy. After more than 30 seconds without one, the instance is removed from the service list.
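
The 1.x timing rules above reduce to a small state function. The sketch below is hypothetical (HeartbeatCheck is not a Nacos class) and hard-codes the 5s / 15s / 30s values quoted in the text. In Nacos these values are configurable defaults rather than constants.

```java
// Hypothetical sketch of the 1.x-style heartbeat check for ephemeral instances,
// using the timings quoted above. Names are illustrative, not Nacos API.
final class HeartbeatCheck {
    enum State { HEALTHY, UNHEALTHY, REMOVED }

    static final long HEARTBEAT_INTERVAL_MS = 5_000; // client side: send a beat every 5s
    static final long UNHEALTHY_TIMEOUT_MS = 15_000; // server side: mark unhealthy after 15s of silence
    static final long DELETE_TIMEOUT_MS = 30_000;    // server side: remove after 30s of silence

    // Classifies an instance from the time of its last received heartbeat
    static State evaluate(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_TIMEOUT_MS) {
            return State.REMOVED;
        }
        if (silence > UNHEALTHY_TIMEOUT_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }
}
```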

From 2.0 on, persistent instances work as before, but the health of ephemeral instances is judged through their long-lived connection instead.

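  1. Long-lived connection: several packets can be sent one after another over a single connection. While the connection is held open and no data is flowing, both sides send link-detection (keepalive) packets. Since Nacos 2.0, gRPC has replaced HTTP for this. The long-lived connection keeps track of the client-server communication state, and in the source code ConnectionManager manages the long-lived connections of all clients.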

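ConnectionManager: every 3 seconds, it looks for clients that have not communicated for more than 20 seconds and sends each of them a ClientDetectionRequest probe. If the client responds successfully within 1 second, the check passes. Otherwise unregister is called to remove the Connection.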

As long as a client keeps communicating with the server, the server never needs to probe it. Only when a client has gone quiet does the server actively probe.
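
The check described above can be sketched as one pass of a periodic task. IdleConnectionEjector is a hypothetical class; the 20s idle threshold comes from the text. The probe is a Predicate that stands in for the ClientDetectionRequest and its 1s wait, and a successful probe refreshes the connection's active time. Scheduling the pass every 3s (for example with a ScheduledExecutorService) is left out, so one pass can be driven with explicit timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical sketch of the idle-connection check (not Nacos API).
final class IdleConnectionEjector {
    static final long KEEP_ALIVE_TIME_MS = 20_000; // idle longer than this => probe

    // connectionId -> last time any request was seen on that connection
    private final Map<String, Long> lastActiveTime = new ConcurrentHashMap<>();

    void touch(String connectionId, long nowMillis) {
        lastActiveTime.put(connectionId, nowMillis);
    }

    boolean isRegistered(String connectionId) {
        return lastActiveTime.containsKey(connectionId);
    }

    // One check pass. probe returns true if the client answered the detection
    // request in time. Returns the IDs that were unregistered.
    List<String> checkOnce(long nowMillis, Predicate<String> probe) {
        List<String> ejected = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastActiveTime.entrySet()) {
            String id = e.getKey();
            if (nowMillis - e.getValue() <= KEEP_ALIVE_TIME_MS) {
                continue; // recently active: no probe needed
            }
            if (probe.test(id)) {
                touch(id, nowMillis); // answered: treat as active again
            } else {
                lastActiveTime.remove(id); // like unregister(connectionId)
                ejected.add(id);
            }
        }
        return ejected;
    }
}
```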

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {

    Map<String, Connection> connections = new ConcurrentHashMap<>();

    // Runs once Spring has created this bean, i.e. during container startup
    @PostConstruct
    public void start() {
        // Start the task that ejects unhealthy connections:
        // first run after 1s, then 3s after each previous run finishes
        RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // 1. Collect connections that have been idle for more than 20s
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                // 2. Determine the IPs and ports to eject
                // 3. Add IPs and ports to eject according to the connection-limit rules
                // 4. If there are still clients to eject, continue
                // 5. Probe the inactive clients
                // 6. Eject the clients that do not respond in time
                // 7. After ejection, publish the ClientDisconnectEvent
            }
        }, 1000L, 3000L, TimeUnit.MILLISECONDS);
    }
}

// Unregister (remove) a connection
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        // Decrement the per-IP connection count and drop the entry at zero
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        // Notify the listeners (e.g. ConnectionBasedClientManager) of the disconnect
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

Once the server has removed the connection, it calls notifyClientDisConnected(). That method invokes clientDisConnected(connection) on every registered ClientConnectionEventListener, passing in the connection:

    public void notifyClientDisConnected(final Connection connection) {
        
        for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
            try {
                clientConnectionEventListener.clientDisConnected(connection);
            } catch (Throwable throwable) {
                Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                        clientConnectionEventListener.getName(), throwable);
            }
        }
    }
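
The key detail is the try/catch inside the loop. A listener that throws must not prevent the others, including ConnectionBasedClientManager, from learning about the disconnect. The hypothetical DisconnectNotifier below isolates that pattern. Where Nacos logs the failure, the sketch records the listener's name so the behavior can be checked.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of the listener fan-out (not Nacos API): every listener
// is called, and a failure in one does not stop the remaining ones.
final class DisconnectNotifier {
    @FunctionalInterface
    interface Listener {
        void clientDisConnected(String connectionId);
    }

    private static final class Named {
        final String name;
        final Listener listener;

        Named(String name, Listener listener) {
            this.name = name;
            this.listener = listener;
        }
    }

    private final List<Named> listeners = new CopyOnWriteArrayList<>();
    // Names of listeners that threw (Nacos logs these instead)
    final List<String> failures = new CopyOnWriteArrayList<>();

    void register(String name, Listener listener) {
        listeners.add(new Named(name, listener));
    }

    void notifyClientDisConnected(String connectionId) {
        for (Named n : listeners) {
            try {
                n.listener.clientDisConnected(connectionId);
            } catch (Throwable t) {
                failures.add(n.name); // isolate the failure and keep notifying
            }
        }
    }
}
```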

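ClientConnectionEventListener is implemented by ConnectionBasedClientManager. Its clientDisConnected(Connection) passes the connection ID on to the clientDisconnected(String clientId) method below, which triggers cleanup such as clearing the index caches: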

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        // Synchronously remove the client's data
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        client.release();
        // Publish ClientDisconnectEvent so that subscribed clients are notified of the change
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        return true;
    }
}
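
The ClientDisconnectEvent published here is what makes the indexes from the registration section forget the client. Our reading of ClientServiceIndexesManager in Nacos 2.x is that, on this event, it does two things:

- it removes the clientId from publisherIndexes for each service the client published;
- it removes the clientId from subscriberIndexes for each service the client subscribed to.

The hypothetical IndexCleanup class below models only that cleanup. It uses string service keys, and the published and subscribed services are passed in explicitly.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of index cleanup on client disconnect (not Nacos API).
final class IndexCleanup {
    // service -> clientIds that published it
    final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // service -> clientIds that subscribe to it
    final ConcurrentMap<String, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    void addPublisher(String service, String clientId) {
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.<String>newKeySet()).add(clientId);
    }

    void addSubscriber(String service, String clientId) {
        // compute() makes "create set if missing + add" atomic for this key
        subscriberIndexes.compute(service, (k, ids) -> {
            Set<String> set = (ids != null) ? ids : ConcurrentHashMap.<String>newKeySet();
            set.add(clientId);
            return set;
        });
    }

    void handleClientDisconnect(String clientId, Collection<String> published,
                                Collection<String> subscribed) {
        for (String service : published) {
            Set<String> ids = publisherIndexes.get(service);
            if (ids != null) {
                // In Nacos a ServiceChangedEvent follows so subscribers get a push
                ids.remove(clientId);
            }
        }
        for (String service : subscribed) {
            // Remove the clientId; returning null drops the entry once no subscriber is left
            subscriberIndexes.computeIfPresent(service, (k, ids) -> {
                ids.remove(clientId);
                return ids.isEmpty() ? null : ids;
            });
        }
    }
}
```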

Summary

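That wraps up the basic server-side source code of Nacos. Some parts were not expanded here and will be covered in detail in later posts. Today we covered the basic code for server-side registration and health checking. More content is on the way, and if you found this article helpful, remember to like and follow!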


Reposted from: https://muxiaonong.blog.csdn.net


WeChat official account: 牧小農. Scan the QR code in WeChat or search for the account name to follow.