Nacos源碼系列—訂閱機制的前因后果(上)
前因
我們在了解Nacos訂閱機制之前,首先來了解一下前因——Nacos客戶端的“服務發(fā)現(xiàn)”,我們先通過下面一張圖來直觀的看一下,有人可能就說這也叫直觀,明明很曲折,小農(nóng)想說的是,這樣才能讓你們印象更加深刻(手動狗頭)。
讀者內(nèi)心:我信你個鬼。
對于Naocs客戶端“服務發(fā)現(xiàn)” 主要是有NamingService獲取服務列表、組裝參數(shù),調(diào)用服務接口等等,上圖中只是一個大致的流程,在其中還有獲取服務列表中的通信流程協(xié)議(Grpc/http),訂閱流程以及后果(故障轉(zhuǎn)移流程),下面我們就來詳細講解一下,客戶端服務發(fā)現(xiàn)的基本流程。
首先我們先從一個入口類Client項目下的NamingTest
開始看起
@Ignore
public class NamingTest {
@Test
public void testServiceList() throws Exception {
Properties properties = new Properties();
//服務IP
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//用戶名
properties.put(PropertyKeyConst.USERNAME, "nacos");
//密碼
properties.put(PropertyKeyConst.PASSWORD, "nacos");
Instance instance = new Instance();
//實例IP
instance.setIp("1.1.1.1");
//實例端口
instance.setPort(800);
//配置權(quán)重
instance.setWeight(2);
Map<String, String> map = new HashMap<String, String>();
map.put("netType", "external");
map.put("version", "2.0");
instance.setMetadata(map);
//關(guān)鍵代碼 創(chuàng)建自己的實例
NamingService namingService = NacosFactory.createNamingService(properties);
namingService.registerInstance("nacos.test.1", instance);
ThreadUtils.sleep(5000L);
List<Instance> list = namingService.getAllInstances("nacos.test.1");
System.out.println(list);
ThreadUtils.sleep(30000L);
}
}
在前幾篇章節(jié)中,我們講解了registerInstance()
方法,今天我們需要來看一下getAllInstances()
方法的具體邏輯,這個就是我們需要觀察的入口
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
在上面具體方法中,會經(jīng)過幾輪重載方法的調(diào)用,在重載方法調(diào)用的過程中已經(jīng)設置了默認值,例如(默認分組(DEFAULT_GROUP),集群列表(空)、是否訂閱(是)等等)
/**
*
* @param serviceName 服務名稱
* @param groupName 分組名稱(DEFAULT_GROUP)
* @param clusters 集群數(shù)量(默認為空)
* @param subscribe 是否訂閱服務(是)
* @return
* @throws NacosException
*/
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
//是否為訂閱模式
if (subscribe) {
//從客戶端緩存中獲取服務信息
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
//如果緩存中服務信息不存在,進行訂閱
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
//未訂閱,從服務器獲取
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
//獲取實例列表
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
如果是訂閱模式,直接從本都緩存中獲取服務信息,然后從中獲取實例列表,訂閱機制會自動同步服務器實例信息到本地,如果緩存中沒有,說明是首次調(diào)用,進行訂閱后獲取服務信息,具體流程如下:
訂閱處理流程
在上面的流程中,我們講到了訂閱的邏輯,接下來我們就來看一看訂閱里面到底做了哪些事情,首先我們已經(jīng)知道服務在哪里訂閱了,我們只需要點進去找對應的方法。
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
下面是具體的方法,這里clientProxy類型為NamingClientProxyDelegate,實例化NacosNamingService時該類被實例化
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
//定時調(diào)度UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
//獲取緩存中的serviceInfo對象
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
//判斷如果為空,進行訂閱邏輯處理(Grpc協(xié)議)
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//ServiceInfo本地緩存處理
serviceInfoHolder.processServiceInfo(result);
return result;
}
在上述代碼中,可以看到我們在獲取服務器列表中,進行了訂閱邏輯的擴展。
- 在訂閱方法中首先開啟定時任務,用來定時同步服務端的實例信息,進行本地緩存的更新等操作,如果是首次直接返回,去判斷是否有本地緩存
- 如果本地緩存中存在serviceInfo信息,直接返回serviceInfo信息,如果不存在,默認采用Grpc協(xié)議進行訂閱,然后在返回serviceInfo信息
- 通過
grpcClientProxy.subscribe()
直接向服務器發(fā)送一個訂閱請求,并返回結(jié)果 - servieInfo本地緩存處理,并且會將獲取的最新的serviceInfo和本地的serviceInfo進行比較,進行更新操作。
如下圖所示:
訂閱
在上面我們講解了,Nacos是如何進行服務器發(fā)現(xiàn),以及訂閱的入口和大體邏輯,接下來我們就來詳細講一講Nacos的訂閱機制的核心,首先Nacos客戶端會通過定時任務,進行輪詢,每間隔6秒從Nacos注冊中心獲取服務實例列表,如果檢測實例發(fā)生變化,發(fā)布變更事件,訂閱者進行對應的邏輯處理(更新緩存和實例信息),我們先從一張圖,來了解一下訂閱機制主要的流程。
定時任務
訂閱其實本身也是服務發(fā)現(xiàn)的一種實現(xiàn)方式,就是在服務發(fā)現(xiàn)的時候執(zhí)行訂閱方法,然后通過定時任務定時拉取服務端信息。
我們找到 NacosNamingService.subscribe()
,會發(fā)現(xiàn)里面有好幾個···subscribe()```方法,這幾個方法在重載的過程中,會幫我們添加一些默認參數(shù)(默認分組、空集合列表),最終我們對定位到下面這個方法:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
在這里我們先來看 clientProxy.subscribe()
,這個方法實際上就是我們上面講到的NamingClientProxyDelegate.subscribe()
方法,在這里主要是對服務列表的信息進行查詢,所有我們可以知道不管是查詢還是訂閱都是用的同一個方法。在這里我們就不做過多的描述。
在這里我們主要關(guān)注的是這個方法里面一個定時調(diào)度的方法ServiceInfoUpdateService.scheduleUpdateIfAbsent();
,這個方法里面構(gòu)建了serviceKey,通過key來判斷是否重復,最后添加到updateTask,而addTask()
就是添加任務并且發(fā)起一個定時任務
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
//主要關(guān)注點,添加定時任務
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
默認定時延遲一秒執(zhí)行:
private static final long DEFAULT_DELAY = 1000L;
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
在這個定時任務里面封裝了訂閱機制的核心業(yè)務邏輯,位于UpdateTask.run()
方法。
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
//判斷 服務是否訂閱過并且沒有開啟定時任務 操作過不再執(zhí)行
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
serviceKey)) {
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
isCancel = true;
return;
}
//獲取緩存的service信息
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//緩存不存在
if (serviceObj == null) {
//根據(jù)serviceName等信息獲取service信息
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
//進行本地緩存處理
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
//如果服務最后的更新時間<=緩存刷新時間,從注冊中心重新查詢
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
//本地緩存處理
serviceInfoHolder.processServiceInfo(serviceObj);
}
//刷新最后更新的時間
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
// TODO multiple time can be configured.
//下一次更新緩存時間設置(6秒)
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
//設置失敗數(shù)量為0
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
} finally {
//沒有服務訂閱過并且開啟定時任務
if (!isCancel) {
// 下次調(diào)度刷新時間,下次執(zhí)行的時間與failCount有關(guān),failCount=0,則下次調(diào)度時間為6秒,最長為1分鐘
// 無異常情況下緩存實例的刷新時間是6秒
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
TimeUnit.MILLISECONDS);
}
}
}
通過定時任務執(zhí)行UpdateTask
,默認間隔時間為6秒,當發(fā)生異常時會延長,但不會超過1分鐘。該方法會比較本地是否存在緩存,以及是否過期,當不存在或者過期的時候,會去查詢注冊中心,獲取最新實例,更新最后獲取時間,處理服務信息,在最后會計算任務時間,循環(huán)執(zhí)行流程。
業(yè)務邏輯在最后會計算下一次定時任務的執(zhí)行時間,通過delayTime來延遲執(zhí)行,delayTime默認為1000*6(6秒),在finally 里面發(fā)起下一次定時任務,當我們程序出現(xiàn)異常的時候,執(zhí)行時間和錯誤次數(shù)成正比,最長時間不超過一分鐘
到這里我們已經(jīng)對于Nacos客戶端定于的核心流程講解了一遍,Nacos客戶端通過一個定時任務,每間隔6秒從注冊中心獲取實例列表,當發(fā)現(xiàn)實例發(fā)生變化的時候,發(fā)布變更事件,訂閱者進行業(yè)務處理,然后更新內(nèi)存中和本地緩存中的實例。接下來我們就來講一講,定時任務獲取到最新實例列表之后,整個時間機制是如何處理的。
我們在第一步調(diào)用subscribe()
方法的時候,會訂閱一個EventListener
事件,而在定時任務UpdateTask定時獲取實例列表之后,會調(diào)用ServiceInfoHolder.processServiceInfo
方法對ServiceInfo進行本地處理,這其中就包括事件處理。
在subscribe方法中,通過下面的代碼我們進行監(jiān)聽事件的注冊
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
在上述代碼中,我們主要關(guān)注的是changeNotifier.registerListener
,這個監(jiān)聽就是進行具體事件注冊邏輯,在下述代碼中,主要是將EventListener
存儲在listenerMap
map結(jié)構(gòu)中,key為服務實例信息的拼接,value為監(jiān)聽事件的集合
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
//將EventListener緩存到listenerMap中
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
關(guān)于serviceInfo的處理
在updateTask獲取到最新的實例信息后會進行本地化處理,我們需要看的是ServiceInfoUpdateService.run()
下的serviceInfoHolder.processServiceInfo(serviceObj);
本地緩存方法
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//判斷服務key是否為空
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
//將緩存信息放置到map中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//判斷實例信息是否發(fā)生改變
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
//監(jiān)控服務緩存map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
//添加實例變更事件,被訂閱者執(zhí)行
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//寫入本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
首先我們判斷最新的ServiceInfo數(shù)據(jù)是否正確,有沒有發(fā)生變化,如果數(shù)據(jù)格式正確且發(fā)生變化,會發(fā)布一個變更事件(InstancesChangeEvent)
,同時講serviceinfo寫入緩存中
對于服務信息的變更,Nacos是如何做的呢,別急我們往下看,當我們調(diào)用InstancesChangeEvent()
方法以后,變更事件會由NotifyCenter
進行發(fā)布,我們來瞅一瞅
首先事件追蹤的核心流程主要分為,根據(jù)事件類型獲取-》獲取事件發(fā)布者-》發(fā)布事件,詳細如下所示:
private static final NotifyCenter INSTANCE = new NotifyCenter();
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
//根據(jù)時間類型,獲取對應的CanonicalName
final String topic = ClassUtils.getCanonicalName(eventType);
//從NotifyCenter.publisherMap中獲取對應時間發(fā)布中
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
//事件發(fā)布者publisher發(fā)布事件
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
在這個源碼中,其實 INSTANCE
是單例實現(xiàn)的,在這里publisherMap
鍵值對是什么時候建立的?其實在是我們NacosNamingService.init()
調(diào)用初始化方法的時候進行綁定的
private void init(Properties properties) throws NacosException {
......
//建立InstancesChangeEvent和EnvenPublisher的關(guān)系
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
......
}
當我們從上面方法進去的時候,會發(fā)現(xiàn)他默認使用的是DEFAULT_PUBLISHER_FACTORY
來進行構(gòu)建,而在NotifyCenter
代碼塊中,會發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認構(gòu)建的EventPublisher為DefaultPublisher
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
//主要關(guān)注DEFAULT_PUBLISHER_FACTORY
return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
由此我們看出在NotifyCenter
類中維護了事件名稱和事件發(fā)布者的關(guān)系,而默認的時間發(fā)布中為DefaultPublisher。
閑言
到這里,我們Nacos訂閱機制的前半章我們就講完了,因為整體服務訂閱的事件機制還是比較復雜,篇幅太長,所以分成了兩部分,今天這個章節(jié)我們主要講解了,客戶端服務發(fā)現(xiàn)的原理以及訂閱機制中定時器的運行邏輯和NotifyCenter發(fā)布InstancesChangeEvent事件的流程
文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net
公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