• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

在Springboot接收kafka消息

武飞扬头像
天河归来
帮助1

整体描述

之前写过一篇使用docker搭建kafka服务的文章,使用centos搭建kafka服务器Docker,本文主要简单将一下在springboot框架下,接收kafka服务器发过来的消息。

版本对应

由于使用springboot,管理版本时和springboot绑定的,我目前用的是springboot2.7,kafka的版本是2.1,这个版本也没啥影响,因为kafka服务器是向下兼容的,也就是说你的kafka服务器的版本是3.1,kafka客户端的版本使用3.1以下的,就都可以。

具体接入

1. pom引用

直接使用springboot框架带的kafka客户端,不指定版本号,引入的默认版本就是和springboot版本有关的。这块我看网上有指定版本号会报错的,因为springboot和kafka的版本是有对应关系的,如果引入的kafka版本和当前使用的springboot版本不兼容,就会报错。具体版本对应关系可以自己去网上搜一下。

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. kafka参数配置

配置kafka参数,这块有两种方式,一是直接在配置文件里写,还有就是在代码里写,两种方式都可以,我这里就直接在springboot的config里写了,添加KafkaConsumerConfig.java:

/**
 * Kafka消费者配置类
 *
 * @author thcb
 * @date 2023-05-24
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public final static String BOOTSTRAP_SERVERS = "192.168.1.100:9092";
    public final static String GROUP_ID = "test_group";

    @Bean
    @Conditional(KafkaCondition.class)
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    @Conditional(KafkaCondition.class)
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    @Conditional(KafkaCondition.class)
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //配置默认分组,这里没有配置 在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //请求超时时间
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }
}

学新通

注意BOOTSTRAP_SERVERS 需要根据自己的kafka服务器地址配置。这块由于使用的是config配置,所以在kafka服务无法访问时,springboot程序就会启动失败,这块根据实际情况处理吧,由于我现在的项目,kafka是第三方接口,用来接收第三方数据的,所以kafka服务器无法访问,不应该影响程序启动。所以这块我加了一个判断,如果kadka服务无法访问,就不进行kafka相关的初始化操作,也是使用springboot带的注解实现的。

3. 添加Conditional注解

这个注解就是提供一个判断,如果判断通过,就执行注解的内容,如果不通过,就不执行。
这块我们可以在注解里加一个判断kafka服务器的操作:

/**
 * kafka动态启动
 * kafka代理服务器正常时启动kafka服务
 * kafka代理服务器不可用时,不启动kafka服务
 *
 * @author thcb
 * @date 2023-05-24
 */
public class KafkaCondition implements Condition {
    public static final Logger log = LoggerFactory.getLogger(KafkaCondition.class);

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        URI uri = URI.create("http://"   KafkaConsumerConfig.BOOTSTRAP_SERVERS);
        String host = uri.getHost();
        int port1 = uri.getPort();
        boolean b = this.isHostConnectable(host, port1);
        log.info("matches:{}", b);
        return b;
    }

    /**
     * 判断kafka服务器,能否正常连接
     *
     * @param host
     * @param port
     * @return
     */
    public boolean isHostConnectable(String host, int port) {
        log.info("isHostConnectable:host:{},port:{}", host, port);
        Socket socket = new Socket();
        try {
            //判断kafka网络是否能联通,不能连通则返回false
            socket.connect(new InetSocketAddress(host, port), 2000);
        } catch (IOException e) {
            log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
            }
        }
        return true;
    }
}
学新通

然后就在使用kafka的地方都加上这个注解,其实在KafkaConsumerConfig的配置类里,就已经加上了。

4. 添加listener

创建一个监听,直接监听kafka消息就可以了。其中主题根据kafka服务端发的主题确定,GROUP_ID 就是配置文件里设置的。

/**
 * Kafka消费者Listener
 *
 * @author thcb
 * @date 2023-05-24
 */
@Component
@Conditional(KafkaCondition.class)
public class EimpKafkaConsumerListener {

    public static final Logger log = LoggerFactory.getLogger(EimpKafkaConsumerListener.class);
    public final static String TOPIC = "test.topic";
    public final static String GROUP_ID = "test_group";

    //监听kafka消费
    @KafkaListener(topics = TOPIC, groupId = GROUP_ID, containerFactory = "kafkaListenerContainerFactory")
    @Conditional(KafkaCondition.class)
    public void onMessage(String message) {
        log.info("EimpKafkaConsumerListener onMessage:{}", message);
    }
}
学新通

这样就可以在程序里接收kafka的消息了,在前面我写的那个文章里,有开启服务端Demo的方法,可以在服务端模拟发消息,在程序的log里就能收到了。

总结

服务器搭建起来之后,接收kafka消息就简单多了。本文主要将接收kafka消息的方式整理了一下,还加了对kafka服务器是否可用的判断。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghfjie
系列文章
更多 icon
同类精品
更多 icon
继续加载