Nacos源碼系列—訂閱機(jī)制的前因后果(上)

前因

我們?cè)诹私?a target="_blank" class="hl hl-1" data-report-click="{"spm":"1001.2101.3001.7020","dest":"https://so.csdn.net/so/search?q=Nacos&spm=1001.2101.3001.7020"}" data-tit="Nacos" data-pretit="nacos">Nacos訂閱機(jī)制之前,首先來(lái)了解一下前因——Nacos客戶端的“服務(wù)發(fā)現(xiàn)”,我們先通過(guò)下面一張圖來(lái)直觀的看一下,有人可能就說(shuō)這也叫直觀,明明很曲折,小農(nóng)想說(shuō)的是,這樣才能讓你們印象更加深刻(手動(dòng)狗頭)。

讀者內(nèi)心:我信你個(gè)鬼。

對(duì)于Naocs客戶端“服務(wù)發(fā)現(xiàn)” 主要是有NamingService獲取服務(wù)列表、組裝參數(shù),調(diào)用服務(wù)接口等等,上圖中只是一個(gè)大致的流程,在其中還有獲取服務(wù)列表中的通信流程協(xié)議(Grpc/http),訂閱流程以及后果(故障轉(zhuǎn)移流程),下面我們就來(lái)詳細(xì)講解一下,客戶端服務(wù)發(fā)現(xiàn)的基本流程。

首先我們先從一個(gè)入口類(lèi)Client項(xiàng)目下的NamingTest開(kāi)始看起

@Ignore
public class NamingTest {
    
    @Test
    public void testServiceList() throws Exception {
        
        Properties properties = new Properties();
        //服務(wù)IP
        properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
        //用戶名
        properties.put(PropertyKeyConst.USERNAME, "nacos");
        //密碼
        properties.put(PropertyKeyConst.PASSWORD, "nacos");
        
        Instance instance = new Instance();
        //實(shí)例IP
        instance.setIp("1.1.1.1");
        //實(shí)例端口
        instance.setPort(800);
        //配置權(quán)重
        instance.setWeight(2);
        Map<String, String> map = new HashMap<String, String>();
        map.put("netType", "external");
        map.put("version", "2.0");
        instance.setMetadata(map);

        //關(guān)鍵代碼 創(chuàng)建自己的實(shí)例
        NamingService namingService = NacosFactory.createNamingService(properties);
        namingService.registerInstance("nacos.test.1", instance);
        
        ThreadUtils.sleep(5000L);
        
        List<Instance> list = namingService.getAllInstances("nacos.test.1");
        System.out.println(list);
        ThreadUtils.sleep(30000L);        
    }
}

在前幾篇章節(jié)中,我們講解了registerInstance()方法,今天我們需要來(lái)看一下getAllInstances()方法的具體邏輯,這個(gè)就是我們需要觀察的入口

@Override
public List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

在上面具體方法中,會(huì)經(jīng)過(guò)幾輪重載方法的調(diào)用,在重載方法調(diào)用的過(guò)程中已經(jīng)設(shè)置了默認(rèn)值,例如(默認(rèn)分組(DEFAULT_GROUP),集群列表(空)、是否訂閱(是)等等)

/**
 *
 * @param serviceName 服務(wù)名稱
 * @param groupName   分組名稱(DEFAULT_GROUP)
 * @param clusters    集群數(shù)量(默認(rèn)為空)
 * @param subscribe   是否訂閱服務(wù)(是)
 * @return
 * @throws NacosException
 */
