java语言中Nacos注册中心:Cleint端【注册和心跳】
前言
java语言中Nacos注册中心:Cleint端【注册和心跳】
0. 环境
- nacos版本:1.4.1
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
- Spring Cloud alibaba: 2.2.5.RELEASE
1. 入口分析
spring.factories
其中,会注入NacosAutoServiceRegistration
:
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
我们看一下 这个类的register()
方法:
// com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration#register
@Override
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
super.register();
}
// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
protected void register() {
this.serviceRegistry.register(getRegistration());
}
// com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
核心方法namingService.registerInstance
,接下来来到NacosNamingService#registerInstance
方法中:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 格式为:groupId@@微服务名称
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 若当前实例为临时实例,则向Server发送心跳
if (instance.isEphemeral()) {
// 构建一个心跳信息实例
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// todo 向Server端发送心跳(定时任务)
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// todo 向Server发送注册请求
serverProxy.registerService(groupedServiceName, groupName, instance);
}
接下来,分别分析注册和心跳两个核心方法。
2. 客户端注册
注册方法NamingProxy#registerService
:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 将instance拆开,写入请求参数param
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// todo 发送注册请求
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
首先,将一些核心参数设置到params中,后面在Server端会从params中解析这些参数。然后,调用reqApi方法
,接下来,我们详细分析下这个方法,后面客户端向服务端发送请求都是使用的reqApi方法
:
//com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String)
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
// todo
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
throws NacosException {
// todo
// getServerList() 配置文件中链接服务端的地址
return reqApi(api, params, body, getServerList(), method);
}
接下来:
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
// 默认尝试连接3次
for (int i = 0; i < maxRetry; i ) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
// 生成一个随机数,轮询
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
// 遍历Server,从中选择一个Server进行连接,轮询
for (int i = 0; i < servers.size(); i ) {
String server = servers.get(index);
try {
// todo 发送请求
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index 1) % servers.size();
}
}
...
throw new NacosException(exception.getErrCode(),
"failed to req API:" api " after all servers(" servers ") tried: " exception.getMessage());
}
轮询向Server端发送请求,知道请求成功为止。
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer IPUtil.IP_PORT_SPLITER serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() curServer api;
}
try {
// todo
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
String httpMethod, Type responseType) throws Exception {
RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
// todo
return execute(url, httpMethod, requestHttpEntity, responseType);
}
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
Type responseType) throws Exception {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
...
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
HttpClientResponse response = null;
try {
// todo 获取到nacos自定义httpClient,其实就是对JDK中httpClient的封装
response = this.requestClient().execute(uri, httpMethod, requestEntity);
return responseHandler.handle(response);
} finally {
if (response != null) {
response.close();
}
}
}
获取到nacos自定义httpClient
,其实就是对JDK中httpClient的封装,然后发起请求。
注册请求:POST请求,url: /nacos/v1/ns/instance/
3. 客户端心跳
// 若当前实例为临时实例,则向Server发送心跳
if (instance.isEphemeral()) {
// 构建一个心跳信息实例
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// todo 向Server端发送心跳(定时任务)
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
构建BeatInfo
,向Server端发送心跳请求:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
// 格式为:groupId@@微服务名称#ip#port
// 这个key就固定一个主机
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
// dom2Beat为一个map,key为主机 value为该主机发送心跳beatInfo
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// todo 开启一个定时任务
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
创建定时任务,其实就是执行BeatTask
的run方法:
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
// todo 发送心跳
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
// 若在服务端没有发现这个Client,则Server端返回错误码为20404
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
// todo 向Server端发送注册请求
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
// 设置一个新的定时任务
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
核心方法sendBeat
:
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
// 发送PUT请求
String result = reqApi(UtilAndComs.nacosUrlBase "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
心跳请求:PUT请求,url:/nacos/v1/ns/instance/beat
4. 方法调用图
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanehib
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24