Nacos源碼系列—訂閱機(jī)制的前因后果(下)

事件發(fā)布

在上一節(jié)中我們講解了在NotifyCenter中維護(hù)了事件名稱(chēng)和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher,今天我們就來(lái)講一下DefaultPublisher的事件發(fā)布的具體邏輯

首先我們來(lái)看一下DefaultPublisher的源碼:

public class DefaultPublisher extends Thread implements EventPublisher {
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        //守護(hù)線程
        setDaemon(true);
        //設(shè)置線程名
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        //阻塞隊(duì)列初始化
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        //啟動(dòng)線程
        start();
    }
    
    @Override
    public synchronized void start() {
        if (!initialized) {
            // start just called once
            //啟動(dòng)run方法
            super.start();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }
}

我們可以看到這個(gè)類(lèi)繼承自Thread,說(shuō)明他是一個(gè)線程類(lèi),同時(shí)實(shí)現(xiàn)了EventPublisher說(shuō)明他也是一個(gè)發(fā)布者,在init()中,是以守護(hù)線程的方式運(yùn)作的,同時(shí)初始化了一個(gè)阻塞隊(duì)列,最后調(diào)用start()啟動(dòng)線程。

在start()里面,其實(shí)就是啟動(dòng)run():

@Override
public void run() {
    openEventHandler();
}

   void openEventHandler() {
    try {

        // This variable is defined to resolve the problem which message overstock in the queue.
        int waitTimes = 60;
        // To ensure that messages are not lost, enable EventHandler when
        // waiting for the first Subscriber to register
        //死循環(huán)遍歷,線程啟動(dòng)設(shè)置最大延遲60秒,用來(lái)解決消息積壓?jiǎn)栴}
        for (; ; ) {
            if (shutdown || hasSubscriber() || waitTimes <= 0) {
                break;
            }
            ThreadUtils.sleep(1000L);
            waitTimes--;
        }
        //死循環(huán)從隊(duì)列中取出event對(duì)象,同時(shí)通知訂閱者(subscriber)執(zhí)行event對(duì)象
        for (; ; ) {
            if (shutdown) {
                break;
            }
            final Event event = queue.take();
            receiveEvent(event);
            UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
        }
    } catch (Throwable ex) {
        LOGGER.error("Event listener exception : ", ex);
    }
}

在上述代碼中我們可以看到for (; ; )這個(gè)循環(huán)出現(xiàn)了兩次,這個(gè)就是循環(huán)遍歷(死循環(huán)),第一個(gè)死循環(huán)我們可以理解成延時(shí)效果,里面最大延時(shí)60秒,每隔一秒運(yùn)行一次,判斷(當(dāng)前線程是否關(guān)閉、是否有訂閱者、是否超過(guò)60秒)只要滿足其中任意一個(gè)條件,跳出循環(huán)
第二個(gè)死循環(huán),是我們業(yè)務(wù)邏輯處理,用來(lái)消費(fèi),從隊(duì)列中取出event事件,然后通過(guò)receiveEvent()執(zhí)行。

那么我們可以從隊(duì)列中取出事件,那么這個(gè)事件又在哪一步注入進(jìn)去的呢,我們還是在當(dāng)前類(lèi)里面,找到一個(gè)叫publish()的方法

@Override
public boolean publish(Event event) {
    checkIsStart();
    //向隊(duì)列中插入元素
    boolean success = this.queue.offer(event);
    //判斷是否插入成功
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        //失敗直接執(zhí)行
        receiveEvent(event);
        return true;
    }
    return true;
}

這個(gè)方法其實(shí)就是發(fā)布事件調(diào)用了publish往阻塞隊(duì)列中存入事件,如果失敗那么立即執(zhí)行receiveEvent(),不在繼續(xù)走隊(duì)列方法

void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();

    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }

    // Notification single event listener
    //循環(huán)遍歷subscribers對(duì)象
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }

        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        //通知訂閱者執(zhí)行event
        notifySubscriber(subscriber, event);
    }
}