@Override
public List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    //是否為訂閱模式
    if (subscribe) {
        //從客戶端緩存中獲取服務(wù)信息
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
            //如果緩存中服務(wù)信息不存在,進(jìn)行訂閱
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        //未訂閱,從服務(wù)器獲取
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    //獲取實(shí)例列表
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

如果是訂閱模式,直接從本都緩存中獲取服務(wù)信息,然后從中獲取實(shí)例列表,訂閱機(jī)制會(huì)自動(dòng)同步服務(wù)器實(shí)例信息到本地,如果緩存中沒(méi)有,說(shuō)明是首次調(diào)用,進(jìn)行訂閱后獲取服務(wù)信息,具體流程如下:

訂閱處理流程
在上面的流程中,我們講到了訂閱的邏輯,接下來(lái)我們就來(lái)看一看訂閱里面到底做了哪些事情,首先我們已經(jīng)知道服務(wù)在哪里訂閱了,我們只需要點(diǎn)進(jìn)去找對(duì)應(yīng)的方法。

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

下面是具體的方法,這里clientProxy類(lèi)型為NamingClientProxyDelegate,實(shí)例化NacosNamingService時(shí)該類(lèi)被實(shí)例化

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //定時(shí)調(diào)度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //獲取緩存中的serviceInfo對(duì)象
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        //判斷如果為空,進(jìn)行訂閱邏輯處理(Grpc協(xié)議)
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //ServiceInfo本地緩存處理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

在上述代碼中,可以看到我們?cè)讷@取服務(wù)器列表中,進(jìn)行了訂閱邏輯的擴(kuò)展。

  1. 在訂閱方法中首先開(kāi)啟定時(shí)任務(wù),用來(lái)定時(shí)同步服務(wù)端的實(shí)例信息,進(jìn)行本地緩存的更新等操作,如果是首次直接返回,去判斷是否有本地緩存
  2. 如果本地緩存中存在serviceInfo信息,直接返回serviceInfo信息,如果不存在,默認(rèn)采用Grpc協(xié)議進(jìn)行訂閱,然后在返回serviceInfo信息
  3. 通過(guò)grpcClientProxy.subscribe()直接向服務(wù)器發(fā)送一個(gè)訂閱請(qǐng)求,并返回結(jié)果
  4. servieInfo本地緩存處理,并且會(huì)將獲取的最新的serviceInfo和本地的serviceInfo進(jìn)行比較,進(jìn)行更新操作。

如下圖所示:

訂閱

在上面我們講解了,Nacos是如何進(jìn)行服務(wù)器發(fā)現(xiàn),以及訂閱的入口和大體邏輯,接下來(lái)我們就來(lái)詳細(xì)講一講Nacos的訂閱機(jī)制的核心,首先Nacos客戶端會(huì)通過(guò)定時(shí)任務(wù),進(jìn)行輪詢,每間隔6秒從Nacos注冊(cè)中心獲取服務(wù)實(shí)例列表,如果檢測(cè)實(shí)例發(fā)生變化,發(fā)布變更事件,訂閱者進(jìn)行對(duì)應(yīng)的邏輯處理(更新緩存和實(shí)例信息),我們先從一張圖,來(lái)了解一下訂閱機(jī)制主要的流程。

定時(shí)任務(wù)

