• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spring Cloud Loadbalancer (二) 实现LoadBalancerClient

武飞扬头像
老磨谈技术
帮助6

前面一个章节,我们知道 RestTemplate 负载均衡,最终是委托给了 LoadBalancerClient 去执行的,回忆一下其定义如下:


public interface LoadBalancerRequest<T> {
    T apply(ServiceInstance instance) throws Exception;
}

public interface ServiceInstanceChooser {
	ServiceInstance choose(String serviceId);// 根据服务名获取具体实例
	<T> ServiceInstance choose(String serviceId, Request<T> request);
}
//根据服务名获取实例,并执行请求
public interface LoadBalancerClient extends ServiceInstanceChooser {
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
    URI reconstructURI(ServiceInstance instance, URI original);
}


学新通

查看 spring-cloud-loadbalancer 包下的 spring.factories,可以看到 LoadBalancerClient 的自动配置类 BlockingLoadBalancerClientAutoConfiguration。进去可以看到具体的实现类为 BlockingLoadBalancerClient。其核心是使用 ReactiveLoadBalancer.Factory 实例通过 serviceId 获取具体的 ServiceInstance。具体代码如下:

public class BlockingLoadBalancerClient implements LoadBalancerClient {

	private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

	public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
		this.loadBalancerClientFactory = loadBalancerClientFactory;
	}
    // 在RestTemplate 拦截器调用的方法,在这里通过 serviceId 获取  ServiceInstance。
	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        // 省略部分代码
		String hint = getHint(serviceId);
		LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
				buildRequestContext(request, hint));
		ServiceInstance serviceInstance = choose(serviceId, lbRequest);
		return execute(serviceId, serviceInstance, lbRequest);
	}

	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
			throws IOException {
        // 省略部分代码
		try {
			T response = request.apply(serviceInstance);
			return response;
		} catch (IOException iOException) {}
		catch (Exception exception) {}
		return null;
	}

	@Override
	public ServiceInstance choose(String serviceId) {
		return choose(serviceId, REQUEST);
	}

    // 重要的方法,从 loadBalancerClientFactory 获取  ReactiveLoadBalancer 。
	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

    @Override
    public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

	// 省略部分代码

}
学新通

其核心是通过 choose 方法获取 ServiceInstance ,然后交给 LoadBalancerRequest 去执行的。此时我们还没有看到负载均衡的影子,我们继续看 choose 方法中的 loadBalancerClientFactory (ReactiveLoadBalancer.Factory),是通过 LoadBalancerAutoConfiguration 这个类进行配置的,其实现类是 LoadBalancerClientFactory,继承了 NamedContextFactory:

public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
		implements ReactiveLoadBalancer.Factory<ServiceInstance> {
}

我们看下 LoadBalancerAutoConfiguration 是如何对 LoadBalancerClientFactory 进行配置的。

@LoadBalancerClients
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class, LoadBalancerBeanPostProcessorAutoConfiguration.class })
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class LoadBalancerAutoConfiguration {
    // 1. Spring 的特性,对于 Configuration ,其构造函数只有一个参数的时候,是可以自动注入的
	private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
    
	public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
		this.configurations = configurations;
	}
    // 2. 创建 LoadBalancerClientFactory 的 bean
	@ConditionalOnMissingBean
	@Bean
	public LoadBalancerClientFactory loadBalancerClientFactory(LoadBalancerClientsProperties properties) {
		LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(properties);
        // 如果对 NamedContextFactory 熟悉,就知道这一步很重要,将配置类注入到 NamedContextFactory 中,实现个性化配置。
		clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
		return clientFactory;
	}
}
学新通

我们再看下 LoadBalancerClientFactory 的构造函数,其中已经传入了一个默认的配置类 LoadBalancerClientConfiguration。

public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
		implements ReactiveLoadBalancer.Factory<ServiceInstance> {
    public LoadBalancerClientFactory(LoadBalancerClientsProperties properties) {
        // 这里注意看 LoadBalancerClientConfiguration 被当成了默认配置类注入到 NamedContextFactory 中。也就是说每个子容器都会有这个配置类。
        super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
        this.properties = properties;
    }
}

