Nacos源碼系列—訂閱機制的前因后果(下)

事件發(fā)布

在上一節(jié)中我們講解了在NotifyCenter中維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher,今天我們就來講一下DefaultPublisher的事件發(fā)布的具體邏輯

首先我們來看一下DefaultPublisher的源碼:

public class DefaultPublisher extends Thread implements EventPublisher {
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        //守護(hù)線程
        setDaemon(true);
        //設(shè)置線程名
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        //阻塞隊列初始化
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        //啟動線程
        start();
    }
    
    @Override
    public synchronized void start() {
        if (!initialized) {
            // start just called once
            //啟動run方法
            super.start();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }
}

我們可以看到這個類繼承自Thread,說明他是一個線程類,同時實現(xiàn)了EventPublisher說明他也是一個發(fā)布者,在init()中,是以守護(hù)線程的方式運作的,同時初始化了一個阻塞隊列,最后調(diào)用start()啟動線程。

在start()里面,其實就是啟動run():

@Override
public void run() {
    openEventHandler();
}

   void openEventHandler() {
    try {

        // This variable is defined to resolve the problem which message overstock in the queue.
        int waitTimes = 60;
        // To ensure that messages are not lost, enable EventHandler when
        // waiting for the first Subscriber to register
        //死循環(huán)遍歷,線程啟動設(shè)置最大延遲60秒,用來解決消息積壓問題
        for (; ; ) {
            if (shutdown || hasSubscriber() || waitTimes <= 0) {
                break;
            }
            ThreadUtils.sleep(1000L);
            waitTimes--;
        }
        //死循環(huán)從隊列中取出event對象,同時通知訂閱者(subscriber)執(zhí)行event對象
        for (; ; ) {
            if (shutdown) {
                break;
            }
            final Event event = queue.take();
            receiveEvent(event);
            UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
        }
    } catch (Throwable ex) {
        LOGGER.error("Event listener exception : ", ex);
    }
}

在上述代碼中我們可以看到for (; ; )這個循環(huán)出現(xiàn)了兩次,這個就是循環(huán)遍歷(死循環(huán)),第一個死循環(huán)我們可以理解成延時效果,里面最大延時60秒,每隔一秒運行一次,判斷(當(dāng)前線程是否關(guān)閉、是否有訂閱者、是否超過60秒)只要滿足其中任意一個條件,跳出循環(huán)
第二個死循環(huán),是我們業(yè)務(wù)邏輯處理,用來消費,從隊列中取出event事件,然后通過receiveEvent()執(zhí)行。

那么我們可以從隊列中取出事件,那么這個事件又在哪一步注入進(jìn)去的呢,我們還是在當(dāng)前類里面,找到一個叫publish()的方法

@Override
public boolean publish(Event event) {
    checkIsStart();
    //向隊列中插入元素
    boolean success = this.queue.offer(event);
    //判斷是否插入成功
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        //失敗直接執(zhí)行
        receiveEvent(event);
        return true;
    }
    return true;
}

這個方法其實就是發(fā)布事件調(diào)用了publish往阻塞隊列中存入事件,如果失敗那么立即執(zhí)行receiveEvent(),不在繼續(xù)走隊列方法

void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();

    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }

    // Notification single event listener
    //循環(huán)遍歷subscribers對象
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }

        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        //通知訂閱者執(zhí)行event
        notifySubscriber(subscriber, event);
    }
}

而在receiveEvent()方法中,這里其實就是遍歷的subscribers集合(訂閱者),然后通過notifySubscriber()通知訂閱者方法,而這個subscribers集合就是在我們之前講到的NacosNamingService.init()方法中設(shè)置的。

public class NacosNamingService implements NamingService {
 private void init(Properties properties) throws NacosException {
   //將Subscribe注冊到Publisher
   NotifyCenter.registerSubscriber(changeNotifier);
 }
}

NotifyCenter.registerSubscriber(changeNotifier);會調(diào)用NotifyCenter.addSubscriber()方法,進(jìn)行最終的操作。

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
      EventPublisherFactory factory) {

  final String topic = ClassUtils.getCanonicalName(subscribeType);
  synchronized (NotifyCenter.class) {
      // MapUtils.computeIfAbsent is a unsafe method.
      MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
  }
  //獲取對應(yīng)的publisher
  EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  if (publisher instanceof ShardedEventPublisher) {
      ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  } else {
      //添加到subscribers集合
      publisher.addSubscriber(consumer);
  }
}

