Nacos源碼系列—服務(wù)端那些事兒
前言
在上節(jié)課中,我們講解了客戶端注冊服務(wù)的大體流程,客戶端在注冊服務(wù)的時(shí)候調(diào)用的是 NamingService.registerInstance
來完成實(shí)例的注冊,在最后呢我們知道服務(wù)注冊是通過 nacos/v1/ns/instance
接口來完成注冊的,我們今天來講解服務(wù)端的注冊,首先就從這個(gè)接口地址開始,來看具體服務(wù)端都做了哪些事情
服務(wù)注冊
上面是我們從官網(wǎng)中找到的Nacos架構(gòu)圖,從這個(gè)圖中我們大體可以得出我們要找的接口應(yīng)該是在NamingService
這個(gè)服務(wù)中,同時(shí)我們在項(xiàng)目結(jié)構(gòu)中也可以看到naming這個(gè)模塊,naming就是實(shí)現(xiàn)服務(wù)注冊的,我們都知道請求路徑都是通過controller來進(jìn)行處理的,而在其中我們可以看到一個(gè)InstanceController
的這么一個(gè)類,那么注冊實(shí)例肯定會(huì)和它有關(guān)??梢钥吹?code>InstanceController類的請求路由即是我們POST請求的路由的部分,如下:
所以,我們就從開始研究接收請求處理服務(wù)注冊的源碼,我們找到通過RestFul API接口,請求類型為Post,的方法,符合條件的只有InstanceController.register
方法,這個(gè)方法用來接收用戶的請求,并且把收到的信息進(jìn)行解析,裝換成實(shí)例信息,然后通過getInstanceOperator().registerInstance
進(jìn)行調(diào)用,這個(gè)方法也是服務(wù)注冊的核心
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//從 request信息中獲取namespaceId,如果沒有默認(rèn)為public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//獲取服務(wù)名稱 格式:“group@@serviceName”
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//將request參數(shù)還原成instance實(shí)例
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
//【核心】注冊服務(wù)實(shí)例
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
我們先來看一下下面這個(gè)核心方法
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
getInstanceOperator()
這個(gè)判斷是否走Grpc協(xié)議,默認(rèn)走Grpc,所以我們使用的是instanceServiceV2
這個(gè)實(shí)例對象
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
instanceServiceV2
就是InstanceOperatorClientImpl
,方法所以我們需要進(jìn)入的是下面這個(gè)實(shí)例的處理類
具體方法如下所示:
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
//判斷是否為臨時(shí)客戶端
boolean ephemeral = instance.isEphemeral();
//獲取客戶端ID
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
//通過客戶端ID創(chuàng)建客戶端連接
createIpPortClientIfAbsent(clientId);
//獲取服務(wù)信息
Service service = getService(namespaceId, serviceName, ephemeral);
//注冊服務(wù)
clientOperationService.registerInstance(service, instance, clientId);
}
從Nacos2.0以后,新增了Client模型,管理與該客戶機(jī)有關(guān)的數(shù)據(jù)內(nèi)容,如果一個(gè)客戶機(jī)發(fā)布了一個(gè)服務(wù),那么這個(gè)客戶機(jī)發(fā)布的所有服務(wù)和訂閱者信息都會(huì)被更新到一個(gè)Client對象中,這個(gè)Client對象對應(yīng)于這個(gè)客戶機(jī)的鏈接,然后通過事件機(jī)制觸發(fā)索引信息的更新。Client負(fù)責(zé)管理一個(gè)客戶機(jī)的服務(wù)實(shí)例注冊Publish和服務(wù)訂閱Subscribe,可以方便地對需要推送的服務(wù)范圍進(jìn)行快速聚合,同時(shí)一個(gè)客戶端gRPC長連接對應(yīng)一個(gè)Client,每個(gè)Client有自己唯一的 clientId
package com.alibaba.nacos.naming.core.v2.client;
public interface Client {
// 客戶端id
String getClientId();
// 是否臨時(shí)客戶端
boolean isEphemeral();
//設(shè)置客戶端更新時(shí)間
void setLastUpdatedTime();
//獲取客戶端更新時(shí)間
long getLastUpdatedTime();
// 服務(wù)實(shí)例注冊
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
//服務(wù)實(shí)例移除
InstancePublishInfo removeServiceInstance(Service service);
//服務(wù)實(shí)例查詢
InstancePublishInfo getInstancePublishInfo(Service service);
Collection<Service> getAllPublishedService();
// 服務(wù)訂閱
boolean addServiceSubscriber(Service service, Subscriber subscriber);
///取消訂閱
boolean removeServiceSubscriber(Service service);
//查詢訂閱
Subscriber getSubscriber(Service service);
Collection<Service> getAllSubscribeService();
// 生成同步給其他節(jié)點(diǎn)的client數(shù)據(jù)
ClientSyncData generateSyncData();
// 是否過期
boolean isExpire(long currentTime);
// 釋放資源
void release();
}
知道了Client模型后,我們來接著從clientOperationService.registerInstance(service, instance, clientId);
找到對應(yīng)的具體實(shí)現(xiàn)
EphemeralClientOperationServiceImpl.registerInstance()
下面這個(gè)方法是具體來負(fù)責(zé)處理服務(wù)注冊,我們來詳細(xì)了解一下:
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
//確保Service單例存在,注意Service的equals和hasCode方法
Service singleton = ServiceManager.getInstance().getSingleton(service);
//如果不是臨時(shí)客戶端
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
//根據(jù)客戶端ID找到客戶端信息,這個(gè)關(guān)系在連接建立的時(shí)候存儲
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
//將客戶端實(shí)例模型,裝換成服務(wù)端實(shí)例模型
InstancePublishInfo instanceInfo = getPublishInfo(instance);
//將實(shí)例存儲到client中
client.addServiceInstance(singleton, instanceInfo);
//設(shè)置最后更新時(shí)間
client.setLastUpdatedTime();
//建立服務(wù)和客戶端的關(guān)聯(lián)關(guān)系
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
- ServiceManager.getInstance().getSingleton() 當(dāng)調(diào)用
getSingleton
的時(shí)候會(huì)負(fù)責(zé)管理service的單例,在這里service會(huì)重寫equlas和hasCode方法作為key
public class ServiceManager {
//單例service 看service中equals和hasCode方法
private final ConcurrentHashMap<Service, Service> singletonRepository;
//namespace下所有的service
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
//通過Map儲存單例的Service
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
}
- service中 equal和hasCode方法,namespace+group+name在服務(wù)端是一個(gè)單例Service
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Service)) {
return false;
}
Service service = (Service) o;
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}
@Override
public int hashCode() {
return Objects.hash(namespace, group, name);
}
- clientManager.getClient() 這里對應(yīng)的實(shí)現(xiàn)類為
ConnectionBasedClientManager
這個(gè)實(shí)現(xiàn)類負(fù)責(zé)管理長連接clientId和client模型的映射關(guān)系
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//通過map存儲ID和client之間的關(guān)聯(lián)關(guān)系
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
//根據(jù)clientId查詢client
@Override
public Client getClient(String clientId) {
return clients.get(clientId);
}
}
- client.addServiceInstance(); 抽象類為
AbstractClient
:負(fù)責(zé)存儲當(dāng)前客戶端服務(wù)注冊表,也就是 service和instance的關(guān)系。
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、
//將service和實(shí)例進(jìn)行關(guān)聯(lián)
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
//監(jiān)控指標(biāo)自增實(shí)例數(shù)
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
- ClientOperationEvent.ClientRegisterServiceEvent() :這里目的是為了過濾目標(biāo)服務(wù)得到最終instance列表建立service和client的關(guān)系,能夠方便我們快速查詢,同時(shí)會(huì)觸發(fā)
ClientServiceIndexesManager
的監(jiān)聽事件。
//服務(wù)與發(fā)布client的關(guān)系
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
//服務(wù)與訂閱clientId的關(guān)系
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
private void removeSubscriberIndexes(Service service, String clientId) {
if (!subscriberIndexes.containsKey(service)) {
return;
}
subscriberIndexes.get(service).remove(clientId);
if (subscriberIndexes.get(service).isEmpty()) {
subscriberIndexes.remove(service);
}
}
請求流程圖:
服務(wù)端監(jiān)控檢查
Nacos作為注冊中心不止提供了服務(wù)注冊和服務(wù)發(fā)現(xiàn)的功能,還提供了服務(wù)可用性檢測的功能,在1.0的版本中,臨時(shí)實(shí)例走的是distro協(xié)議,客戶端向注冊中心發(fā)送心跳來維持自身的健康(healthy)狀態(tài),持久實(shí)例則走的是Raft協(xié)議存儲。
- 兩種檢測機(jī)制:
- 客戶端主動(dòng)上報(bào)機(jī)制
- 服務(wù)器端主動(dòng)下探機(jī)制
客戶端主動(dòng)上報(bào)機(jī)制:你主動(dòng)找上級,說你沒有打卡(不健康狀態(tài))
服務(wù)器端主動(dòng)下探機(jī)制:上級檢測到你有不打卡的記錄,主動(dòng)來找你
對于Nacos健康檢測機(jī)制,我們不能主動(dòng)去設(shè)置,但是健康檢查機(jī)制是和Nacos的服務(wù)實(shí)例類型強(qiáng)相關(guān),主要是有兩種服務(wù)實(shí)例:
- 臨時(shí)實(shí)例:客戶端主動(dòng)上報(bào)
- 持久實(shí)例:服務(wù)端主動(dòng)下探
客戶端主動(dòng)上報(bào)
臨時(shí)實(shí)例每隔5秒會(huì)主動(dòng)上報(bào)自己的健康狀態(tài),發(fā)送心跳,如果發(fā)送心跳的間隔時(shí)間超過15秒,Nacos服務(wù)器端會(huì)將服務(wù)標(biāo)記為亞健康狀態(tài),如果超過30S沒有發(fā)送心跳,那么服務(wù)實(shí)例會(huì)被從服務(wù)列表中剔除
在2.0版本以后,持久實(shí)例不變,臨時(shí)實(shí)例而是通過長連接來判斷實(shí)例是否健康。
- 長連接: 一個(gè)連接上可以連續(xù)發(fā)送多數(shù)據(jù)包,在連接保持期間,如果沒有數(shù)據(jù)包發(fā)送,需要雙方發(fā)鏈路檢測包,在Nacos2.0之后,使用Grpc協(xié)議代替了http協(xié)議。長連接會(huì)保持客戶端和服務(wù)端發(fā)送的狀態(tài),在源碼中
ConnectionManager
管理所有客戶端的長連接
ConnectionManager: 每3秒檢測所有超過20S內(nèi)沒有發(fā)生過通訊的客戶端,向客戶端發(fā)起ClientDetectionRequest探測請求,如果客戶端在1s內(nèi)成功響應(yīng),則檢測通過,否則執(zhí)行unregister方法移除Connection
如果客戶端持續(xù)和服務(wù)端進(jìn)行通訊,服務(wù)端是不需要主動(dòng)下探的,只有當(dāng)客戶端沒有一直和服務(wù)端通信的時(shí)候,服務(wù)端才會(huì)主動(dòng)下探操作
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
//只要spring容器啟動(dòng),會(huì)觸發(fā)這個(gè)方法
@PostConstruct
public void start() {
// 啟動(dòng)不健康連接排除功能.
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// 1. 統(tǒng)計(jì)過時(shí)(20s)連接
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
//2.獲得需要剔除的IP和端口
//3.根據(jù)限制獲取剔除的IP和端口
//4.如果還是有需要剔除的客戶端,則繼續(xù)執(zhí)行
//5.沒有活動(dòng)的客戶端執(zhí)行探測
//6.如果沒有馬上響應(yīng),則馬上剔除
//7.剔除后發(fā)布ClientDisconnectEvent事件
}
});
}
}
//注銷(移出)連接方法
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
當(dāng)服務(wù)端操作移除事件以后,會(huì)操作notifyClientDisConnected()
方法,主要調(diào)用的是clientConnectionEventListener.clientDisConnected(connection)
方法,將連接信息傳入進(jìn)去
public void notifyClientDisConnected(final Connection connection) {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}
clientConnectionEventListenerd
的實(shí)現(xiàn)類是ConnectionBasedClientManager
,在這里面會(huì)出發(fā)清除索引緩存等操作
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
//同步移除client數(shù)據(jù)
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
//服務(wù)訂閱,將變更通知到客戶端
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}
總結(jié)
到這里Nacos服務(wù)端的基礎(chǔ)的源碼就講完了,有些地方我們沒有展開來講,在后續(xù)的源碼講解中,會(huì)給大家詳細(xì)的進(jìn)行講解,今天主要講解了,服務(wù)端注冊以及監(jiān)控檢查的基礎(chǔ)代碼,后面會(huì)有最新的內(nèi)容呈現(xiàn)給大家,如果覺得文中對您有幫助的,記得點(diǎn)贊關(guān)注~
文章轉(zhuǎn)載自: https://muxiaonong.blog.csdn.net
公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