而在receiveEvent()方法中,這里其實(shí)就是遍歷的subscribers集合(訂閱者),然后通過(guò)notifySubscriber()通知訂閱者方法,而這個(gè)subscribers集合就是在我們之前講到的NacosNamingService.init()方法中設(shè)置的。

public class NacosNamingService implements NamingService {
 private void init(Properties properties) throws NacosException {
   //將Subscribe注冊(cè)到Publisher
   NotifyCenter.registerSubscriber(changeNotifier);
 }
}

NotifyCenter.registerSubscriber(changeNotifier);會(huì)調(diào)用NotifyCenter.addSubscriber()方法,進(jìn)行最終的操作。

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
      EventPublisherFactory factory) {

  final String topic = ClassUtils.getCanonicalName(subscribeType);
  synchronized (NotifyCenter.class) {
      // MapUtils.computeIfAbsent is a unsafe method.
      MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
  }
  //獲取對(duì)應(yīng)的publisher
  EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  if (publisher instanceof ShardedEventPublisher) {
      ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  } else {
      //添加到subscribers集合
      publisher.addSubscriber(consumer);
  }
}

addSubscriber()方法的邏輯就是講訂閱事件、發(fā)布中、訂閱者三個(gè)關(guān)系進(jìn)行綁定,而發(fā)布者和事件通過(guò)Map進(jìn)行維護(hù),發(fā)布者與訂閱者通過(guò)關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。

我們回到剛剛DefaulePublisher.notifySubscriber()方法,這里是最后執(zhí)行訂閱者事件的方法

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

  LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
  //執(zhí)行訂閱者事件
  final Runnable job = () -> subscriber.onEvent(event);
  //執(zhí)行者
  final Executor executor = subscriber.executor();

  if (executor != null) {
      executor.execute(job);
  } else {
      try {
          job.run();
      } catch (Throwable e) {
          LOGGER.error("Event callback exception: ", e);
      }
  }
}

到這里,訂閱機(jī)制就講完了,可能會(huì)有點(diǎn)繞,最好是我們能夠去跟著代碼走一遍,這樣會(huì)比較理解和記憶,在這里我們重點(diǎn)需要理解NotifyCenter對(duì)事件發(fā)布者、訂閱者以及之間關(guān)系的維護(hù),關(guān)系維護(hù)的入口就在NacosNamingService.init()中,我們來(lái)看一下他的核心邏輯

首先ServiceInfoHolder中通過(guò)NotifyCenter發(fā)布InstancesChangeEvent事件.

NotifyCenter獲取對(duì)應(yīng)的CanonicalName,并將這個(gè)參數(shù)作為key,從NotifyCenter.publisherMap中獲取對(duì)應(yīng)的事件發(fā)布者,然后將InstancesChangeEvent事件進(jìn)行發(fā)布.

InstancesChangeEvent事件發(fā)布主要是通過(guò)EventPublisher的實(shí)現(xiàn)類(lèi),DefaultPublisher進(jìn)行InstancesChangeEvent事件發(fā)布,而DefaultPublisher本身作為守護(hù)線程的方式進(jìn)行運(yùn)作,在執(zhí)行業(yè)務(wù)邏輯時(shí)判斷是否線程啟動(dòng),如果啟動(dòng),將事件添加到隊(duì)列中,如果成功,則發(fā)布過(guò)程完成,如果添加失敗,立即執(zhí)行DefaultPublisher.receiveEvent,接收事件通知訂閱者,創(chuàng)建一個(gè)Runnable對(duì)象,執(zhí)行訂閱者的Event事件。

在添加到隊(duì)列成功的時(shí)候,DefaultPublisher會(huì)創(chuàng)建一個(gè)阻塞隊(duì)列(BlockingQueue),標(biāo)記線程啟動(dòng),當(dāng)他執(zhí)行 super.start(),會(huì)調(diào)用它的run方法,在這個(gè)run方法里面核心的業(yè)務(wù)邏輯就是openEventHandler(),里面會(huì)有兩個(gè)死循環(huán),第一個(gè)是在線程啟動(dòng)的60秒內(nèi)執(zhí)行條件,第二個(gè)是從阻塞隊(duì)列中獲取Event事件,調(diào)用DefaultPublisher.receiveEvent()通知訂閱者,流程結(jié)束