LoadBalancerClientConfiguration 这个配置类,有很多有条件的配置,我们只看默认条件下的关键配置。

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {

	private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
    
    // 这个配置有点绕,首先,在前文可以看到在 LoadBalancerAutoConfiguration 配置中,配置了 LoadBalancerClientFactory ,其实例 bean 存在于父容器中,而当前配置类 LoadBalancerClientConfiguration 通过构造函数会当成默认配置类注册到 NamedContextFactory 的子容器中,每个子容器都拥有。只有明白了这一点,才能够理解容器的隔离。 ConditionalOnMissingBean 说明可以自己实现这个bean,来覆盖默认的配置。
	@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
        // 获取当前子容器的名称(也是服务名)
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
		return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}

	@Configuration(proxyBeanMethods = false)
	@ConditionalOnBlockingDiscoveryEnabled
	@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER   1)
	public static class BlockingSupportConfiguration {
        // 真正获取  ServiceInstance 列表的 由 ServiceInstanceListSupplier 提供,这里可以看到可以由 DiscoveryClient 提供。这样就已经跟注册中心客户端联系上了。
		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(DefaultConfigurationCondition.class)
		public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
		}

	}

}
学新通
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

	private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);

	final AtomicInteger position;

	final String serviceId;

	ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

	public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
			String serviceId) {
		this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
	}
	
	@Override
	// see original
	// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
	// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
	public Mono<Response<ServiceInstance>> choose(Request request) {
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		return supplier.get(request).next()
				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
	}

	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
			List<ServiceInstance> serviceInstances) {
		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
		}
		return serviceInstanceResponse;
	}
    // 真正获去进行负载均衡的逻辑,可以看到这里使用的是轮询的方法。
	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
		if (instances.isEmpty()) {
			if (log.isWarnEnabled()) {
				log.warn("No servers available for service: "   serviceId);
			}
			return new EmptyResponse();
		}

		// Ignore the sign bit, this allows pos to loop sequentially from 0 to
		// Integer.MAX_VALUE
		int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

		ServiceInstance instance = instances.get(pos % instances.size());

		return new DefaultResponse(instance);
	}

}
学新通

至此,默认的负载均衡的类已经浮出水面,那就是 RoundRobinLoadBalancer。

我们重新梳理一下这流程。RestTemplate 负载均衡逻辑是委托给了 LoadBalancerClient 实现的。而实现类 BlockingLoadBalancerClient 又将获取 ServiceInstance 委托给了 ReactiveLoadBalancer.Factory,这是一个 NamedContextFactory 的子类,其中每个子容器的名称为 serviceId ,每个子容器都注册了 LoadBalancerClientConfiguration 配置类,也就都有 ReactorLoadBalancer Bean的配置,这个配置是条件式的,如果我们熟悉 NamedContextFactory 那么就知道,我们可以通过 setConfigurations 方法来为制定的子容器来新增配置类,从而提供 ReactorLoadBalancer 新的实现。

那假设想更换成随机的负载均衡怎么办呢,其代码如下:

@Configuration
public class RandomLoadBalancerConfig {
    @Bean
    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(
            Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(loadBalancerClientFactory
                .getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }
}

Spring Cloud Loadbalander 提供了一下两个注解

@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(LoadBalancerClientConfigurationRegistrar.class)
public @interface LoadBalancerClients {

	LoadBalancerClient[] value() default {};
    
