• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

java语言中Nacos注册中心:Cleint端【注册和心跳】

武飞扬头像
juejin
帮助65

前言

java语言中Nacos注册中心:Cleint端【注册和心跳】

0. 环境

  • nacos版本:1.4.1
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4
  • Spring Cloud alibaba: 2.2.5.RELEASE

测试代码:github.com/hsfxuebao/s…

1. 入口分析

spring.factories image.png 其中,会注入NacosAutoServiceRegistration:

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
      NacosServiceRegistry registry,
      AutoServiceRegistrationProperties autoServiceRegistrationProperties,
      NacosRegistration registration) {
   return new NacosAutoServiceRegistration(registry,
         autoServiceRegistrationProperties, registration);
}

我们看一下 这个类的register()方法:

// com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration#register
@Override
protected void register() {
   if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
      log.debug("Registration disabled.");
      return;
   }
   if (this.registration.getPort() < 0) {
      this.registration.setPort(getPort().get());
   }
   super.register();
}

// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
protected void register() {
   this.serviceRegistry.register(getRegistration());
}

// com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
@Override
public void register(Registration registration) {

   if (StringUtils.isEmpty(registration.getServiceId())) {
      log.warn("No service to register for nacos client...");
      return;
   }

   NamingService namingService = namingService();
   String serviceId = registration.getServiceId();
   String group = nacosDiscoveryProperties.getGroup();

   Instance instance = getNacosInstanceFromRegistration(registration);

   try {
      namingService.registerInstance(serviceId, group, instance);
      log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
            instance.getIp(), instance.getPort());
   }
   catch (Exception e) {
      log.error("nacos registry, {} register failed...{},", serviceId,
            registration.toString(), e);
      // rethrow a RuntimeException if the registration is failed.
      // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
      rethrowRuntimeException(e);
   }
}

核心方法namingService.registerInstance,接下来来到NacosNamingService#registerInstance方法中:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // 格式为:groupId@@微服务名称
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 若当前实例为临时实例,则向Server发送心跳
    if (instance.isEphemeral()) {
        // 构建一个心跳信息实例
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // todo 向Server端发送心跳(定时任务)
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // todo 向Server发送注册请求
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

接下来,分别分析注册和心跳两个核心方法。

2. 客户端注册

注册方法NamingProxy#registerService

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    // 将instance拆开,写入请求参数param
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // todo 发送注册请求
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

首先,将一些核心参数设置到params中,后面在Server端会从params中解析这些参数。然后,调用reqApi方法,接下来,我们详细分析下这个方法,后面客户端向服务端发送请求都是使用的reqApi方法

//com.alibaba.nacos.client.naming.net.NamingProxy#reqApi(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String)
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    // todo
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    // todo
    // getServerList() 配置文件中链接服务端的地址
    return reqApi(api, params, body, getServerList(), method);
}

接下来:

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (StringUtils.isNotBlank(nacosDomain)) {
        // 默认尝试连接3次
        for (int i = 0; i < maxRetry; i  ) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        // 生成一个随机数,轮询
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        // 遍历Server,从中选择一个Server进行连接,轮询
        for (int i = 0; i < servers.size(); i  ) {
            String server = servers.get(index);
            try {
                // todo 发送请求
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index   1) % servers.size();
        }
    }
    ...
    throw new NacosException(exception.getErrCode(),
            "failed to req API:"   api   " after all servers("   servers   ") tried: "   exception.getMessage());

}

轮询向Server端发送请求,知道请求成功为止。

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    injectSecurityInfo(params);
    Header header = builderHeader();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer   api;
    } else {
        if (!IPUtil.containsPort(curServer)) {
            curServer = curServer   IPUtil.IP_PORT_SPLITER   serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix()   curServer   api;
    }

    try {
        // todo
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}

public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
        String httpMethod, Type responseType) throws Exception {
    RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
            header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
    // todo
    return execute(url, httpMethod, requestHttpEntity, responseType);
}

private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
        Type responseType) throws Exception {
    URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
    ...

    ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
    HttpClientResponse response = null;
    try {
        // todo 获取到nacos自定义httpClient,其实就是对JDK中httpClient的封装
        response = this.requestClient().execute(uri, httpMethod, requestEntity);
        return responseHandler.handle(response);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

获取到nacos自定义httpClient,其实就是对JDK中httpClient的封装,然后发起请求。

注册请求:POST请求,url: /nacos/v1/ns/instance/

3. 客户端心跳

// 若当前实例为临时实例,则向Server发送心跳
if (instance.isEphemeral()) {
    // 构建一个心跳信息实例
    BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
    // todo 向Server端发送心跳(定时任务)
    beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}

构建BeatInfo,向Server端发送心跳请求:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    // 格式为:groupId@@微服务名称#ip#port
    // 这个key就固定一个主机
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    // dom2Beat为一个map,key为主机 value为该主机发送心跳beatInfo
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // todo 开启一个定时任务
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

创建定时任务,其实就是执行BeatTask的run方法:

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // todo 发送心跳
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            // 若在服务端没有发现这个Client,则Server端返回错误码为20404
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    // todo 向Server端发送注册请求
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

        }
        // 设置一个新的定时任务
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

核心方法sendBeat

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // 发送PUT请求
    String result = reqApi(UtilAndComs.nacosUrlBase   "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

心跳请求:PUT请求,url:/nacos/v1/ns/instance/beat

4. 方法调用图

image.png

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanehib
系列文章
更多 icon
同类精品
更多 icon
继续加载