訂閱其實(shí)本身也是服務(wù)發(fā)現(xiàn)的一種實(shí)現(xiàn)方式,就是在服務(wù)發(fā)現(xiàn)的時(shí)候執(zhí)行訂閱方法,然后通過(guò)定時(shí)任務(wù)定時(shí)拉取服務(wù)端信息。
我們找到 NacosNamingService.subscribe(),會(huì)發(fā)現(xiàn)里面有好幾個(gè)···subscribe()```方法,這幾個(gè)方法在重載的過(guò)程中,會(huì)幫我們添加一些默認(rèn)參數(shù)(默認(rèn)分組、空集合列表),最終我們對(duì)定位到下面這個(gè)方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

在這里我們先來(lái)看 clientProxy.subscribe(),這個(gè)方法實(shí)際上就是我們上面講到的NamingClientProxyDelegate.subscribe()方法,在這里主要是對(duì)服務(wù)列表的信息進(jìn)行查詢,所有我們可以知道不管是查詢還是訂閱都是用的同一個(gè)方法。在這里我們就不做過(guò)多的描述。

在這里我們主要關(guān)注的是這個(gè)方法里面一個(gè)定時(shí)調(diào)度的方法ServiceInfoUpdateService.scheduleUpdateIfAbsent();,這個(gè)方法里面構(gòu)建了serviceKey,通過(guò)key來(lái)判斷是否重復(fù),最后添加到updateTask,而addTask()就是添加任務(wù)并且發(fā)起一個(gè)定時(shí)任務(wù)

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //主要關(guān)注點(diǎn),添加定時(shí)任務(wù)
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

默認(rèn)定時(shí)延遲一秒執(zhí)行:

private static final long DEFAULT_DELAY = 1000L;
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

在這個(gè)定時(shí)任務(wù)里面封裝了訂閱機(jī)制的核心業(yè)務(wù)邏輯,位于UpdateTask.run()方法。

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        //判斷 服務(wù)是否訂閱過(guò)并且沒(méi)有開(kāi)啟定時(shí)任務(wù) 操作過(guò)不再執(zhí)行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                serviceKey)) {
            NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            isCancel = true;
            return;
        }

        //獲取緩存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //緩存不存在
        if (serviceObj == null) {
            //根據(jù)serviceName等信息獲取service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            //進(jìn)行本地緩存處理
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        //如果服務(wù)最后的更新時(shí)間<=緩存刷新時(shí)間,從注冊(cè)中心重新查詢
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            //本地緩存處理
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新最后更新的時(shí)間
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // TODO multiple time can be configured.
        //下一次更新緩存時(shí)間設(shè)置(6秒)
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        //設(shè)置失敗數(shù)量為0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        //沒(méi)有服務(wù)訂閱過(guò)并且開(kāi)啟定時(shí)任務(wù)
        if (!isCancel) {
            // 下次調(diào)度刷新時(shí)間,下次執(zhí)行的時(shí)間與failCount有關(guān),failCount=0,則下次調(diào)度時(shí)間為6秒,最長(zhǎng)為1分鐘
            // 無(wú)異常情況下緩存實(shí)例的刷新時(shí)間是6秒
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                    TimeUnit.MILLISECONDS);
        }
    }
}

通過(guò)定時(shí)任務(wù)執(zhí)行UpdateTask,默認(rèn)間隔時(shí)間為6秒,當(dāng)發(fā)生異常時(shí)會(huì)延長(zhǎng),但不會(huì)超過(guò)1分鐘。該方法會(huì)比較本地是否存在緩存,以及是否過(guò)期,當(dāng)不存在或者過(guò)期的時(shí)候,會(huì)去查詢注冊(cè)中心,獲取最新實(shí)例,更新最后獲取時(shí)間,處理服務(wù)信息,在最后會(huì)計(jì)算任務(wù)時(shí)間,循環(huán)執(zhí)行流程。


業(yè)務(wù)邏輯在最后會(huì)計(jì)算下一次定時(shí)任務(wù)的執(zhí)行時(shí)間,通過(guò)delayTime來(lái)延遲執(zhí)行,delayTime默認(rèn)為1000*6(6秒),在finally 里面發(fā)起下一次定時(shí)任務(wù),當(dāng)我們程序出現(xiàn)異常的時(shí)候,執(zhí)行時(shí)間和錯(cuò)誤次數(shù)成正比,最長(zhǎng)時(shí)間不超過(guò)一分鐘

到這里我們已經(jīng)對(duì)于Nacos客戶端定于的核心流程講解了一遍,Nacos客戶端通過(guò)一個(gè)定時(shí)任務(wù),每間隔6秒從注冊(cè)中心獲取實(shí)例列表,當(dāng)發(fā)現(xiàn)實(shí)例發(fā)生變化的時(shí)候,發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理,然后更新內(nèi)存中和本地緩存中的實(shí)例。接下來(lái)我們就來(lái)講一講,定時(shí)任務(wù)獲取到最新實(shí)例列表之后,整個(gè)時(shí)間機(jī)制是如何處理的。

我們?cè)诘谝徊秸{(diào)用subscribe()方法的時(shí)候,會(huì)訂閱一個(gè)EventListener事件,而在定時(shí)任務(wù)UpdateTask定時(shí)獲取實(shí)例列表之后,會(huì)調(diào)用ServiceInfoHolder.processServiceInfo方法對(duì)ServiceInfo進(jìn)行本地處理,這其中就包括事件處理。

在subscribe方法中,通過(guò)下面的代碼我們進(jìn)行監(jiān)聽(tīng)事件的注冊(cè)

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

在上述代碼中,我們主要關(guān)注的是changeNotifier.registerListener,這個(gè)監(jiān)聽(tīng)就是進(jìn)行具體事件注冊(cè)邏輯,在下述代碼中,主要是將EventListener存儲(chǔ)在listenerMapmap結(jié)構(gòu)中,key為服務(wù)實(shí)例信息的拼接,value為監(jiān)聽(tīng)事件的集合

public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                //將EventListener緩存到listenerMap中
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}

關(guān)于serviceInfo的處理
在updateTask獲取到最新的實(shí)例信息后會(huì)進(jìn)行本地化處理,我們需要看的是ServiceInfoUpdateService.run()下的serviceInfoHolder.processServiceInfo(serviceObj);本地緩存方法

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
      //判斷服務(wù)key是否為空
      String serviceKey = serviceInfo.getKey();
      if (serviceKey == null) {
          return null;
      }
      ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
      if (isEmptyOrErrorPush(serviceInfo)) {
          //empty or error push, just ignore
          return oldService;
      }
      //將緩存信息放置到map中
      serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
      //判斷實(shí)例信息是否發(fā)生改變
      boolean changed = isChangedServiceInfo(oldService, serviceInfo);
      if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
          serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
      }
      //監(jiān)控服務(wù)緩存map的大小
      MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
      if (changed) {
          NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                  JacksonUtils.toJson(serviceInfo.getHosts()));
          //添加實(shí)例變更事件,被訂閱者執(zhí)行
          NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                  serviceInfo.getClusters(), serviceInfo.getHosts()));
          //寫(xiě)入本地文件
          DiskCache.write(serviceInfo, cacheDir);
      }
      return serviceInfo;
  }   

首先我們判斷最新的ServiceInfo數(shù)據(jù)是否正確,有沒(méi)有發(fā)生變化,如果數(shù)據(jù)格式正確且發(fā)生變化,會(huì)發(fā)布一個(gè)變更事件(InstancesChangeEvent),同時(shí)講serviceinfo寫(xiě)入緩存中

對(duì)于服務(wù)信息的變更,Nacos是如何做的呢,別急我們往下看,當(dāng)我們調(diào)用InstancesChangeEvent()方法以后,變更事件會(huì)由NotifyCenter進(jìn)行發(fā)布,我們來(lái)瞅一瞅

首先事件追蹤的核心流程主要分為,根據(jù)事件類(lèi)型獲取-》獲取事件發(fā)布者-》發(fā)布事件,詳細(xì)如下所示:

private static final NotifyCenter INSTANCE = new NotifyCenter();

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    //根據(jù)時(shí)間類(lèi)型,獲取對(duì)應(yīng)的CanonicalName
    final String topic = ClassUtils.getCanonicalName(eventType);
    //從NotifyCenter.publisherMap中獲取對(duì)應(yīng)時(shí)間發(fā)布中
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        //事件發(fā)布者publisher發(fā)布事件
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

在這個(gè)源碼中,其實(shí) INSTANCE是單例實(shí)現(xiàn)的,在這里publisherMap鍵值對(duì)是什么時(shí)候建立的?其實(shí)在是我們NacosNamingService.init()調(diào)用初始化方法的時(shí)候進(jìn)行綁定的

private void init(Properties properties) throws NacosException {
    ......
    //建立InstancesChangeEvent和EnvenPublisher的關(guān)系
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
   ......
}

當(dāng)我們從上面方法進(jìn)去的時(shí)候,會(huì)發(fā)現(xiàn)他默認(rèn)使用的是DEFAULT_PUBLISHER_FACTORY來(lái)進(jìn)行構(gòu)建,而在NotifyCenter代碼塊中,會(huì)發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認(rèn)構(gòu)建的EventPublisher為DefaultPublisher

public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
//主要關(guān)注DEFAULT_PUBLISHER_FACTORY
  return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}

if (iterator.hasNext()) {
      clazz = iterator.next().getClass();
  } else {
      clazz = DefaultPublisher.class;
  }

  DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
      try {
          EventPublisher publisher = clazz.newInstance();
          publisher.init(cls, buffer);
          return publisher;
      } catch (Throwable ex) {
          LOGGER.error("Service class newInstance has error : ", ex);
          throw new NacosRuntimeException(SERVER_ERROR, ex);
      }
  }; 

由此我們看出在NotifyCenter類(lèi)中維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的時(shí)間發(fā)布中為DefaultPublisher。

閑言

到這里,我們Nacos訂閱機(jī)制的前半章我們就講完了,因?yàn)檎w服務(wù)訂閱的事件機(jī)制還是比較復(fù)雜,篇幅太長(zhǎng),所以分成了兩部分,今天這個(gè)章節(jié)我們主要講解了,客戶端服務(wù)發(fā)現(xiàn)的原理以及訂閱機(jī)制中定時(shí)器的運(yùn)行邏輯和NotifyCenter發(fā)布InstancesChangeEvent事件的流程


文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net


公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