addSubscriber()方法的邏輯就是講訂閱事件、發(fā)布中、訂閱者三個關(guān)系進(jìn)行綁定,而發(fā)布者和事件通過Map進(jìn)行維護(hù),發(fā)布者與訂閱者通過關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。

我們回到剛剛DefaulePublisher.notifySubscriber()方法,這里是最后執(zhí)行訂閱者事件的方法

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

  LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
  //執(zhí)行訂閱者事件
  final Runnable job = () -> subscriber.onEvent(event);
  //執(zhí)行者
  final Executor executor = subscriber.executor();

  if (executor != null) {
      executor.execute(job);
  } else {
      try {
          job.run();
      } catch (Throwable e) {
          LOGGER.error("Event callback exception: ", e);
      }
  }
}

到這里,訂閱機制就講完了,可能會有點繞,最好是我們能夠去跟著代碼走一遍,這樣會比較理解和記憶,在這里我們重點需要理解NotifyCenter對事件發(fā)布者、訂閱者以及之間關(guān)系的維護(hù),關(guān)系維護(hù)的入口就在NacosNamingService.init()中,我們來看一下他的核心邏輯

首先ServiceInfoHolder中通過NotifyCenter發(fā)布InstancesChangeEvent事件.

NotifyCenter獲取對應(yīng)的CanonicalName,并將這個參數(shù)作為key,從NotifyCenter.publisherMap中獲取對應(yīng)的事件發(fā)布者,然后將InstancesChangeEvent事件進(jìn)行發(fā)布.

InstancesChangeEvent事件發(fā)布主要是通過EventPublisher的實現(xiàn)類,DefaultPublisher進(jìn)行InstancesChangeEvent事件發(fā)布,而DefaultPublisher本身作為守護(hù)線程的方式進(jìn)行運作,在執(zhí)行業(yè)務(wù)邏輯時判斷是否線程啟動,如果啟動,將事件添加到隊列中,如果成功,則發(fā)布過程完成,如果添加失敗,立即執(zhí)行DefaultPublisher.receiveEvent,接收事件通知訂閱者,創(chuàng)建一個Runnable對象,執(zhí)行訂閱者的Event事件。

在添加到隊列成功的時候,DefaultPublisher會創(chuàng)建一個阻塞隊列(BlockingQueue),標(biāo)記線程啟動,當(dāng)他執(zhí)行 super.start(),會調(diào)用它的run方法,在這個run方法里面核心的業(yè)務(wù)邏輯就是openEventHandler(),里面會有兩個死循環(huán),第一個是在線程啟動的60秒內(nèi)執(zhí)行條件,第二個是從阻塞隊列中獲取Event事件,調(diào)用DefaultPublisher.receiveEvent()通知訂閱者,流程結(jié)束

本地緩存

我們在之前的系列中,客戶端會緩存一些信息在本地中,來獲取ServiceInfo的信息,但是在執(zhí)行本地緩存的時候,難免會有一些故障,有故障就需要進(jìn)行處理,在這里主要涉及到兩個類ServiceInfoHolderFailoverReactor

Nacos緩存主要是分為兩個方面,一個從注冊中心獲取實例信息緩存到內(nèi)存中,通過ConcurrentMap進(jìn)行存儲,一個是通過磁盤文件的形式定時緩存。

同時故障處理也分為兩個部分,一個是故障處理的開關(guān)通過文件進(jìn)行標(biāo)記,一個是當(dāng)起來故障處理后,可以從故障備份的文件中獲取服務(wù)實例信息。

介紹完上面幾點,我們先來詳細(xì)講解第一個核心類ServiceInfoHolder

ServiceInfoHolder

ServiceInfoHolder類,主要是用來處理服務(wù)信息的,每次客戶端從服務(wù)端拉取服務(wù)信息時,都用經(jīng)過這個類,而processServiceInfo用來處理本地信息(緩存、發(fā)布、更新、本地目錄初始化)等

ServiceInfo: 注冊服務(wù)的信息,主要包含(服務(wù)名、分組名、集群信息、實例列表、最后一次更新時間),客戶端獲取的信息,都是通過ServiceInfo作為承載體,ServiceInfoHolder.ServiceInfo,通過ConcurrentMap進(jìn)行存儲,如下所示:

public class ServiceInfoHolder implements Closeable {

