• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

SpringCloud Gateway实现灰度

武飞扬头像
summer_west_fish
帮助1

一、什么是灰度发布?

灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以控制其影响范围。

实现的整体思路:

  • 编写灰度路由
  • 编写自定义filter
  • nacos服务配置需要灰度发布的服务的元数据信息以及权重
  • 灰度路由从nacos服务拉取元数据信息以及权重,然后根据权重算法,返回符合要求的服务实例给自定义的filter
  • 网关配置文件配置需要灰度路由的服务(本文代码中网关没有实现动态路由;如果实现了动态路由,灰度路由可以配置在配置中心,由网关从配置中心拉取)
  • filter通过责任链模式,把服务实例透传给其他filter比如NettyRoutingFilter

二、SpringCloud Gateway集成

学新通

灰度路由

  1.  
    import org.apache.commons.lang3.StringUtils;
  2.  
    import org.springframework.beans.factory.ObjectProvider;
  3.  
    import org.springframework.cloud.client.ServiceInstance;
  4.  
    import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
  5.  
    import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
  6.  
    import org.springframework.cloud.client.loadbalancer.reactive.Request;
  7.  
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
  8.  
    import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
  9.  
    import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
  10.  
    import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
  11.  
    import org.springframework.http.HttpHeaders;
  12.  
    import reactor.core.publisher.Flux;
  13.  
    import reactor.core.publisher.Mono;
  14.  
     
  15.  
    import java.util.List;
  16.  
    import java.util.Random;
  17.  
    import java.util.concurrent.atomic.AtomicInteger;
  18.  
    import java.util.stream.Collectors;
  19.  
     
  20.  
    /**
  21.  
    * @author XIAXINYU3
  22.  
    * @date 2021/11/3
  23.  
    */
  24.  
    public class VersionGrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {
  25.  
     
  26.  
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
  27.  
    private String serviceId;
  28.  
    private final AtomicInteger position;
  29.  
     
  30.  
     
  31.  
    public VersionGrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
  32.  
    this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
  33.  
    }
  34.  
     
  35.  
    public VersionGrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
  36.  
    this.serviceId = serviceId;
  37.  
    this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
  38.  
    this.position = new AtomicInteger(seedPosition);
  39.  
    }
  40.  
     
  41.  
    @Override
  42.  
    public Mono<Response<ServiceInstance>> choose(Request request) {
  43.  
    HttpHeaders headers = (HttpHeaders) request.getContext();
  44.  
    ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
  45.  
    return ((Flux) supplier.get()).next().map(list -> processInstanceResponse((List<ServiceInstance>) list, headers));
  46.  
    }
  47.  
     
  48.  
    private Response<ServiceInstance> processInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
  49.  
    if (instances.isEmpty()) {
  50.  
    return new EmptyResponse();
  51.  
    } else {
  52.  
    String reqVersion = headers.getFirst("version");
  53.  
     
  54.  
    if (StringUtils.isEmpty(reqVersion)) {
  55.  
    return processRibbonInstanceResponse(instances);
  56.  
    }
  57.  
     
  58.  
    List<ServiceInstance> serviceInstances = instances.stream()
  59.  
    .filter(instance -> reqVersion.equals(instance.getMetadata().get("version")))
  60.  
    .collect(Collectors.toList());
  61.  
     
  62.  
    if (serviceInstances.size() > 0) {
  63.  
    return processRibbonInstanceResponse(serviceInstances);
  64.  
    } else {
  65.  
    return processRibbonInstanceResponse(instances);
  66.  
    }
  67.  
    }
  68.  
    }
  69.  
     
  70.  
    private Response<ServiceInstance> processRibbonInstanceResponse(List<ServiceInstance> instances) {
  71.  
    int pos = Math.abs(this.position.incrementAndGet());
  72.  
    ServiceInstance instance = instances.get(pos % instances.size());
  73.  
    return new DefaultResponse(instance);
  74.  
    }
  75.  
    }
学新通

