Nacos源碼系列—訂閱機(jī)制的前因后果(下)
事件發(fā)布
在上一節(jié)中我們講解了在NotifyCenter
中維護(hù)了事件名稱(chēng)和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher
,今天我們就來(lái)講一下DefaultPublisher的事件發(fā)布的具體邏輯
首先我們來(lái)看一下DefaultPublisher
的源碼:
public class DefaultPublisher extends Thread implements EventPublisher {
@Override
public void init(Class<? extends Event> type, int bufferSize) {
//守護(hù)線程
setDaemon(true);
//設(shè)置線程名
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
//阻塞隊(duì)列初始化
this.queue = new ArrayBlockingQueue<>(bufferSize);
//啟動(dòng)線程
start();
}
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
//啟動(dòng)run方法
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
}
我們可以看到這個(gè)類(lèi)繼承自Thread
,說(shuō)明他是一個(gè)線程類(lèi),同時(shí)實(shí)現(xiàn)了EventPublisher
說(shuō)明他也是一個(gè)發(fā)布者,在init()中,是以守護(hù)線程的方式運(yùn)作的,同時(shí)初始化了一個(gè)阻塞隊(duì)列,最后調(diào)用start()啟動(dòng)線程。
在start()里面,其實(shí)就是啟動(dòng)run():
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
//死循環(huán)遍歷,線程啟動(dòng)設(shè)置最大延遲60秒,用來(lái)解決消息積壓?jiǎn)栴}
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}
//死循環(huán)從隊(duì)列中取出event對(duì)象,同時(shí)通知訂閱者(subscriber)執(zhí)行event對(duì)象
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
在上述代碼中我們可以看到for (; ; )
這個(gè)循環(huán)出現(xiàn)了兩次,這個(gè)就是循環(huán)遍歷(死循環(huán)),第一個(gè)死循環(huán)我們可以理解成延時(shí)效果,里面最大延時(shí)60秒,每隔一秒運(yùn)行一次,判斷(當(dāng)前線程是否關(guān)閉、是否有訂閱者、是否超過(guò)60秒)只要滿足其中任意一個(gè)條件,跳出循環(huán)
第二個(gè)死循環(huán),是我們業(yè)務(wù)邏輯處理,用來(lái)消費(fèi),從隊(duì)列中取出event
事件,然后通過(guò)receiveEvent()
執(zhí)行。
那么我們可以從隊(duì)列中取出事件,那么這個(gè)事件又在哪一步注入進(jìn)去的呢,我們還是在當(dāng)前類(lèi)里面,找到一個(gè)叫publish()
的方法
@Override
public boolean publish(Event event) {
checkIsStart();
//向隊(duì)列中插入元素
boolean success = this.queue.offer(event);
//判斷是否插入成功
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
//失敗直接執(zhí)行
receiveEvent(event);
return true;
}
return true;
}
這個(gè)方法其實(shí)就是發(fā)布事件調(diào)用了publish
往阻塞隊(duì)列中存入事件,如果失敗那么立即執(zhí)行receiveEvent()
,不在繼續(xù)走隊(duì)列方法
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
//循環(huán)遍歷subscribers對(duì)象
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
//通知訂閱者執(zhí)行event
notifySubscriber(subscriber, event);
}
}
而在receiveEvent()
方法中,這里其實(shí)就是遍歷的subscribers
集合(訂閱者),然后通過(guò)notifySubscriber()
通知訂閱者方法,而這個(gè)subscribers
集合就是在我們之前講到的NacosNamingService.init()
方法中設(shè)置的。
public class NacosNamingService implements NamingService {
private void init(Properties properties) throws NacosException {
//將Subscribe注冊(cè)到Publisher
NotifyCenter.registerSubscriber(changeNotifier);
}
}
而NotifyCenter.registerSubscriber(changeNotifier);
會(huì)調(diào)用NotifyCenter.addSubscriber()
方法,進(jìn)行最終的操作。
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
//獲取對(duì)應(yīng)的publisher
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
//添加到subscribers集合
publisher.addSubscriber(consumer);
}
}
addSubscriber()
方法的邏輯就是講訂閱事件、發(fā)布中、訂閱者三個(gè)關(guān)系進(jìn)行綁定,而發(fā)布者和事件通過(guò)Map進(jìn)行維護(hù),發(fā)布者與訂閱者通過(guò)關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。
我們回到剛剛DefaulePublisher.notifySubscriber()
方法,這里是最后執(zhí)行訂閱者事件的方法
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
//執(zhí)行訂閱者事件
final Runnable job = () -> subscriber.onEvent(event);
//執(zhí)行者
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
到這里,訂閱機(jī)制就講完了,可能會(huì)有點(diǎn)繞,最好是我們能夠去跟著代碼走一遍,這樣會(huì)比較理解和記憶,在這里我們重點(diǎn)需要理解NotifyCenter
對(duì)事件發(fā)布者、訂閱者以及之間關(guān)系的維護(hù),關(guān)系維護(hù)的入口就在NacosNamingService.init()
中,我們來(lái)看一下他的核心邏輯
首先ServiceInfoHolder
中通過(guò)NotifyCenter
發(fā)布InstancesChangeEvent
事件.
NotifyCenter
獲取對(duì)應(yīng)的CanonicalName
,并將這個(gè)參數(shù)作為key,從NotifyCenter.publisherMap
中獲取對(duì)應(yīng)的事件發(fā)布者,然后將InstancesChangeEvent
事件進(jìn)行發(fā)布.
InstancesChangeEvent
事件發(fā)布主要是通過(guò)EventPublisher
的實(shí)現(xiàn)類(lèi),DefaultPublisher
進(jìn)行InstancesChangeEvent
事件發(fā)布,而DefaultPublisher
本身作為守護(hù)線程的方式進(jìn)行運(yùn)作,在執(zhí)行業(yè)務(wù)邏輯時(shí)判斷是否線程啟動(dòng),如果啟動(dòng),將事件添加到隊(duì)列中,如果成功,則發(fā)布過(guò)程完成,如果添加失敗,立即執(zhí)行DefaultPublisher.receiveEvent
,接收事件通知訂閱者,創(chuàng)建一個(gè)Runnable
對(duì)象,執(zhí)行訂閱者的Event事件。
在添加到隊(duì)列成功的時(shí)候,DefaultPublisher
會(huì)創(chuàng)建一個(gè)阻塞隊(duì)列(BlockingQueue),標(biāo)記線程啟動(dòng),當(dāng)他執(zhí)行 super.start()
,會(huì)調(diào)用它的run方法,在這個(gè)run方法里面核心的業(yè)務(wù)邏輯就是openEventHandler()
,里面會(huì)有兩個(gè)死循環(huán),第一個(gè)是在線程啟動(dòng)的60秒內(nèi)執(zhí)行條件,第二個(gè)是從阻塞隊(duì)列中獲取Event事件,調(diào)用DefaultPublisher.receiveEvent()
通知訂閱者,流程結(jié)束
本地緩存
我們?cè)谥暗南盗兄?,客戶端?huì)緩存一些信息在本地中,來(lái)獲取ServiceInfo
的信息,但是在執(zhí)行本地緩存的時(shí)候,難免會(huì)有一些故障,有故障就需要進(jìn)行處理,在這里主要涉及到兩個(gè)類(lèi)ServiceInfoHolder
和FailoverReactor
Nacos緩存主要是分為兩個(gè)方面,一個(gè)從注冊(cè)中心獲取實(shí)例信息緩存到內(nèi)存中,通過(guò)ConcurrentMap
進(jìn)行存儲(chǔ),一個(gè)是通過(guò)磁盤(pán)文件的形式定時(shí)緩存。
同時(shí)故障處理也分為兩個(gè)部分,一個(gè)是故障處理的開(kāi)關(guān)通過(guò)文件進(jìn)行標(biāo)記,一個(gè)是當(dāng)起來(lái)故障處理后,可以從故障備份的文件中獲取服務(wù)實(shí)例信息。
介紹完上面幾點(diǎn),我們先來(lái)詳細(xì)講解第一個(gè)核心類(lèi)ServiceInfoHolder
ServiceInfoHolder
ServiceInfoHolder
類(lèi),主要是用來(lái)處理服務(wù)信息的,每次客戶端從服務(wù)端拉取服務(wù)信息時(shí),都用經(jīng)過(guò)這個(gè)類(lèi),而processServiceInfo
用來(lái)處理本地信息(緩存、發(fā)布、更新、本地目錄初始化)等
ServiceInfo: 注冊(cè)服務(wù)的信息,主要包含(服務(wù)名、分組名、集群信息、實(shí)例列表、最后一次更新時(shí)間),客戶端獲取的信息,都是通過(guò)ServiceInfo
作為承載體,ServiceInfoHolder.ServiceInfo
,通過(guò)ConcurrentMap
進(jìn)行存儲(chǔ),如下所示:
public class ServiceInfoHolder implements Closeable {
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
public ServiceInfoHolder(String namespace, Properties properties) {
initCacheDir(namespace, properties);
//啟動(dòng)是判斷是否從緩存信息中獲取,默認(rèn)為false
if (isLoadCacheAtStart(properties)) {
//從緩存目錄中讀取信息
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
//創(chuàng)建空集合對(duì)象
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//判斷服務(wù)key是否為空
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
//將緩存信息放置到map中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//判斷實(shí)例信息是否發(fā)生改變
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
//監(jiān)控服務(wù)緩存map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
//添加實(shí)例變更事件,被訂閱者執(zhí)行
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//寫(xiě)入本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
}
這里就是Nacos獲取注冊(cè)信息的緩存,之前我們有講過(guò),當(dāng)服務(wù)信息變更的時(shí)候會(huì)第一時(shí)間更新ServiceInfoMap
中的信息,通過(guò)isChangedServiceInfo
進(jìn)行判斷,當(dāng)發(fā)生變動(dòng)時(shí),serviceInfoMap.put
最新數(shù)據(jù),當(dāng)我們需要使用的時(shí)候,通過(guò)key進(jìn)行g(shù)et操作,ServiceInfoMap
默認(rèn)創(chuàng)建空的對(duì)象,但如果配置啟動(dòng)從緩存文件中獲取,則會(huì)從緩存中獲取信息。而且當(dāng)我們服務(wù)實(shí)例發(fā)生變更的時(shí)候,會(huì)通過(guò)DiskCache.write()
向?qū)?yīng)的目錄文件中寫(xiě)入ServiceInfo
信息
本地緩存地址
本地緩存的地址通過(guò)cacheDir
進(jìn)行執(zhí)行本地緩存和故障處理的根目錄,在ServiceInfoHolder
構(gòu)造方法中,會(huì)默認(rèn)生成緩存目錄,默認(rèn)路徑為${user}/nacos/naming/public
,我們也可以需要通過(guò)System.setProperty("JM.SNAPSHOT.PATH")
指定。
public class ServiceInfoHolder implements Closeable {
private String cacheDir;
public ServiceInfoHolder(String namespace, Properties properties) {
//初始化生成緩存目錄
initCacheDir(namespace, properties);
......
}
private void initCacheDir(String namespace, Properties properties) {
String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
String namingCacheRegistryDir = "";
if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
}
if (!StringUtils.isBlank(jmSnapshotPath)) {
cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
} else {
cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
}
}
}
故障處理
在ServiceInfoHolder
構(gòu)造方法中,還會(huì)初始化一個(gè)FailoverReactor
的類(lèi),這個(gè)類(lèi)主要是用來(lái)故障處理。
public class ServiceInfoHolder implements Closeable {
private final FailoverReactor failoverReactor;
public ServiceInfoHolder(String namespace, Properties properties) {
....
//為兩者相互持有對(duì)方的引用
this.failoverReactor = new FailoverReactor(this, cacheDir);
.....
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
//獲取serviceInfoHolder引用對(duì)象
this.serviceInfoHolder = serviceInfoHolder;
//故障目錄${user}/nacos/naming/public/failover
this.failoverDir = cacheDir + FAILOVER_DIR;
//初始化executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
//開(kāi)啟守護(hù)線程
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
//其他信息初始化
this.init();
}
public void init() {
//執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
//初始化操作,延遲30分鐘執(zhí)行,間隔24小時(shí),執(zhí)行DiskFileWriter()任務(wù)
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
//初始化操作,間隔10秒,核心方法為DiskFileWriter
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
//如果故障目錄為空,啟動(dòng)立即執(zhí)行,備份文件
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
}
在init()
代碼中,開(kāi)啟了三個(gè)定時(shí)任務(wù),三個(gè)任務(wù)都是FailoverReactor
內(nèi)部類(lèi),
- 執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
- 初始化操作,延遲30分鐘執(zhí)行,間隔24小時(shí),執(zhí)行DiskFileWriter()任務(wù)
- 初始化操作,間隔10秒,核心方法為DiskFileWriter
我們這里先來(lái)看一下核心方法DiskFileWriter
,這里主要是獲取服務(wù)信息,判斷是否能夠?qū)懭氪疟P(pán),條件滿足,寫(xiě)入拼接的故障目錄中,因?yàn)榈谝粋€(gè)和第二個(gè)初始化操作,都會(huì)用到DiskFileWriter
,當(dāng)我們第三個(gè)定時(shí)判斷如果文件不存在,則會(huì)將文件寫(xiě)入本地磁盤(pán)中
class DiskFileWriter extends TimerTask {
@Override
public void run() {
Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
//主要是判斷服務(wù)信息是否完整
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
continue;
}
//將文件寫(xiě)入磁盤(pán)中
DiskCache.write(serviceInfo, failoverDir);
}
}
}
接下來(lái)我們?cè)倏匆幌?,第一個(gè)定時(shí)任務(wù)SwitchRefresher
的業(yè)務(wù)邏輯,
class SwitchRefresher implements Runnable {
long lastModifiedMillis = 0L;
@Override
public void run() {
try {
File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
//如果文件不存在返回
if (!switchFile.exists()) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
return;
}
long modified = switchFile.lastModified();
//判斷文件修改時(shí)間
if (lastModifiedMillis < modified) {
lastModifiedMillis = modified;
//獲取故障處理文件內(nèi)容
String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
Charset.defaultCharset().toString());
if (!StringUtils.isEmpty(failover)) {
String[] lines = failover.split(DiskCache.getLineSeparator());
for (String line : lines) {
String line1 = line.trim();
//"1" 開(kāi)啟故障處理
if (IS_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
NAMING_LOGGER.info("failover-mode is on");
new FailoverFileReader().run();
//"0" 關(guān)閉故障處理
} else if (NO_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.info("failover-mode is off");
}
}
} else {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
}
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
}
}
}
這里面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時(shí)間,如果已經(jīng)修改,則獲取文件中的內(nèi)容,繼續(xù)進(jìn)行判斷,當(dāng)我們開(kāi)啟故障處理時(shí),執(zhí)行線程FailoverFileReader().run()
class FailoverFileReader implements Runnable {
@Override
public void run() {
Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
BufferedReader reader = null;
try {
//讀取failoverDir目錄下的文件
File cacheDir = new File(failoverDir);
//不存在返回錯(cuò)誤
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
//獲取文件
File[] files = cacheDir.listFiles();
//文件不存在返回
if (files == null) {
return;
}
//遍歷處理
for (File file : files) {
//文件不存在跳過(guò)
if (!file.isFile()) {
continue;
}
//如果是故障處理標(biāo)志文件,跳過(guò)這一步
if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
continue;
}
ServiceInfo dom = new ServiceInfo(file.getName());
//讀取備份中的內(nèi)容,轉(zhuǎn)換為ServiceInfo對(duì)象
try {
String dataString = ConcurrentDiskUtil
.getFileContent(file, Charset.defaultCharset().toString());
reader = new BufferedReader(new StringReader(dataString));
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JacksonUtils.toObj(json, ServiceInfo.class);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (Exception e) {
//ignore
}
}
if (!CollectionUtils.isEmpty(dom.getHosts())) {
//將ServiceInfo對(duì)象放入domMap中
domMap.put(dom.getKey(), dom);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache file", e);
}
//如果不為空,賦值serviceMap
if (domMap.size() > 0) {
serviceMap = domMap;
}
}
}
FailoverFileReader
主要是操作讀取failover
目錄存儲(chǔ)的備份服務(wù)信息文件內(nèi)容,然后裝換成ServiceInfo
信息,并將所有的ServiceInfo
儲(chǔ)存在FailoverReactor
的ServiceMap
屬性中。
總結(jié)
到這里我們Nacos訂閱機(jī)制核心流程就講完了,整體訂閱機(jī)制的流程還是比較復(fù)雜的,因?yàn)檫€涉及到之前將的邏輯,會(huì)有點(diǎn)繞,并且用到了保證線程Map、守護(hù)線程、阻塞隊(duì)列、線程的使用等等,我們需要重點(diǎn)掌握的主要是事件發(fā)布者、訂閱者之間的關(guān)系,這里還是推薦大家有機(jī)會(huì)的話可以自己跟著源碼走一遍,會(huì)有更深的體驗(yàn)。
文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net
公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱(chēng)