本地緩存

我們?cè)谥暗南盗兄?,客戶端?huì)緩存一些信息在本地中,來(lái)獲取ServiceInfo的信息,但是在執(zhí)行本地緩存的時(shí)候,難免會(huì)有一些故障,有故障就需要進(jìn)行處理,在這里主要涉及到兩個(gè)類(lèi)ServiceInfoHolderFailoverReactor

Nacos緩存主要是分為兩個(gè)方面,一個(gè)從注冊(cè)中心獲取實(shí)例信息緩存到內(nèi)存中,通過(guò)ConcurrentMap進(jìn)行存儲(chǔ),一個(gè)是通過(guò)磁盤(pán)文件的形式定時(shí)緩存。

同時(shí)故障處理也分為兩個(gè)部分,一個(gè)是故障處理的開(kāi)關(guān)通過(guò)文件進(jìn)行標(biāo)記,一個(gè)是當(dāng)起來(lái)故障處理后,可以從故障備份的文件中獲取服務(wù)實(shí)例信息。

介紹完上面幾點(diǎn),我們先來(lái)詳細(xì)講解第一個(gè)核心類(lèi)ServiceInfoHolder

ServiceInfoHolder

ServiceInfoHolder類(lèi),主要是用來(lái)處理服務(wù)信息的,每次客戶端從服務(wù)端拉取服務(wù)信息時(shí),都用經(jīng)過(guò)這個(gè)類(lèi),而processServiceInfo用來(lái)處理本地信息(緩存、發(fā)布、更新、本地目錄初始化)等

ServiceInfo: 注冊(cè)服務(wù)的信息,主要包含(服務(wù)名、分組名、集群信息、實(shí)例列表、最后一次更新時(shí)間),客戶端獲取的信息,都是通過(guò)ServiceInfo作為承載體,ServiceInfoHolder.ServiceInfo,通過(guò)ConcurrentMap進(jìn)行存儲(chǔ),如下所示:

public class ServiceInfoHolder implements Closeable {

  private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  
   public ServiceInfoHolder(String namespace, Properties properties) {
    initCacheDir(namespace, properties);
    //啟動(dòng)是判斷是否從緩存信息中獲取,默認(rèn)為false
    if (isLoadCacheAtStart(properties)) {
        //從緩存目錄中讀取信息
        this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    } else {
        //創(chuàng)建空集合對(duì)象
        this.serviceInfoMap = new ConcurrentHashMap<>(16);
    }
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushEmptyProtection = isPushEmptyProtect(properties);
}
  
  public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        //判斷服務(wù)key是否為空
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        //將緩存信息放置到map中
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        //判斷實(shí)例信息是否發(fā)生改變
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        //監(jiān)控服務(wù)緩存map的大小
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                    JacksonUtils.toJson(serviceInfo.getHosts()));
            //添加實(shí)例變更事件,被訂閱者執(zhí)行
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            //寫(xiě)入本地文件
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
}

這里就是Nacos獲取注冊(cè)信息的緩存,之前我們有講過(guò),當(dāng)服務(wù)信息變更的時(shí)候會(huì)第一時(shí)間更新ServiceInfoMap中的信息,通過(guò)isChangedServiceInfo進(jìn)行判斷,當(dāng)發(fā)生變動(dòng)時(shí),serviceInfoMap.put最新數(shù)據(jù),當(dāng)我們需要使用的時(shí)候,通過(guò)key進(jìn)行g(shù)et操作,ServiceInfoMap默認(rèn)創(chuàng)建空的對(duì)象,但如果配置啟動(dòng)從緩存文件中獲取,則會(huì)從緩存中獲取信息。而且當(dāng)我們服務(wù)實(shí)例發(fā)生變更的時(shí)候,會(huì)通過(guò)DiskCache.write()向?qū)?yīng)的目錄文件中寫(xiě)入ServiceInfo信息

