Nacos源碼系列—訂閱機制的前因后果(上)

前因

我們在了解Nacos訂閱機制之前,首先來了解一下前因——Nacos客戶端的“服務發(fā)現(xiàn)”,我們先通過下面一張圖來直觀的看一下,有人可能就說這也叫直觀,明明很曲折,小農(nóng)想說的是,這樣才能讓你們印象更加深刻(手動狗頭)。

讀者內(nèi)心:我信你個鬼。

對于Naocs客戶端“服務發(fā)現(xiàn)” 主要是有NamingService獲取服務列表、組裝參數(shù),調(diào)用服務接口等等,上圖中只是一個大致的流程,在其中還有獲取服務列表中的通信流程協(xié)議(Grpc/http),訂閱流程以及后果(故障轉(zhuǎn)移流程),下面我們就來詳細講解一下,客戶端服務發(fā)現(xiàn)的基本流程。

首先我們先從一個入口類Client項目下的NamingTest開始看起

@Ignore
public class NamingTest {
    
    @Test
    public void testServiceList() throws Exception {
        
        Properties properties = new Properties();
        //服務IP
        properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
        //用戶名
        properties.put(PropertyKeyConst.USERNAME, "nacos");
        //密碼
        properties.put(PropertyKeyConst.PASSWORD, "nacos");
        
        Instance instance = new Instance();
        //實例IP
        instance.setIp("1.1.1.1");
        //實例端口
        instance.setPort(800);
        //配置權(quán)重
        instance.setWeight(2);
        Map<String, String> map = new HashMap<String, String>();
        map.put("netType", "external");
        map.put("version", "2.0");
        instance.setMetadata(map);

        //關(guān)鍵代碼 創(chuàng)建自己的實例
        NamingService namingService = NacosFactory.createNamingService(properties);
        namingService.registerInstance("nacos.test.1", instance);
        
        ThreadUtils.sleep(5000L);
        
        List<Instance> list = namingService.getAllInstances("nacos.test.1");
        System.out.println(list);
        ThreadUtils.sleep(30000L);        
    }
}

在前幾篇章節(jié)中,我們講解了registerInstance()方法,今天我們需要來看一下getAllInstances()方法的具體邏輯,這個就是我們需要觀察的入口

@Override
public List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

在上面具體方法中,會經(jīng)過幾輪重載方法的調(diào)用,在重載方法調(diào)用的過程中已經(jīng)設置了默認值,例如(默認分組(DEFAULT_GROUP),集群列表(空)、是否訂閱(是)等等)

/**
 *
 * @param serviceName 服務名稱
 * @param groupName   分組名稱(DEFAULT_GROUP)
 * @param clusters    集群數(shù)量(默認為空)
 * @param subscribe   是否訂閱服務(是)
 * @return
 * @throws NacosException
 */
