• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka消费者不停机参数调整思路实现

武飞扬头像
十年培训经验的菜包
帮助1

背景

最近团队的小伙伴反馈了几个关于kafka消费者的问题

  1. 项目监听了几十个topic,启动时都需要进行初始化,很多时候本地调试时根本不需要启动消费者,但却被迫进行初始化,希望能优化这个问题。
  2. 由于我司在每个环境上,都区分了不同集群(如01、02、03等),来满足并行开发不同需求使用。但目前kafka并未根据不同集群做区分,导致partition消息无法被指定的某个集群消费,例如原本希望被集群01消费的消息,却被集群03消费了,所以希望能服务运行时动态控制不同集群消费者是否开启;
  3. kafka的topic调整partition数量后,消费者服务在不增加实例的前提下,想调整消费者服务的并发数只能手动修改代码后重新部署,非常麻烦,所以希望能在服务运行时动态调整消费者的并发数量,提高消费能力;

分析

我司项目都是基于SpringBoot,消费者使用@KafkaListener注解实现监听。

问题1

由于@KafkaListenr注解被KafkaListenerAnnotationBeanPostProcessor扫描,在postProcessAfterInitialization方法被解析后构造Consumer容器,且autoStartup属性默认为true,所以会自动启动。

要禁止自动启动Consumer容器比较简单,只要将Consumer容器设置成不自动启动即可,有以下解决方案:

  • 在@KafkaListener注解指定autoStartup属性为false,该方法只针对使用注解的方法;
  • 在@KafkaListener注解指定containerFactory属性,该属性值指向AbstractKafkaListenerContainerFactory实现类的实例,实例可以设置autoStartup属性为false;

在本地调试环境时可以将autoStartup属性设置为默认false不启动Consumer容器,但是在其他环境,还是需要启动Consumer容器的,所以就有以下两种方案

  • 利用不同环境的配置文件,根据配置值来决定是否启动Consumer容器(@KafkaListener的autoStartup支持SpEL);
  • 指定AbstractKafkaListenerContainerFactory的autoStartup为false,集成Apollo配置中心,并且在Spring容器完全启动成功后,再根据配置中心文件判断是否需要对Consumer容器进行start;

问题2

kafka通过Rebalance协议来规定一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个partition。

触发Rebalance的条件有3个:

  • Group组员发生变化,例如有Consumer加入或离开Consumer Group;
  • 订阅的Topic数量发生变化;
  • 订阅的Topic的partition数量发生变化;

综上可以得知,我们只需让指定集群的Consumer在启动的时候,不加入Consumer Group(不启动Consumer),或者在已经加入Consumer Group前提下离开Consumer Group即可,从而触发Rebalance机制,最终达到只让指定集群的Consumer消费的目的。

至于想要在服务运行时进行动态调整,只需接入Apollo配置中心,不同集群使用不同的配置文件即可。

问题3

@KafkaListener注解上的concurrency属性,用来指定Consumer的并发数量。

目前项目想调整Consumer的并发数量,只能通过手动修改的代码后重新发布,其目的本质上来讲,是修改

AbstractKafkaListenerContainerFactory实例上的concurrency属性,然后重启AbstractKafkaListenerContainerFactory实例(其实就是调用stop()方法后再调用start()方法),最终在启动的时候,消费者服务会根据concurrency启动相应数量的消费者。

至于想要在服务运行时进行动态调整,只需配合Apoll配置中心实现即可。

实现

在问题1的两种方案中,由于方案2可以更灵活控制,所以此处直接使用方案2

@Bean(KAFKA_CONSUMER_FACTORY)
public KafkaListenerContainerFactory<?> kafkaConsumerContainerFactory() {
    Map<String, Object> properties = new HashMap<>();
    //...其他配置
    DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);
    //支持并发的消费者容器工厂
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    //设置Consumer不自动启动
    factory.setAutoStartup(false);
    return factory;
}

//@KafkaListener的autoStartup属性,优先级高于containerFactory的配置,二选一即可
@KafkaListener(id = "listenerId-1", topics = "topic-1", 
               containerFactory = KAFKA_CONSUMER_FACTORY,
              autoStartup = false)
public void listener1(String message) {
    log.info("listener1 message:{}",message);
}

实现了Consumer容器禁止自动启动,那在哪个时间点让Consumer容器进行start呢,此处选择ApplicationRunner接口回调时间点。

@Slf4j
public class KafkaConsumerRefresh implements ApplicationContextAware, ApplicationRunner {

    //定义apollo配置的key前缀
    //最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
    public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";
    
