Nacos源碼系列—訂閱機(jī)制的前因后果(上)
前因
我們?cè)诹私?a target="_blank" class="hl hl-1" data-report-click="{"spm":"1001.2101.3001.7020","dest":"https://so.csdn.net/so/search?q=Nacos&spm=1001.2101.3001.7020"}" data-tit="Nacos" data-pretit="nacos">Nacos訂閱機(jī)制之前,首先來(lái)了解一下前因——Nacos客戶端的“服務(wù)發(fā)現(xiàn)”,我們先通過(guò)下面一張圖來(lái)直觀的看一下,有人可能就說(shuō)這也叫直觀,明明很曲折,小農(nóng)想說(shuō)的是,這樣才能讓你們印象更加深刻(手動(dòng)狗頭)。
讀者內(nèi)心:我信你個(gè)鬼。
對(duì)于Naocs客戶端“服務(wù)發(fā)現(xiàn)” 主要是有NamingService獲取服務(wù)列表、組裝參數(shù),調(diào)用服務(wù)接口等等,上圖中只是一個(gè)大致的流程,在其中還有獲取服務(wù)列表中的通信流程協(xié)議(Grpc/http),訂閱流程以及后果(故障轉(zhuǎn)移流程),下面我們就來(lái)詳細(xì)講解一下,客戶端服務(wù)發(fā)現(xiàn)的基本流程。
首先我們先從一個(gè)入口類(lèi)Client項(xiàng)目下的NamingTest
開(kāi)始看起
@Ignore
public class NamingTest {
@Test
public void testServiceList() throws Exception {
Properties properties = new Properties();
//服務(wù)IP
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//用戶名
properties.put(PropertyKeyConst.USERNAME, "nacos");
//密碼
properties.put(PropertyKeyConst.PASSWORD, "nacos");
Instance instance = new Instance();
//實(shí)例IP
instance.setIp("1.1.1.1");
//實(shí)例端口
instance.setPort(800);
//配置權(quán)重
instance.setWeight(2);
Map<String, String> map = new HashMap<String, String>();
map.put("netType", "external");
map.put("version", "2.0");
instance.setMetadata(map);
//關(guān)鍵代碼 創(chuàng)建自己的實(shí)例
NamingService namingService = NacosFactory.createNamingService(properties);
namingService.registerInstance("nacos.test.1", instance);
ThreadUtils.sleep(5000L);
List<Instance> list = namingService.getAllInstances("nacos.test.1");
System.out.println(list);
ThreadUtils.sleep(30000L);
}
}
在前幾篇章節(jié)中,我們講解了registerInstance()
方法,今天我們需要來(lái)看一下getAllInstances()
方法的具體邏輯,這個(gè)就是我們需要觀察的入口
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
在上面具體方法中,會(huì)經(jīng)過(guò)幾輪重載方法的調(diào)用,在重載方法調(diào)用的過(guò)程中已經(jīng)設(shè)置了默認(rèn)值,例如(默認(rèn)分組(DEFAULT_GROUP),集群列表(空)、是否訂閱(是)等等)
/**
*
* @param serviceName 服務(wù)名稱
* @param groupName 分組名稱(DEFAULT_GROUP)
* @param clusters 集群數(shù)量(默認(rèn)為空)
* @param subscribe 是否訂閱服務(wù)(是)
* @return
* @throws NacosException
*/
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
//是否為訂閱模式
if (subscribe) {
//從客戶端緩存中獲取服務(wù)信息
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
//如果緩存中服務(wù)信息不存在,進(jìn)行訂閱
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
//未訂閱,從服務(wù)器獲取
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
//獲取實(shí)例列表
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
如果是訂閱模式,直接從本都緩存中獲取服務(wù)信息,然后從中獲取實(shí)例列表,訂閱機(jī)制會(huì)自動(dòng)同步服務(wù)器實(shí)例信息到本地,如果緩存中沒(méi)有,說(shuō)明是首次調(diào)用,進(jìn)行訂閱后獲取服務(wù)信息,具體流程如下:
訂閱處理流程
在上面的流程中,我們講到了訂閱的邏輯,接下來(lái)我們就來(lái)看一看訂閱里面到底做了哪些事情,首先我們已經(jīng)知道服務(wù)在哪里訂閱了,我們只需要點(diǎn)進(jìn)去找對(duì)應(yīng)的方法。
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
下面是具體的方法,這里clientProxy類(lèi)型為NamingClientProxyDelegate,實(shí)例化NacosNamingService時(shí)該類(lèi)被實(shí)例化
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
//定時(shí)調(diào)度UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
//獲取緩存中的serviceInfo對(duì)象
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
//判斷如果為空,進(jìn)行訂閱邏輯處理(Grpc協(xié)議)
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//ServiceInfo本地緩存處理
serviceInfoHolder.processServiceInfo(result);
return result;
}
在上述代碼中,可以看到我們?cè)讷@取服務(wù)器列表中,進(jìn)行了訂閱邏輯的擴(kuò)展。
- 在訂閱方法中首先開(kāi)啟定時(shí)任務(wù),用來(lái)定時(shí)同步服務(wù)端的實(shí)例信息,進(jìn)行本地緩存的更新等操作,如果是首次直接返回,去判斷是否有本地緩存
- 如果本地緩存中存在serviceInfo信息,直接返回serviceInfo信息,如果不存在,默認(rèn)采用Grpc協(xié)議進(jìn)行訂閱,然后在返回serviceInfo信息
- 通過(guò)
grpcClientProxy.subscribe()
直接向服務(wù)器發(fā)送一個(gè)訂閱請(qǐng)求,并返回結(jié)果 - servieInfo本地緩存處理,并且會(huì)將獲取的最新的serviceInfo和本地的serviceInfo進(jìn)行比較,進(jìn)行更新操作。
如下圖所示:
訂閱
在上面我們講解了,Nacos是如何進(jìn)行服務(wù)器發(fā)現(xiàn),以及訂閱的入口和大體邏輯,接下來(lái)我們就來(lái)詳細(xì)講一講Nacos的訂閱機(jī)制的核心,首先Nacos客戶端會(huì)通過(guò)定時(shí)任務(wù),進(jìn)行輪詢,每間隔6秒從Nacos注冊(cè)中心獲取服務(wù)實(shí)例列表,如果檢測(cè)實(shí)例發(fā)生變化,發(fā)布變更事件,訂閱者進(jìn)行對(duì)應(yīng)的邏輯處理(更新緩存和實(shí)例信息),我們先從一張圖,來(lái)了解一下訂閱機(jī)制主要的流程。
定時(shí)任務(wù)
訂閱其實(shí)本身也是服務(wù)發(fā)現(xiàn)的一種實(shí)現(xiàn)方式,就是在服務(wù)發(fā)現(xiàn)的時(shí)候執(zhí)行訂閱方法,然后通過(guò)定時(shí)任務(wù)定時(shí)拉取服務(wù)端信息。
我們找到 NacosNamingService.subscribe()
,會(huì)發(fā)現(xiàn)里面有好幾個(gè)···subscribe()```方法,這幾個(gè)方法在重載的過(guò)程中,會(huì)幫我們添加一些默認(rèn)參數(shù)(默認(rèn)分組、空集合列表),最終我們對(duì)定位到下面這個(gè)方法:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
在這里我們先來(lái)看 clientProxy.subscribe()
,這個(gè)方法實(shí)際上就是我們上面講到的NamingClientProxyDelegate.subscribe()
方法,在這里主要是對(duì)服務(wù)列表的信息進(jìn)行查詢,所有我們可以知道不管是查詢還是訂閱都是用的同一個(gè)方法。在這里我們就不做過(guò)多的描述。
在這里我們主要關(guān)注的是這個(gè)方法里面一個(gè)定時(shí)調(diào)度的方法ServiceInfoUpdateService.scheduleUpdateIfAbsent();
,這個(gè)方法里面構(gòu)建了serviceKey,通過(guò)key來(lái)判斷是否重復(fù),最后添加到updateTask,而addTask()
就是添加任務(wù)并且發(fā)起一個(gè)定時(shí)任務(wù)
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
//主要關(guān)注點(diǎn),添加定時(shí)任務(wù)
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
默認(rèn)定時(shí)延遲一秒執(zhí)行:
private static final long DEFAULT_DELAY = 1000L;
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
在這個(gè)定時(shí)任務(wù)里面封裝了訂閱機(jī)制的核心業(yè)務(wù)邏輯,位于UpdateTask.run()
方法。
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
//判斷 服務(wù)是否訂閱過(guò)并且沒(méi)有開(kāi)啟定時(shí)任務(wù) 操作過(guò)不再執(zhí)行
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
serviceKey)) {
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
isCancel = true;
return;
}
//獲取緩存的service信息
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//緩存不存在
if (serviceObj == null) {
//根據(jù)serviceName等信息獲取service信息
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
//進(jìn)行本地緩存處理
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
//如果服務(wù)最后的更新時(shí)間<=緩存刷新時(shí)間,從注冊(cè)中心重新查詢
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
//本地緩存處理
serviceInfoHolder.processServiceInfo(serviceObj);
}
//刷新最后更新的時(shí)間
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
// TODO multiple time can be configured.
//下一次更新緩存時(shí)間設(shè)置(6秒)
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
//設(shè)置失敗數(shù)量為0
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
} finally {
//沒(méi)有服務(wù)訂閱過(guò)并且開(kāi)啟定時(shí)任務(wù)
if (!isCancel) {
// 下次調(diào)度刷新時(shí)間,下次執(zhí)行的時(shí)間與failCount有關(guān),failCount=0,則下次調(diào)度時(shí)間為6秒,最長(zhǎng)為1分鐘
// 無(wú)異常情況下緩存實(shí)例的刷新時(shí)間是6秒
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
TimeUnit.MILLISECONDS);
}
}
}
通過(guò)定時(shí)任務(wù)執(zhí)行UpdateTask
,默認(rèn)間隔時(shí)間為6秒,當(dāng)發(fā)生異常時(shí)會(huì)延長(zhǎng),但不會(huì)超過(guò)1分鐘。該方法會(huì)比較本地是否存在緩存,以及是否過(guò)期,當(dāng)不存在或者過(guò)期的時(shí)候,會(huì)去查詢注冊(cè)中心,獲取最新實(shí)例,更新最后獲取時(shí)間,處理服務(wù)信息,在最后會(huì)計(jì)算任務(wù)時(shí)間,循環(huán)執(zhí)行流程。
業(yè)務(wù)邏輯在最后會(huì)計(jì)算下一次定時(shí)任務(wù)的執(zhí)行時(shí)間,通過(guò)delayTime來(lái)延遲執(zhí)行,delayTime默認(rèn)為1000*6(6秒),在finally 里面發(fā)起下一次定時(shí)任務(wù),當(dāng)我們程序出現(xiàn)異常的時(shí)候,執(zhí)行時(shí)間和錯(cuò)誤次數(shù)成正比,最長(zhǎng)時(shí)間不超過(guò)一分鐘
到這里我們已經(jīng)對(duì)于Nacos客戶端定于的核心流程講解了一遍,Nacos客戶端通過(guò)一個(gè)定時(shí)任務(wù),每間隔6秒從注冊(cè)中心獲取實(shí)例列表,當(dāng)發(fā)現(xiàn)實(shí)例發(fā)生變化的時(shí)候,發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理,然后更新內(nèi)存中和本地緩存中的實(shí)例。接下來(lái)我們就來(lái)講一講,定時(shí)任務(wù)獲取到最新實(shí)例列表之后,整個(gè)時(shí)間機(jī)制是如何處理的。
我們?cè)诘谝徊秸{(diào)用subscribe()
方法的時(shí)候,會(huì)訂閱一個(gè)EventListener
事件,而在定時(shí)任務(wù)UpdateTask定時(shí)獲取實(shí)例列表之后,會(huì)調(diào)用ServiceInfoHolder.processServiceInfo
方法對(duì)ServiceInfo進(jìn)行本地處理,這其中就包括事件處理。
在subscribe方法中,通過(guò)下面的代碼我們進(jìn)行監(jiān)聽(tīng)事件的注冊(cè)
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
在上述代碼中,我們主要關(guān)注的是changeNotifier.registerListener
,這個(gè)監(jiān)聽(tīng)就是進(jìn)行具體事件注冊(cè)邏輯,在下述代碼中,主要是將EventListener
存儲(chǔ)在listenerMap
map結(jié)構(gòu)中,key為服務(wù)實(shí)例信息的拼接,value為監(jiān)聽(tīng)事件的集合
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
//將EventListener緩存到listenerMap中
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
關(guān)于serviceInfo的處理
在updateTask獲取到最新的實(shí)例信息后會(huì)進(jìn)行本地化處理,我們需要看的是ServiceInfoUpdateService.run()
下的serviceInfoHolder.processServiceInfo(serviceObj);
本地緩存方法
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//判斷服務(wù)key是否為空
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
//將緩存信息放置到map中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//判斷實(shí)例信息是否發(fā)生改變
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
//監(jiān)控服務(wù)緩存map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
//添加實(shí)例變更事件,被訂閱者執(zhí)行
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//寫(xiě)入本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
首先我們判斷最新的ServiceInfo數(shù)據(jù)是否正確,有沒(méi)有發(fā)生變化,如果數(shù)據(jù)格式正確且發(fā)生變化,會(huì)發(fā)布一個(gè)變更事件(InstancesChangeEvent)
,同時(shí)講serviceinfo寫(xiě)入緩存中
對(duì)于服務(wù)信息的變更,Nacos是如何做的呢,別急我們往下看,當(dāng)我們調(diào)用
InstancesChangeEvent()
方法以后,變更事件會(huì)由NotifyCenter
進(jìn)行發(fā)布,我們來(lái)瞅一瞅
首先事件追蹤的核心流程主要分為,根據(jù)事件類(lèi)型獲取-》獲取事件發(fā)布者-》發(fā)布事件,詳細(xì)如下所示:
private static final NotifyCenter INSTANCE = new NotifyCenter();
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
//根據(jù)時(shí)間類(lèi)型,獲取對(duì)應(yīng)的CanonicalName
final String topic = ClassUtils.getCanonicalName(eventType);
//從NotifyCenter.publisherMap中獲取對(duì)應(yīng)時(shí)間發(fā)布中
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
//事件發(fā)布者publisher發(fā)布事件
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
在這個(gè)源碼中,其實(shí) INSTANCE
是單例實(shí)現(xiàn)的,在這里publisherMap
鍵值對(duì)是什么時(shí)候建立的?其實(shí)在是我們NacosNamingService.init()
調(diào)用初始化方法的時(shí)候進(jìn)行綁定的
private void init(Properties properties) throws NacosException {
......
//建立InstancesChangeEvent和EnvenPublisher的關(guān)系
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
......
}
當(dāng)我們從上面方法進(jìn)去的時(shí)候,會(huì)發(fā)現(xiàn)他默認(rèn)使用的是DEFAULT_PUBLISHER_FACTORY
來(lái)進(jìn)行構(gòu)建,而在NotifyCenter
代碼塊中,會(huì)發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認(rèn)構(gòu)建的EventPublisher為DefaultPublisher
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
//主要關(guān)注DEFAULT_PUBLISHER_FACTORY
return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
由此我們看出在NotifyCenter
類(lèi)中維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的時(shí)間發(fā)布中為DefaultPublisher。
閑言
到這里,我們Nacos訂閱機(jī)制的前半章我們就講完了,因?yàn)檎w服務(wù)訂閱的事件機(jī)制還是比較復(fù)雜,篇幅太長(zhǎng),所以分成了兩部分,今天這個(gè)章節(jié)我們主要講解了,客戶端服務(wù)發(fā)現(xiàn)的原理以及訂閱機(jī)制中定時(shí)器的運(yùn)行邏輯和NotifyCenter發(fā)布InstancesChangeEvent事件的流程
文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net
公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