@Override
public List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    //是否為訂閱模式
    if (subscribe) {
        //從客戶端緩存中獲取服務信息
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
            //如果緩存中服務信息不存在,進行訂閱
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        //未訂閱,從服務器獲取
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    //獲取實例列表
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

如果是訂閱模式,直接從本都緩存中獲取服務信息,然后從中獲取實例列表,訂閱機制會自動同步服務器實例信息到本地,如果緩存中沒有,說明是首次調(diào)用,進行訂閱后獲取服務信息,具體流程如下:

訂閱處理流程
在上面的流程中,我們講到了訂閱的邏輯,接下來我們就來看一看訂閱里面到底做了哪些事情,首先我們已經(jīng)知道服務在哪里訂閱了,我們只需要點進去找對應的方法。

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

下面是具體的方法,這里clientProxy類型為NamingClientProxyDelegate,實例化NacosNamingService時該類被實例化

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //定時調(diào)度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //獲取緩存中的serviceInfo對象
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        //判斷如果為空,進行訂閱邏輯處理(Grpc協(xié)議)
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //ServiceInfo本地緩存處理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

在上述代碼中,可以看到我們在獲取服務器列表中,進行了訂閱邏輯的擴展。

  1. 在訂閱方法中首先開啟定時任務,用來定時同步服務端的實例信息,進行本地緩存的更新等操作,如果是首次直接返回,去判斷是否有本地緩存
  2. 如果本地緩存中存在serviceInfo信息,直接返回serviceInfo信息,如果不存在,默認采用Grpc協(xié)議進行訂閱,然后在返回serviceInfo信息
  3. 通過grpcClientProxy.subscribe()直接向服務器發(fā)送一個訂閱請求,并返回結(jié)果
  4. servieInfo本地緩存處理,并且會將獲取的最新的serviceInfo和本地的serviceInfo進行比較,進行更新操作。

如下圖所示:

訂閱

在上面我們講解了,Nacos是如何進行服務器發(fā)現(xiàn),以及訂閱的入口和大體邏輯,接下來我們就來詳細講一講Nacos的訂閱機制的核心,首先Nacos客戶端會通過定時任務,進行輪詢,每間隔6秒從Nacos注冊中心獲取服務實例列表,如果檢測實例發(fā)生變化,發(fā)布變更事件,訂閱者進行對應的邏輯處理(更新緩存和實例信息),我們先從一張圖,來了解一下訂閱機制主要的流程。

定時任務

訂閱其實本身也是服務發(fā)現(xiàn)的一種實現(xiàn)方式,就是在服務發(fā)現(xiàn)的時候執(zhí)行訂閱方法,然后通過定時任務定時拉取服務端信息。
我們找到 NacosNamingService.subscribe(),會發(fā)現(xiàn)里面有好幾個···subscribe()```方法,這幾個方法在重載的過程中,會幫我們添加一些默認參數(shù)(默認分組、空集合列表),最終我們對定位到下面這個方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

在這里我們先來看 clientProxy.subscribe(),這個方法實際上就是我們上面講到的NamingClientProxyDelegate.subscribe()方法,在這里主要是對服務列表的信息進行查詢,所有我們可以知道不管是查詢還是訂閱都是用的同一個方法。在這里我們就不做過多的描述。

在這里我們主要關(guān)注的是這個方法里面一個定時調(diào)度的方法ServiceInfoUpdateService.scheduleUpdateIfAbsent();,這個方法里面構(gòu)建了serviceKey,通過key來判斷是否重復,最后添加到updateTask,而addTask()就是添加任務并且發(fā)起一個定時任務

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //主要關(guān)注點,添加定時任務
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

默認定時延遲一秒執(zhí)行:

private static final long DEFAULT_DELAY = 1000L;
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

在這個定時任務里面封裝了訂閱機制的核心業(yè)務邏輯,位于UpdateTask.run()方法。

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        //判斷 服務是否訂閱過并且沒有開啟定時任務 操作過不再執(zhí)行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                serviceKey)) {
            NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            isCancel = true;
            return;
        }

        //獲取緩存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //緩存不存在
        if (serviceObj == null) {
            //根據(jù)serviceName等信息獲取service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            //進行本地緩存處理
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        //如果服務最后的更新時間<=緩存刷新時間,從注冊中心重新查詢
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            //本地緩存處理
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新最后更新的時間
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // TODO multiple time can be configured.
        //下一次更新緩存時間設置(6秒)
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        //設置失敗數(shù)量為0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        //沒有服務訂閱過并且開啟定時任務
        if (!isCancel) {
            // 下次調(diào)度刷新時間,下次執(zhí)行的時間與failCount有關(guān),failCount=0,則下次調(diào)度時間為6秒,最長為1分鐘
            // 無異常情況下緩存實例的刷新時間是6秒
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                    TimeUnit.MILLISECONDS);
        }
    }
}

通過定時任務執(zhí)行UpdateTask,默認間隔時間為6秒,當發(fā)生異常時會延長,但不會超過1分鐘。該方法會比較本地是否存在緩存,以及是否過期,當不存在或者過期的時候,會去查詢注冊中心,獲取最新實例,更新最后獲取時間,處理服務信息,在最后會計算任務時間,循環(huán)執(zhí)行流程。


業(yè)務邏輯在最后會計算下一次定時任務的執(zhí)行時間,通過delayTime來延遲執(zhí)行,delayTime默認為1000*6(6秒),在finally 里面發(fā)起下一次定時任務,當我們程序出現(xiàn)異常的時候,執(zhí)行時間和錯誤次數(shù)成正比,最長時間不超過一分鐘

到這里我們已經(jīng)對于Nacos客戶端定于的核心流程講解了一遍,Nacos客戶端通過一個定時任務,每間隔6秒從注冊中心獲取實例列表,當發(fā)現(xiàn)實例發(fā)生變化的時候,發(fā)布變更事件,訂閱者進行業(yè)務處理,然后更新內(nèi)存中和本地緩存中的實例。接下來我們就來講一講,定時任務獲取到最新實例列表之后,整個時間機制是如何處理的。

我們在第一步調(diào)用subscribe()方法的時候,會訂閱一個EventListener事件,而在定時任務UpdateTask定時獲取實例列表之后,會調(diào)用ServiceInfoHolder.processServiceInfo方法對ServiceInfo進行本地處理,這其中就包括事件處理。

在subscribe方法中,通過下面的代碼我們進行監(jiān)聽事件的注冊

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

在上述代碼中,我們主要關(guān)注的是changeNotifier.registerListener,這個監(jiān)聽就是進行具體事件注冊邏輯,在下述代碼中,主要是將EventListener存儲在listenerMapmap結(jié)構(gòu)中,key為服務實例信息的拼接,value為監(jiān)聽事件的集合

public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                //將EventListener緩存到listenerMap中
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}

關(guān)于serviceInfo的處理
在updateTask獲取到最新的實例信息后會進行本地化處理,我們需要看的是ServiceInfoUpdateService.run()下的serviceInfoHolder.processServiceInfo(serviceObj);本地緩存方法

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
      //判斷服務key是否為空
      String serviceKey = serviceInfo.getKey();
      if (serviceKey == null) {
          return null;
      }
      ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
      if (isEmptyOrErrorPush(serviceInfo)) {
          //empty or error push, just ignore
          return oldService;
      }
      //將緩存信息放置到map中
      serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
      //判斷實例信息是否發(fā)生改變
      boolean changed = isChangedServiceInfo(oldService, serviceInfo);
      if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
          serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
      }
      //監(jiān)控服務緩存map的大小
      MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
      if (changed) {
          NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                  JacksonUtils.toJson(serviceInfo.getHosts()));
          //添加實例變更事件,被訂閱者執(zhí)行
          NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                  serviceInfo.getClusters(), serviceInfo.getHosts()));
          //寫入本地文件
          DiskCache.write(serviceInfo, cacheDir);
      }
      return serviceInfo;
  }   

首先我們判斷最新的ServiceInfo數(shù)據(jù)是否正確,有沒有發(fā)生變化,如果數(shù)據(jù)格式正確且發(fā)生變化,會發(fā)布一個變更事件(InstancesChangeEvent),同時講serviceinfo寫入緩存中

對于服務信息的變更,Nacos是如何做的呢,別急我們往下看,當我們調(diào)用InstancesChangeEvent()方法以后,變更事件會由NotifyCenter進行發(fā)布,我們來瞅一瞅

首先事件追蹤的核心流程主要分為,根據(jù)事件類型獲取-》獲取事件發(fā)布者-》發(fā)布事件,詳細如下所示:

private static final NotifyCenter INSTANCE = new NotifyCenter();

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    //根據(jù)時間類型,獲取對應的CanonicalName
    final String topic = ClassUtils.getCanonicalName(eventType);
    //從NotifyCenter.publisherMap中獲取對應時間發(fā)布中
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        //事件發(fā)布者publisher發(fā)布事件
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

在這個源碼中,其實 INSTANCE是單例實現(xiàn)的,在這里publisherMap鍵值對是什么時候建立的?其實在是我們NacosNamingService.init()調(diào)用初始化方法的時候進行綁定的

private void init(Properties properties) throws NacosException {
    ......
    //建立InstancesChangeEvent和EnvenPublisher的關(guān)系
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
   ......
}

當我們從上面方法進去的時候,會發(fā)現(xiàn)他默認使用的是DEFAULT_PUBLISHER_FACTORY來進行構(gòu)建,而在NotifyCenter代碼塊中,會發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認構(gòu)建的EventPublisher為DefaultPublisher

public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
//主要關(guān)注DEFAULT_PUBLISHER_FACTORY
  return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}

if (iterator.hasNext()) {
      clazz = iterator.next().getClass();
  } else {
      clazz = DefaultPublisher.class;
  }

  DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
      try {
          EventPublisher publisher = clazz.newInstance();
          publisher.init(cls, buffer);
          return publisher;
      } catch (Throwable ex) {
          LOGGER.error("Service class newInstance has error : ", ex);
          throw new NacosRuntimeException(SERVER_ERROR, ex);
      }
  }; 

由此我們看出在NotifyCenter類中維護了事件名稱和事件發(fā)布者的關(guān)系,而默認的時間發(fā)布中為DefaultPublisher。

閑言

到這里,我們Nacos訂閱機制的前半章我們就講完了,因為整體服務訂閱的事件機制還是比較復雜,篇幅太長,所以分成了兩部分,今天這個章節(jié)我們主要講解了,客戶端服務發(fā)現(xiàn)的原理以及訂閱機制中定時器的運行邏輯和NotifyCenter發(fā)布InstancesChangeEvent事件的流程


文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net


公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