    private ApplicationContext applicationContext;
  
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //为了在Spring容器启动后,还能获取到Consumer容器,需要注入Spring容器。
        //此处选择实现ApplicationContextAware接口
        this.applicationContext = applicationContext;
    }

    //ApplicationRunner回调方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        //使用@KafkaListener注解构成的Consumer,最终会被注册到KafkaListenerEndpointRegistry实例对象中,
        // 详见org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization
        KafkaListenerEndpointRegistry endpointRegistry = this.applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        //获取所有的Consumer容器
        for (MessageListenerContainer listenerContainer : endpointRegistry.getListenerContainers()) {
            //获取指定listenerId对应配置值,表示是否需要启动Consumer容器,默认不启动
            Boolean needStart = ConfigService.getAppConfig().getBooleanProperty(KAFKA_REFRESH_CONSUMER_PREFIXES   listenerContainer.getListenerId(), false);
            if (needStart){
                //启动容器中的Consumer
                listenerContainer.start();
            }
        }
    }
}

为了实现服务运行时可以感知Apollo配置文件变更,从而动态调整程序,需要实现监听Apollo,代码如下

@Slf4j
public class KafkaConsumerRefresh implements ApplicationContextAware, ApplicationRunner, ConfigChangeListener {

    //定义apollo配置的key前缀
    //最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
    public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";
    
    private ApplicationContext applicationContext;

    @PostConstruct
    public void init(){
        //注入Apollo监听指定前缀的配置
        ConfigService.getAppConfig().addChangeListener(this,null, Sets.newHashSet(KAFKA_REFRESH_CONSUMER_PREFIXES));
    }
  
    //apollo配置变更回调
    @Override
    public void onChange(ConfigChangeEvent configChangeEvent) {
        Set<String> changedKeys = configChangeEvent.changedKeys();
        for (String changedKey : changedKeys) {
            String key = ConfigService.getAppConfig().getProperty(changedKey, "");
            String listenerId = getListenerIdByPropertiesName(key);
            log.info("变更配置 key:{},listenerId:{}",key,listenerId);
        }
    }
  
    ///...其他方法略
}

由于需要满足的对Consumer容器启用、停用、调整并发,所以需要Apollo上配置对应的值,我们使用json来定义。

@Data
public class KafkaRefreshProperties {
    //listsnerId
    private String listenerId;
    //consumer容器是否启用
    private boolean start;
    //consumer是否暂停
    private boolean pause;
    //consumer容器并发数量
    private Integer concurrency;
    //是否在应用启动完成后再启动consumer
    private boolean afterAppRunnerStart;
}

关于Consumer容器与Consumer的区别,可以简单看成一对多的关系,即容器里面维护多个Consumer,从而形成Consumer并发。当Consumer容器进行stop操作时,所维护的多个Consumer都会退出Consumer Group,从而触发Rebalance。Consumer也可以进行pause和resume操作,pause时,Consumer将暂停消费partition消息,但是不会退出Consumer Group,所以不会触发Rebalance,此时可以用resume来恢复Consumer消费。

有了动态配置的数据结构,下面就可以实现对Consumer容器的动态调整了


@Slf4j
public class KafkaConsumerRefresh implements ConfigChangeListener, ApplicationContextAware, ApplicationRunner, ApplicationListener<KafkaEvent> {

    //定义apollo配置的key前缀
    //最终apollo上的配置key为 kafka.refresh.consumer.${listenerId}
    public static final String KAFKA_REFRESH_CONSUMER_PREFIXES = "kafka.refresh.consumer.";

    private ApplicationContext applicationContext;