	Class<?>[] defaultConfiguration() default {};

}

@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalancerClient {

    @AliasFor("name")
    String value() default "";

    @AliasFor("value")
    String name() default "";

    Class<?>[] configuration() default {};

}
学新通

其核心在 LoadBalancerClientConfigurationRegistrar 类的逻辑

public class LoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
    
    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
        Map<String, Object> attrs = metadata.getAnnotationAttributes(LoadBalancerClients.class.getName(), true);
        // 1. 首先获取 LoadBalancerClients 注解的属性
        if (attrs != null && attrs.containsKey("value")) {
            // 2. value 属性为 LoadBalancerClient 注解,可以给 configuration 赋值,填写需要的配置类,比如  RandomLoadBalancerConfig.class
            AnnotationAttributes[] clients = (AnnotationAttributes[]) attrs.get("value");
            for (AnnotationAttributes client : clients) {
                // 3. 将配置类 注册到当前容器的 BeanDefinitionRegistry 上。
                registerClientConfiguration(registry, getClientName(client), client.get("configuration"));
            }
        }
        if (attrs != null && attrs.containsKey("defaultConfiguration")) {
            String name;
            if (metadata.hasEnclosingClass()) {
                name = "default."   metadata.getEnclosingClassName();
            }
            else {
                name = "default."   metadata.getClassName();
            }
            // 4 根据以上逻辑,如果 defaultConfiguration 有配置,也会将会当做默认配置,赋值给所有容器
            registerClientConfiguration(registry, name, attrs.get("defaultConfiguration"));
        }
        // 5 处理使用了 LoadBalancerClient 注解的类。
        Map<String, Object> client = metadata.getAnnotationAttributes(LoadBalancerClient.class.getName(), true);
        String name = getClientName(client);
        if (name != null) {
            // 6 configuration 属性的类 注册到容器中
            registerClientConfiguration(registry, name, client.get("configuration"));
        }
    }
    
	private static String getClientName(Map<String, Object> client) {
		if (client == null) {
			return null;
		}
		String value = (String) client.get("value");
		if (!StringUtils.hasText(value)) {
			value = (String) client.get("name");
		}
		if (StringUtils.hasText(value)) {
			return value;
		}
		throw new IllegalStateException("Either 'name' or 'value' must be provided in @LoadBalancerClient");
	}

	private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
			Object configuration) {
		BeanDefinitionBuilder builder = BeanDefinitionBuilder
				.genericBeanDefinition(LoadBalancerClientSpecification.class);
		builder.addConstructorArgValue(name);
		builder.addConstructorArgValue(configuration);
		registry.registerBeanDefinition(name   ".LoadBalancerClientSpecification", builder.getBeanDefinition());
	}
}
学新通

显而易见,LoadBalancerClients 有一个默认的配置属性:Class<?>[] configuration() default {}; 这个属性的类将会被冠以 default. 开头的命名。然后组合了 多个 LoadBalancerClient 注解,每个注解都需要指定 value, 这个 value 实际就是 serviceId, 同时也有一个属性 Class<?>[] configuration() default {};用于传入当前 serviceId 名字的下的配置。 无论是LoadBalancerClients 还是 LoadBalancerClient 的 configuration 都会被封装成为 LoadBalancerClientSpecification 类,注册到当前容器中,最终注入到 LoadBalancerAutoConfiguration 配置类的属性 configurations,用于 LoadBalancerClientFactory 类的实例化。当 LoadBalancerClientFactory 的子容器实例化的时候,就会去获取默认的或者当前命名的 LoadBalancerClientSpecification 类进行实例化,这样我们的配置类就进入到子容器中了。

总结

RestTemplate 负载均衡最终是委托给了 LoadBalancerClient 去实现,核心逻辑是依靠 serviceId 去获取 ServiceInstance。为了达到灵活配置的作用,引入了 NamedContextFactory,为每个 serviceId 分配了一个子容器,每个子容器都拥有 ReactorLoadBalancer 的实现。ReactorLoadBalancer 才是真正实现负载均衡策略的地方。我们可以通过 @LoadBalancerClients 引入我们的自定义 ReactorLoadBalancer。但是需要注意的是,根据 NamedContextFactory 的特性,自定义的 ReactorLoadBalancer 不能被父容器扫到,否则所有的子容器都会被子容器引入,在每个 serviceId 中都生效。

那由谁提供 ServiceInstance 列表给 ReactorLoadBalancer 呢?这个将在下一章展示。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhkhch
系列文章
更多 icon
同类精品
更多 icon
继续加载