  private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  
   public ServiceInfoHolder(String namespace, Properties properties) {
    initCacheDir(namespace, properties);
    //啟動是判斷是否從緩存信息中獲取,默認(rèn)為false
    if (isLoadCacheAtStart(properties)) {
        //從緩存目錄中讀取信息
        this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    } else {
        //創(chuàng)建空集合對象
        this.serviceInfoMap = new ConcurrentHashMap<>(16);
    }
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushEmptyProtection = isPushEmptyProtect(properties);
}
  
  public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        //判斷服務(wù)key是否為空
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        //將緩存信息放置到map中
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        //判斷實例信息是否發(fā)生改變
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        //監(jiān)控服務(wù)緩存map的大小
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                    JacksonUtils.toJson(serviceInfo.getHosts()));
            //添加實例變更事件,被訂閱者執(zhí)行
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            //寫入本地文件
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
}

這里就是Nacos獲取注冊信息的緩存,之前我們有講過,當(dāng)服務(wù)信息變更的時候會第一時間更新ServiceInfoMap中的信息,通過isChangedServiceInfo進(jìn)行判斷,當(dāng)發(fā)生變動時,serviceInfoMap.put最新數(shù)據(jù),當(dāng)我們需要使用的時候,通過key進(jìn)行g(shù)et操作,ServiceInfoMap默認(rèn)創(chuàng)建空的對象,但如果配置啟動從緩存文件中獲取,則會從緩存中獲取信息。而且當(dāng)我們服務(wù)實例發(fā)生變更的時候,會通過DiskCache.write()向?qū)?yīng)的目錄文件中寫入ServiceInfo信息

本地緩存地址

本地緩存的地址通過cacheDir進(jìn)行執(zhí)行本地緩存和故障處理的根目錄,在ServiceInfoHolder構(gòu)造方法中,會默認(rèn)生成緩存目錄,默認(rèn)路徑為${user}/nacos/naming/public,我們也可以需要通過System.setProperty("JM.SNAPSHOT.PATH")指定。

public class ServiceInfoHolder implements Closeable {
    private String cacheDir;
    
    public ServiceInfoHolder(String namespace, Properties properties) {
    //初始化生成緩存目錄
    initCacheDir(namespace, properties);
    ......
    }
    
    private void initCacheDir(String namespace, Properties properties) {
        String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);

        String namingCacheRegistryDir = "";
        if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
            namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
        }

        if (!StringUtils.isBlank(jmSnapshotPath)) {
            cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        } else {
            cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        }
    }
    
}

故障處理

ServiceInfoHolder構(gòu)造方法中,還會初始化一個FailoverReactor的類,這個類主要是用來故障處理。

public class ServiceInfoHolder implements Closeable {
  private final FailoverReactor failoverReactor;
  
  public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    //為兩者相互持有對方的引用
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    .....
  }
  
   public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
        //獲取serviceInfoHolder引用對象
        this.serviceInfoHolder = serviceInfoHolder;
        //故障目錄${user}/nacos/naming/public/failover
        this.failoverDir = cacheDir + FAILOVER_DIR;
        //初始化executorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                //開啟守護(hù)線程
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        //其他信息初始化
        this.init();
    }
    
     public void init() {
        //執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
        executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);

        //初始化操作,延遲30分鐘執(zhí)行,間隔24小時,執(zhí)行DiskFileWriter()任務(wù)
        executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
        

        //初始化操作,間隔10秒,核心方法為DiskFileWriter
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    File cacheDir = new File(failoverDir);
                    
                    if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                    }
                    
                    File[] files = cacheDir.listFiles();
                    //如果故障目錄為空,啟動立即執(zhí)行,備份文件
                    if (files == null || files.length <= 0) {
                        new DiskFileWriter().run();
                    }
                } catch (Throwable e) {
                    NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
                }
                
            }
        }, 10000L, TimeUnit.MILLISECONDS);
    }
}

init()代碼中,開啟了三個定時任務(wù),三個任務(wù)都是FailoverReactor內(nèi)部類,

  • 執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
  • 初始化操作,延遲30分鐘執(zhí)行,間隔24小時,執(zhí)行DiskFileWriter()任務(wù)
  • 初始化操作,間隔10秒,核心方法為DiskFileWriter

我們這里先來看一下核心方法DiskFileWriter,這里主要是獲取服務(wù)信息,判斷是否能夠?qū)懭氪疟P,條件滿足,寫入拼接的故障目錄中,因為第一個和第二個初始化操作,都會用到DiskFileWriter,當(dāng)我們第三個定時判斷如果文件不存在,則會將文件寫入本地磁盤中