    @PostConstruct
    public void init(){
        //Apollo注入指定前缀监听
        ConfigService.getAppConfig().addChangeListener(this,null, Sets.newHashSet(KAFKA_REFRESH_CONSUMER_PREFIXES));
    }
  

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //为了在Spring容器启动后,还能获取到Consumer容器,需要注入Spring上下文容器。
        //此处选择实现ApplicationContextAware接口
        this.applicationContext = applicationContext;
    }

    @Override
    public void onChange(ConfigChangeEvent configChangeEvent) {
        Set<String> changedKeys = configChangeEvent.changedKeys();
        log.info("changedKeys:{}",changedKeys);
        //删除操作忽略,新增、修改操作需要变动
            changedKeys.stream().filter(key-> configChangeEvent.getChange(key).getChangeType() != PropertyChangeType.DELETED).map(this::getRefreshPropertiesByPropertiesName).forEach(this::doChange);
    }
    //配置文件变更,执行操作
    private void doChange(KafkaRefreshProperties refreshProperties){
        if (!refreshProperties.isStart()){
            this.stop(refreshProperties);
        }else {
            this.start(refreshProperties);
        }
    }
  
    //停止Consumer容器
    public void stop(KafkaRefreshProperties refreshProperties){
        List<MessageListenerContainer> listenerContainers = load(refreshProperties.getListenerId());
        if (listenerContainers == null || listenerContainers.size() == 0){
            return;
        }
        for (MessageListenerContainer container : listenerContainers) {
            container.stop();
        }
    }
  
    //启动Consumer容器
    public void start(KafkaRefreshProperties refreshProperties){
        List<MessageListenerContainer> listenerContainers = load(refreshProperties.getListenerId());
        if (listenerContainers == null || listenerContainers.size() == 0){
            return;
        }
        for (MessageListenerContainer container : listenerContainers) {
            //调整并发
            if (container instanceof ConcurrentMessageListenerContainer<?, ?> && refreshProperties.getConcurrency() != null) {
                ConcurrentMessageListenerContainer concurrentListenerContainer = (ConcurrentMessageListenerContainer) container;
                if (concurrentListenerContainer.getConcurrency() != refreshProperties.getConcurrency()) {
                  //调整并发
                    concurrentListenerContainer.setConcurrency(refreshProperties.getConcurrency());
                    // 如果当前状态为运行中,需要先stop再start
                    concurrentListenerContainer.stop();
                }
            }
            //恢复
            if (container.isPauseRequested() && !refreshProperties.isPause()){
                container.resume();
            }
            //暂停
            if (!container.isPauseRequested() && refreshProperties.isPause()){
                container.pause();
            }
            //执行启动操作
            container.start();
        }
    }

    //propertiesName解析成listenerId
    public String getListenerIdByPropertiesName(String propertiesName){
        return propertiesName.substring(KAFKA_REFRESH_CONSUMER_PREFIXES.length());
    }
  
    //propertiesName获取值,并解析成KafkaRefreshProperties配置对象
    public KafkaRefreshProperties getRefreshPropertiesByPropertiesName(String propertiesName){
        String listenerId = this.getListenerIdByPropertiesName(propertiesName);
        KafkaRefreshProperties property = ConfigService.getAppConfig().getProperty(propertiesName, value -> JSON.parseObject(value, KafkaRefreshProperties.class), null);
        if (property != null){
            property.setListenerId(listenerId);
        }
        return property;
    }
  
    //根据listenerId获取Consumer容器
    public List<MessageListenerContainer> load(String listenerId){
      //使用@KafkaListener注解构成的Consumer,最终会被注册到KafkaListenerEndpointRegistry实例对象中,
        // 详见org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization
        KafkaListenerEndpointRegistry endpointRegistry = this.applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        return endpointRegistry.getListenerContainers().stream().filter(f-> Objects.equals(f.getListenerId(), listenerId)).collect(Collectors.toList());
    }

    //获取存在的动态配置
    private Set<KafkaRefreshProperties> getExistsProperties() {
        return ConfigService.getAppConfig().getPropertyNames().stream().filter(f -> f.startsWith(KAFKA_REFRESH_CONSUMER_PREFIXES))
                .map(this::getRefreshPropertiesByPropertiesName).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    //ApplicationRunner回调方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        //筛选需要在应用容器启动后自动启动的Consumer容器,并执行启动操作
        getExistsProperties().stream().filter(KafkaRefreshProperties::isAfterAppRunnerStart)
                .filter(KafkaRefreshProperties::isStart)
                .forEach(this::start);
    }
  
    //kafka事件变更监听
    @Override
    public void onApplicationEvent(KafkaEvent event) {
        log.info("KafkaEvent:{}",event);
    }
}

来到这里,核心代码基本就完成了。

除此之外,为了方便观察,还可以实现了ApplicationListener接口,并监听KafkaEvent,可以观察到参数调整对Kafka消费者的影响。

接下来就只需要将KafkaConsumerRefresh实例注入到Spring容器

@Slf4j
@Configuration
public class KafkaConfig {
  
    @Bean
    public KafkaConsumerRefresh kafkaConsumerRefresh(){
        return new KafkaConsumerRefresh();
    }
}

然后在Apollo配置中心配置以下内容

kafka.refresh.consumer.listenerId-1 = {\n    "start": true,\n    "pause": false,\n    "concurrency": 2,\n    "afterAppRunnerStart":true\n}

该配置表示listenerId-1的Consumer容器,需要启动,并且在应用容器启动后再启动,并且Consumer并发数为2。

后续如果需要调整,只需要修改json中对应字段的值,再发版,即可动态调整kafka消费者相关参数。

总结

本按理只是简单对Consumer常见的参数今天调整,按理来说,支持对Consumer更多参数进行拓展调整,期待各位大佬的实现;

同时也可以举一反三,除了Kafka可以使用这种操作不停服调整消费者参数,那么RabbitMQ,或者其他类似中间件的消费者,在基于Spring或SpringBoot的基础上,是否也可以这样操作呢?

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbachh
系列文章
更多 icon
同类精品
更多 icon
继续加载