本地緩存地址

本地緩存的地址通過(guò)cacheDir進(jìn)行執(zhí)行本地緩存和故障處理的根目錄,在ServiceInfoHolder構(gòu)造方法中,會(huì)默認(rèn)生成緩存目錄,默認(rèn)路徑為${user}/nacos/naming/public,我們也可以需要通過(guò)System.setProperty("JM.SNAPSHOT.PATH")指定。

public class ServiceInfoHolder implements Closeable {
    private String cacheDir;
    
    public ServiceInfoHolder(String namespace, Properties properties) {
    //初始化生成緩存目錄
    initCacheDir(namespace, properties);
    ......
    }
    
    private void initCacheDir(String namespace, Properties properties) {
        String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);

        String namingCacheRegistryDir = "";
        if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
            namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
        }

        if (!StringUtils.isBlank(jmSnapshotPath)) {
            cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        } else {
            cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        }
    }
    
}

故障處理

ServiceInfoHolder構(gòu)造方法中,還會(huì)初始化一個(gè)FailoverReactor的類(lèi),這個(gè)類(lèi)主要是用來(lái)故障處理。

public class ServiceInfoHolder implements Closeable {
  private final FailoverReactor failoverReactor;
  
  public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    //為兩者相互持有對(duì)方的引用
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    .....
  }
  
   public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
        //獲取serviceInfoHolder引用對(duì)象
        this.serviceInfoHolder = serviceInfoHolder;
        //故障目錄${user}/nacos/naming/public/failover
        this.failoverDir = cacheDir + FAILOVER_DIR;
        //初始化executorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                //開(kāi)啟守護(hù)線程
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        //其他信息初始化
        this.init();
    }
    
     public void init() {
        //執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
        executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);

        //初始化操作,延遲30分鐘執(zhí)行,間隔24小時(shí),執(zhí)行DiskFileWriter()任務(wù)
        executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
        

        //初始化操作,間隔10秒,核心方法為DiskFileWriter
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    File cacheDir = new File(failoverDir);
                    
                    if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                    }
                    
                    File[] files = cacheDir.listFiles();
                    //如果故障目錄為空,啟動(dòng)立即執(zhí)行,備份文件
                    if (files == null || files.length <= 0) {
                        new DiskFileWriter().run();
                    }
                } catch (Throwable e) {
                    NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
                }
                
            }
        }, 10000L, TimeUnit.MILLISECONDS);
    }
}

init()代碼中,開(kāi)啟了三個(gè)定時(shí)任務(wù),三個(gè)任務(wù)都是FailoverReactor內(nèi)部類(lèi),

  • 執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
  • 初始化操作,延遲30分鐘執(zhí)行,間隔24小時(shí),執(zhí)行DiskFileWriter()任務(wù)
  • 初始化操作,間隔10秒,核心方法為DiskFileWriter

我們這里先來(lái)看一下核心方法DiskFileWriter,這里主要是獲取服務(wù)信息,判斷是否能夠?qū)懭氪疟P(pán),條件滿足,寫(xiě)入拼接的故障目錄中,因?yàn)榈谝粋€(gè)和第二個(gè)初始化操作,都會(huì)用到DiskFileWriter,當(dāng)我們第三個(gè)定時(shí)判斷如果文件不存在,則會(huì)將文件寫(xiě)入本地磁盤(pán)中

class DiskFileWriter extends TimerTask {

  @Override
  public void run() {
      Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
      for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
          ServiceInfo serviceInfo = entry.getValue();
          //主要是判斷服務(wù)信息是否完整
          if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
              continue;
          }
          //將文件寫(xiě)入磁盤(pán)中
          DiskCache.write(serviceInfo, failoverDir);
      }
  }
}

接下來(lái)我們?cè)倏匆幌?,第一個(gè)定時(shí)任務(wù)SwitchRefresher的業(yè)務(wù)邏輯,