自定义filter

  1.  
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
    import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
    import org.springframework.cloud.client.loadbalancer.reactive.Request;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    import org.springframework.cloud.gateway.config.LoadBalancerProperties;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.GlobalFilter;
    import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
    import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
    import org.springframework.cloud.gateway.support.NotFoundException;
    import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
    import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
    import org.springframework.core.Ordered;
    import org.springframework.http.HttpHeaders;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;

    import java.net.URI;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
  23.  
     
  24.  
    /**
  25.  
    * @author XIAXINYU3
  26.  
    * @date 2021/11/3
  27.  
    */
  28.  
    public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
  29.  
     
  30.  
    private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);
  31.  
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
  32.  
    private final LoadBalancerClientFactory clientFactory;
  33.  
    private LoadBalancerProperties properties;
  34.  
     
  35.  
    public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
  36.  
    this.clientFactory = clientFactory;
  37.  
    this.properties = properties;
  38.  
    }
  39.  
     
  40.  
    @Override
  41.  
    public int getOrder() {
  42.  
    return LOAD_BALANCER_CLIENT_FILTER_ORDER;
  43.  
    }
  44.  
     
  45.  
    @Override
  46.  
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  47.  
    URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
  48.  
    String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
  49.  
    if (url != null && ("grayLb".equals(url.getScheme()) || "grayLb".equals(schemePrefix))) {
  50.  
    ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
  51.  
    if (log.isTraceEnabled()) {
  52.  
    log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() " url before: " url);
  53.  
    }
  54.  
     
  55.  
    return this.choose(exchange).doOnNext((response) -> {
  56.  
    if (!response.hasServer()) {
  57.  
    throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " url.getHost());
  58.  
    } else {
  59.  
    URI uri = exchange.getRequest().getURI();
  60.  
    String overrideScheme = null;
  61.  
    if (schemePrefix != null) {
  62.  
    overrideScheme = url.getScheme();
  63.  
    }
  64.  
     
  65.  
    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance((ServiceInstance)response.getServer(), overrideScheme);
  66.  
    URI requestUrl = this.reconstructURI(serviceInstance, uri);
  67.  
    if (log.isTraceEnabled()) {
  68.  
    log.trace("LoadBalancerClientFilter url chosen: " requestUrl);
  69.  
    }
  70.  
     
  71.  
    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
  72.  
    }
  73.  
    }).then(chain.filter(exchange));
  74.  
    } else {
  75.  
    return chain.filter(exchange);
  76.  
    }
  77.  
    }
  78.  
     
  79.  
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
  80.  
    return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
  81.  
    }
  82.  
     
  83.  
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
  84.  
    URI uri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
  85.  
    VersionGrayLoadBalancer loadBalancer = new VersionGrayLoadBalancer(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
  86.  
    if (loadBalancer == null) {
  87.  
    throw new NotFoundException("No loadbalancer available for " uri.getHost());
  88.  
    } else {
  89.  
    return loadBalancer.choose(this.createRequest(exchange));
  90.  
    }
  91.  
    }
  92.  
     
  93.  
    private Request createRequest(ServerWebExchange exchange) {
  94.  
    HttpHeaders headers = exchange.getRequest().getHeaders();
  95.  
    Request<HttpHeaders> request = new DefaultRequest<>(headers);
  96.  
    return request;
  97.  
    }
  98.  
    }
学新通

配置自定义filter给spring管理

  1.  
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  2.  
    import org.springframework.cloud.gateway.config.LoadBalancerProperties;
  3.  
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
  4.  
    import org.springframework.context.annotation.Bean;
  5.  
    import org.springframework.context.annotation.Configuration;
  6.  
     
  7.  
    /**
  8.  
    * @author XIAXINYU3
  9.  
    * @date 2021/11/3
  10.  
    */
  11.  
    @Configuration
  12.  
    public class GrayGatewayReactiveLoadBalancerClientAutoConfiguration {
  13.  
     
  14.  
    @Bean
  15.  
    @ConditionalOnMissingBean({GrayReactiveLoadBalancerClientFilter.class})
  16.  
    public GrayReactiveLoadBalancerClientFilter grayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
  17.  
    LoadBalancerProperties properties) {
  18.  
    return new GrayReactiveLoadBalancerClientFilter(clientFactory, properties);
  19.  
    }
  20.  
    }
学新通

配置application.yaml

  1.  
    # Note: the route scheme must be grayLb instead of lb

    spring:
      main:
        allow-bean-definition-overriding: true
      application:
        name: gateway
      servlet:
        multipart:
          max-file-size: 50MB
          max-request-size: 50MB
      cloud:
        nacos:
          config:
            server-addr: 127.0.0.1:8848
          discovery:
            server-addr: 127.0.0.1:8848
        gateway:
          routes: # http://127.0.0.1:9000/actuator/gateway/routes
            - id: provider # route ID, must be unique
              uri: grayLb://provider # uri is the target service address; lb means the service is resolved from the registry
              predicates:
                - Path=/provider/** # http://127.0.0.1:9000/provider/port would be forwarded to http://localhost:9001/provider/port, which is not what we want, so StripPrefix is needed
              filters:
                - StripPrefix=1 # StripPrefix=1 strips one leading path segment, so http://127.0.0.1:9000/provider/test/port is forwarded to http://localhost:9001/test/port
学新通

三、provider 微服务集成

  1.  
    # application.yaml: add the eureka configuration; the key part is metadata-map
    # NOTE(review): the gateway configuration in this article registers with Nacos, not Eureka;
    # with Nacos the version metadata presumably goes under
    # spring.cloud.nacos.discovery.metadata.version - confirm against the actual registry in use.

    eureka:
      instance:
        preferIpAddress: true
        leaseRenewalIntervalInSeconds: 10
        leaseExpirationDurationInSeconds: 30
        metadata-map:
          version: v1
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/
        registryFetchIntervalSeconds: 10
        disable-delta: true

四、测试以及源码

学新通

https://gitee.com/xiaxinyu3_admin/ms-gateway.git

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggccbe
系列文章
更多 icon
同类精品
更多 icon
继续加载