class DiskFileWriter extends TimerTask {

  @Override
  public void run() {
      Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
      for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
          ServiceInfo serviceInfo = entry.getValue();
          //主要是判斷服務(wù)信息是否完整
          if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
              continue;
          }
          //將文件寫入磁盤中
          DiskCache.write(serviceInfo, failoverDir);
      }
  }
}

接下來我們再看一下,第一個定時任務(wù)SwitchRefresher的業(yè)務(wù)邏輯,

class SwitchRefresher implements Runnable {

long lastModifiedMillis = 0L;

@Override
public void run() {
  try {
      File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
      //如果文件不存在返回
      if (!switchFile.exists()) {
          switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
          return;
      }

      long modified = switchFile.lastModified();
      //判斷文件修改時間
      if (lastModifiedMillis < modified) {
          lastModifiedMillis = modified;
          //獲取故障處理文件內(nèi)容
          String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                  Charset.defaultCharset().toString());
          if (!StringUtils.isEmpty(failover)) {
              String[] lines = failover.split(DiskCache.getLineSeparator());

              for (String line : lines) {
                  String line1 = line.trim();
                  //"1" 開啟故障處理
                  if (IS_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                      NAMING_LOGGER.info("failover-mode is on");
                      new FailoverFileReader().run();
                      //"0" 關(guān)閉故障處理
                  } else if (NO_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                      NAMING_LOGGER.info("failover-mode is off");
                  }
              }
          } else {
              switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          }
      }

  } catch (Throwable e) {
      NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
  }
}
}

這里面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時間,如果已經(jīng)修改,則獲取文件中的內(nèi)容,繼續(xù)進(jìn)行判斷,當(dāng)我們開啟故障處理時,執(zhí)行線程FailoverFileReader().run()

class FailoverFileReader implements Runnable {

@Override
public void run() {
  Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

  BufferedReader reader = null;
  try {
      //讀取failoverDir目錄下的文件
      File cacheDir = new File(failoverDir);
      //不存在返回錯誤
      if (!cacheDir.exists() && !cacheDir.mkdirs()) {
          throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      //獲取文件
      File[] files = cacheDir.listFiles();
      //文件不存在返回
      if (files == null) {
          return;
      }
      //遍歷處理
      for (File file : files) {
          //文件不存在跳過
          if (!file.isFile()) {
              continue;
          }
          //如果是故障處理標(biāo)志文件,跳過這一步
          if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
              continue;
          }

          ServiceInfo dom = new ServiceInfo(file.getName());

          //讀取備份中的內(nèi)容,轉(zhuǎn)換為ServiceInfo對象
          try {
              String dataString = ConcurrentDiskUtil
                      .getFileContent(file, Charset.defaultCharset().toString());
              reader = new BufferedReader(new StringReader(dataString));

              String json;
              if ((json = reader.readLine()) != null) {
                  try {
                      dom = JacksonUtils.toObj(json, ServiceInfo.class);
                  } catch (Exception e) {
                      NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                  }
              }

          } catch (Exception e) {
              NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
          } finally {
              try {
                  if (reader != null) {
                      reader.close();
                  }
              } catch (Exception e) {
                  //ignore
              }
          }
          if (!CollectionUtils.isEmpty(dom.getHosts())) {
              //將ServiceInfo對象放入domMap中
              domMap.put(dom.getKey(), dom);
          }
      }
  } catch (Exception e) {
      NAMING_LOGGER.error("[NA] failed to read cache file", e);
  }
  //如果不為空,賦值serviceMap
  if (domMap.size() > 0) {
      serviceMap = domMap;
  }
}
}

FailoverFileReader主要是操作讀取failover目錄存儲的備份服務(wù)信息文件內(nèi)容,然后裝換成ServiceInfo信息,并將所有的ServiceInfo儲存在FailoverReactorServiceMap屬性中。

總結(jié)

到這里我們Nacos訂閱機制核心流程就講完了,整體訂閱機制的流程還是比較復(fù)雜的,因為還涉及到之前將的邏輯,會有點繞,并且用到了保證線程Map、守護(hù)線程、阻塞隊列、線程的使用等等,我們需要重點掌握的主要是事件發(fā)布者、訂閱者之間的關(guān)系,這里還是推薦大家有機會的話可以自己跟著源碼走一遍,會有更深的體驗。


文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net


公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