class SwitchRefresher implements Runnable {

long lastModifiedMillis = 0L;

@Override
public void run() {
  try {
      File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
      //如果文件不存在返回
      if (!switchFile.exists()) {
          switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
          return;
      }

      long modified = switchFile.lastModified();
      //判斷文件修改時(shí)間
      if (lastModifiedMillis < modified) {
          lastModifiedMillis = modified;
          //獲取故障處理文件內(nèi)容
          String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                  Charset.defaultCharset().toString());
          if (!StringUtils.isEmpty(failover)) {
              String[] lines = failover.split(DiskCache.getLineSeparator());

              for (String line : lines) {
                  String line1 = line.trim();
                  //"1" 開(kāi)啟故障處理
                  if (IS_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                      NAMING_LOGGER.info("failover-mode is on");
                      new FailoverFileReader().run();
                      //"0" 關(guān)閉故障處理
                  } else if (NO_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                      NAMING_LOGGER.info("failover-mode is off");
                  }
              }
          } else {
              switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          }
      }

  } catch (Throwable e) {
      NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
  }
}
}

這里面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時(shí)間,如果已經(jīng)修改,則獲取文件中的內(nèi)容,繼續(xù)進(jìn)行判斷,當(dāng)我們開(kāi)啟故障處理時(shí),執(zhí)行線程FailoverFileReader().run()

class FailoverFileReader implements Runnable {

@Override
public void run() {
  Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

  BufferedReader reader = null;
  try {
      //讀取failoverDir目錄下的文件
      File cacheDir = new File(failoverDir);
      //不存在返回錯(cuò)誤
      if (!cacheDir.exists() && !cacheDir.mkdirs()) {
          throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      //獲取文件
      File[] files = cacheDir.listFiles();
      //文件不存在返回
      if (files == null) {
          return;
      }
      //遍歷處理
      for (File file : files) {
          //文件不存在跳過(guò)
          if (!file.isFile()) {
              continue;
          }
          //如果是故障處理標(biāo)志文件,跳過(guò)這一步
          if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
              continue;
          }

          ServiceInfo dom = new ServiceInfo(file.getName());

          //讀取備份中的內(nèi)容,轉(zhuǎn)換為ServiceInfo對(duì)象
          try {
              String dataString = ConcurrentDiskUtil
                      .getFileContent(file, Charset.defaultCharset().toString());
              reader = new BufferedReader(new StringReader(dataString));

              String json;
              if ((json = reader.readLine()) != null) {
                  try {
                      dom = JacksonUtils.toObj(json, ServiceInfo.class);
                  } catch (Exception e) {
                      NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                  }
              }

          } catch (Exception e) {
              NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
          } finally {
              try {
                  if (reader != null) {
                      reader.close();
                  }
              } catch (Exception e) {
                  //ignore
              }
          }
          if (!CollectionUtils.isEmpty(dom.getHosts())) {
              //將ServiceInfo對(duì)象放入domMap中
              domMap.put(dom.getKey(), dom);
          }
      }
  } catch (Exception e) {
      NAMING_LOGGER.error("[NA] failed to read cache file", e);
  }
  //如果不為空,賦值serviceMap
  if (domMap.size() > 0) {
      serviceMap = domMap;
  }
}
}

FailoverFileReader主要是操作讀取failover目錄存儲(chǔ)的備份服務(wù)信息文件內(nèi)容,然后裝換成ServiceInfo信息,并將所有的ServiceInfo儲(chǔ)存在FailoverReactorServiceMap屬性中。

總結(jié)

到這里我們Nacos訂閱機(jī)制核心流程就講完了,整體訂閱機(jī)制的流程還是比較復(fù)雜的,因?yàn)檫€涉及到之前將的邏輯,會(huì)有點(diǎn)繞,并且用到了保證線程Map、守護(hù)線程、阻塞隊(duì)列、線程的使用等等,我們需要重點(diǎn)掌握的主要是事件發(fā)布者、訂閱者之間的關(guān)系,這里還是推薦大家有機(jī)會(huì)的話可以自己跟著源碼走一遍,會(huì)有更深的體驗(yàn)。


文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net


公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱(chēng)