Nacos源碼系列—訂閱機制的前因后果(下)
事件發(fā)布
在上一節(jié)中我們講解了在NotifyCenter
中維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher
,今天我們就來講一下DefaultPublisher的事件發(fā)布的具體邏輯
首先我們來看一下DefaultPublisher
的源碼:
public class DefaultPublisher extends Thread implements EventPublisher {
@Override
public void init(Class<? extends Event> type, int bufferSize) {
//守護(hù)線程
setDaemon(true);
//設(shè)置線程名
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
//阻塞隊列初始化
this.queue = new ArrayBlockingQueue<>(bufferSize);
//啟動線程
start();
}
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
//啟動run方法
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
}
我們可以看到這個類繼承自Thread
,說明他是一個線程類,同時實現(xiàn)了EventPublisher
說明他也是一個發(fā)布者,在init()中,是以守護(hù)線程的方式運作的,同時初始化了一個阻塞隊列,最后調(diào)用start()啟動線程。
在start()里面,其實就是啟動run():
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
//死循環(huán)遍歷,線程啟動設(shè)置最大延遲60秒,用來解決消息積壓問題
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}
//死循環(huán)從隊列中取出event對象,同時通知訂閱者(subscriber)執(zhí)行event對象
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
在上述代碼中我們可以看到for (; ; )
這個循環(huán)出現(xiàn)了兩次,這個就是循環(huán)遍歷(死循環(huán)),第一個死循環(huán)我們可以理解成延時效果,里面最大延時60秒,每隔一秒運行一次,判斷(當(dāng)前線程是否關(guān)閉、是否有訂閱者、是否超過60秒)只要滿足其中任意一個條件,跳出循環(huán)
第二個死循環(huán),是我們業(yè)務(wù)邏輯處理,用來消費,從隊列中取出event
事件,然后通過receiveEvent()
執(zhí)行。
那么我們可以從隊列中取出事件,那么這個事件又在哪一步注入進(jìn)去的呢,我們還是在當(dāng)前類里面,找到一個叫publish()
的方法
@Override
public boolean publish(Event event) {
checkIsStart();
//向隊列中插入元素
boolean success = this.queue.offer(event);
//判斷是否插入成功
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
//失敗直接執(zhí)行
receiveEvent(event);
return true;
}
return true;
}
這個方法其實就是發(fā)布事件調(diào)用了publish
往阻塞隊列中存入事件,如果失敗那么立即執(zhí)行receiveEvent()
,不在繼續(xù)走隊列方法
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
//循環(huán)遍歷subscribers對象
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
//通知訂閱者執(zhí)行event
notifySubscriber(subscriber, event);
}
}
而在receiveEvent()
方法中,這里其實就是遍歷的subscribers
集合(訂閱者),然后通過notifySubscriber()
通知訂閱者方法,而這個subscribers
集合就是在我們之前講到的NacosNamingService.init()
方法中設(shè)置的。
public class NacosNamingService implements NamingService {
private void init(Properties properties) throws NacosException {
//將Subscribe注冊到Publisher
NotifyCenter.registerSubscriber(changeNotifier);
}
}
而NotifyCenter.registerSubscriber(changeNotifier);
會調(diào)用NotifyCenter.addSubscriber()
方法,進(jìn)行最終的操作。
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
//獲取對應(yīng)的publisher
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
//添加到subscribers集合
publisher.addSubscriber(consumer);
}
}
addSubscriber()
方法的邏輯就是講訂閱事件、發(fā)布中、訂閱者三個關(guān)系進(jìn)行綁定,而發(fā)布者和事件通過Map進(jìn)行維護(hù),發(fā)布者與訂閱者通過關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。
我們回到剛剛DefaulePublisher.notifySubscriber()
方法,這里是最后執(zhí)行訂閱者事件的方法
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
//執(zhí)行訂閱者事件
final Runnable job = () -> subscriber.onEvent(event);
//執(zhí)行者
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
到這里,訂閱機制就講完了,可能會有點繞,最好是我們能夠去跟著代碼走一遍,這樣會比較理解和記憶,在這里我們重點需要理解NotifyCenter
對事件發(fā)布者、訂閱者以及之間關(guān)系的維護(hù),關(guān)系維護(hù)的入口就在NacosNamingService.init()
中,我們來看一下他的核心邏輯
首先ServiceInfoHolder
中通過NotifyCenter
發(fā)布InstancesChangeEvent
事件.
NotifyCenter
獲取對應(yīng)的CanonicalName
,并將這個參數(shù)作為key,從NotifyCenter.publisherMap
中獲取對應(yīng)的事件發(fā)布者,然后將InstancesChangeEvent
事件進(jìn)行發(fā)布.
InstancesChangeEvent
事件發(fā)布主要是通過EventPublisher
的實現(xiàn)類,DefaultPublisher
進(jìn)行InstancesChangeEvent
事件發(fā)布,而DefaultPublisher
本身作為守護(hù)線程的方式進(jìn)行運作,在執(zhí)行業(yè)務(wù)邏輯時判斷是否線程啟動,如果啟動,將事件添加到隊列中,如果成功,則發(fā)布過程完成,如果添加失敗,立即執(zhí)行DefaultPublisher.receiveEvent
,接收事件通知訂閱者,創(chuàng)建一個Runnable
對象,執(zhí)行訂閱者的Event事件。
在添加到隊列成功的時候,DefaultPublisher
會創(chuàng)建一個阻塞隊列(BlockingQueue),標(biāo)記線程啟動,當(dāng)他執(zhí)行 super.start()
,會調(diào)用它的run方法,在這個run方法里面核心的業(yè)務(wù)邏輯就是openEventHandler()
,里面會有兩個死循環(huán),第一個是在線程啟動的60秒內(nèi)執(zhí)行條件,第二個是從阻塞隊列中獲取Event事件,調(diào)用DefaultPublisher.receiveEvent()
通知訂閱者,流程結(jié)束
本地緩存
我們在之前的系列中,客戶端會緩存一些信息在本地中,來獲取ServiceInfo
的信息,但是在執(zhí)行本地緩存的時候,難免會有一些故障,有故障就需要進(jìn)行處理,在這里主要涉及到兩個類ServiceInfoHolder
和FailoverReactor
Nacos緩存主要是分為兩個方面,一個從注冊中心獲取實例信息緩存到內(nèi)存中,通過ConcurrentMap
進(jìn)行存儲,一個是通過磁盤文件的形式定時緩存。
同時故障處理也分為兩個部分,一個是故障處理的開關(guān)通過文件進(jìn)行標(biāo)記,一個是當(dāng)起來故障處理后,可以從故障備份的文件中獲取服務(wù)實例信息。
介紹完上面幾點,我們先來詳細(xì)講解第一個核心類ServiceInfoHolder
ServiceInfoHolder
ServiceInfoHolder
類,主要是用來處理服務(wù)信息的,每次客戶端從服務(wù)端拉取服務(wù)信息時,都用經(jīng)過這個類,而processServiceInfo
用來處理本地信息(緩存、發(fā)布、更新、本地目錄初始化)等
ServiceInfo: 注冊服務(wù)的信息,主要包含(服務(wù)名、分組名、集群信息、實例列表、最后一次更新時間),客戶端獲取的信息,都是通過ServiceInfo
作為承載體,ServiceInfoHolder.ServiceInfo
,通過ConcurrentMap
進(jìn)行存儲,如下所示:
public class ServiceInfoHolder implements Closeable {
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
public ServiceInfoHolder(String namespace, Properties properties) {
initCacheDir(namespace, properties);
//啟動是判斷是否從緩存信息中獲取,默認(rèn)為false
if (isLoadCacheAtStart(properties)) {
//從緩存目錄中讀取信息
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
//創(chuàng)建空集合對象
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//判斷服務(wù)key是否為空
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
//將緩存信息放置到map中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//判斷實例信息是否發(fā)生改變
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
//監(jiān)控服務(wù)緩存map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
//添加實例變更事件,被訂閱者執(zhí)行
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//寫入本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
}
這里就是Nacos獲取注冊信息的緩存,之前我們有講過,當(dāng)服務(wù)信息變更的時候會第一時間更新ServiceInfoMap
中的信息,通過isChangedServiceInfo
進(jìn)行判斷,當(dāng)發(fā)生變動時,serviceInfoMap.put
最新數(shù)據(jù),當(dāng)我們需要使用的時候,通過key進(jìn)行g(shù)et操作,ServiceInfoMap
默認(rèn)創(chuàng)建空的對象,但如果配置啟動從緩存文件中獲取,則會從緩存中獲取信息。而且當(dāng)我們服務(wù)實例發(fā)生變更的時候,會通過DiskCache.write()
向?qū)?yīng)的目錄文件中寫入ServiceInfo
信息
本地緩存地址
本地緩存的地址通過cacheDir
進(jìn)行執(zhí)行本地緩存和故障處理的根目錄,在ServiceInfoHolder
構(gòu)造方法中,會默認(rèn)生成緩存目錄,默認(rèn)路徑為${user}/nacos/naming/public
,我們也可以需要通過System.setProperty("JM.SNAPSHOT.PATH")
指定。
public class ServiceInfoHolder implements Closeable {
private String cacheDir;
public ServiceInfoHolder(String namespace, Properties properties) {
//初始化生成緩存目錄
initCacheDir(namespace, properties);
......
}
private void initCacheDir(String namespace, Properties properties) {
String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
String namingCacheRegistryDir = "";
if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
}
if (!StringUtils.isBlank(jmSnapshotPath)) {
cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
} else {
cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
}
}
}
故障處理
在ServiceInfoHolder
構(gòu)造方法中,還會初始化一個FailoverReactor
的類,這個類主要是用來故障處理。
public class ServiceInfoHolder implements Closeable {
private final FailoverReactor failoverReactor;
public ServiceInfoHolder(String namespace, Properties properties) {
....
//為兩者相互持有對方的引用
this.failoverReactor = new FailoverReactor(this, cacheDir);
.....
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
//獲取serviceInfoHolder引用對象
this.serviceInfoHolder = serviceInfoHolder;
//故障目錄${user}/nacos/naming/public/failover
this.failoverDir = cacheDir + FAILOVER_DIR;
//初始化executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
//開啟守護(hù)線程
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
//其他信息初始化
this.init();
}
public void init() {
//執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
//初始化操作,延遲30分鐘執(zhí)行,間隔24小時,執(zhí)行DiskFileWriter()任務(wù)
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
//初始化操作,間隔10秒,核心方法為DiskFileWriter
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
//如果故障目錄為空,啟動立即執(zhí)行,備份文件
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
}
在init()
代碼中,開啟了三個定時任務(wù),三個任務(wù)都是FailoverReactor
內(nèi)部類,
- 執(zhí)行初始化操作,間隔5秒,執(zhí)行SwitchRefresher()任務(wù)
- 初始化操作,延遲30分鐘執(zhí)行,間隔24小時,執(zhí)行DiskFileWriter()任務(wù)
- 初始化操作,間隔10秒,核心方法為DiskFileWriter
我們這里先來看一下核心方法DiskFileWriter
,這里主要是獲取服務(wù)信息,判斷是否能夠?qū)懭氪疟P,條件滿足,寫入拼接的故障目錄中,因為第一個和第二個初始化操作,都會用到DiskFileWriter
,當(dāng)我們第三個定時判斷如果文件不存在,則會將文件寫入本地磁盤中
class DiskFileWriter extends TimerTask {
@Override
public void run() {
Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
//主要是判斷服務(wù)信息是否完整
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
continue;
}
//將文件寫入磁盤中
DiskCache.write(serviceInfo, failoverDir);
}
}
}
接下來我們再看一下,第一個定時任務(wù)SwitchRefresher
的業(yè)務(wù)邏輯,
class SwitchRefresher implements Runnable {
long lastModifiedMillis = 0L;
@Override
public void run() {
try {
File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
//如果文件不存在返回
if (!switchFile.exists()) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
return;
}
long modified = switchFile.lastModified();
//判斷文件修改時間
if (lastModifiedMillis < modified) {
lastModifiedMillis = modified;
//獲取故障處理文件內(nèi)容
String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
Charset.defaultCharset().toString());
if (!StringUtils.isEmpty(failover)) {
String[] lines = failover.split(DiskCache.getLineSeparator());
for (String line : lines) {
String line1 = line.trim();
//"1" 開啟故障處理
if (IS_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
NAMING_LOGGER.info("failover-mode is on");
new FailoverFileReader().run();
//"0" 關(guān)閉故障處理
} else if (NO_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.info("failover-mode is off");
}
}
} else {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
}
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
}
}
}
這里面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時間,如果已經(jīng)修改,則獲取文件中的內(nèi)容,繼續(xù)進(jìn)行判斷,當(dāng)我們開啟故障處理時,執(zhí)行線程FailoverFileReader().run()
class FailoverFileReader implements Runnable {
@Override
public void run() {
Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
BufferedReader reader = null;
try {
//讀取failoverDir目錄下的文件
File cacheDir = new File(failoverDir);
//不存在返回錯誤
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
//獲取文件
File[] files = cacheDir.listFiles();
//文件不存在返回
if (files == null) {
return;
}
//遍歷處理
for (File file : files) {
//文件不存在跳過
if (!file.isFile()) {
continue;
}
//如果是故障處理標(biāo)志文件,跳過這一步
if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
continue;
}
ServiceInfo dom = new ServiceInfo(file.getName());
//讀取備份中的內(nèi)容,轉(zhuǎn)換為ServiceInfo對象
try {
String dataString = ConcurrentDiskUtil
.getFileContent(file, Charset.defaultCharset().toString());
reader = new BufferedReader(new StringReader(dataString));
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JacksonUtils.toObj(json, ServiceInfo.class);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (Exception e) {
//ignore
}
}
if (!CollectionUtils.isEmpty(dom.getHosts())) {
//將ServiceInfo對象放入domMap中
domMap.put(dom.getKey(), dom);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache file", e);
}
//如果不為空,賦值serviceMap
if (domMap.size() > 0) {
serviceMap = domMap;
}
}
}
FailoverFileReader
主要是操作讀取failover
目錄存儲的備份服務(wù)信息文件內(nèi)容,然后裝換成ServiceInfo
信息,并將所有的ServiceInfo
儲存在FailoverReactor
的ServiceMap
屬性中。
總結(jié)
到這里我們Nacos訂閱機制核心流程就講完了,整體訂閱機制的流程還是比較復(fù)雜的,因為還涉及到之前將的邏輯,會有點繞,并且用到了保證線程Map、守護(hù)線程、阻塞隊列、線程的使用等等,我們需要重點掌握的主要是事件發(fā)布者、訂閱者之間的關(guān)系,這里還是推薦大家有機會的話可以自己跟著源碼走一遍,會有更深的體驗。
文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net
公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