• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

kafka-consumer-消费者代码

武飞扬头像
SeaDhdhdhdhdh
帮助1

目录

1 消费一个主题

2 消费一个分区

3 消费者组案例


1 消费一个主题

消费topic为first的消息。

  1.  
    public class ConsumerTest{
  2.  
    public void main(string[] args){
  3.  
    // 0 配置
  4.  
    Properties properties = new Properties();
  5.  
    //连接bootstrap . servers
  6.  
    properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092, hadoop103:9092");
  7.  
    //反序列化
  8.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  9.  
    //配置消费者组id,必须配置,没有也要配置,不然会抛出异常
  10.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );
  11.  
     
  12.  
    // 1 创建一个消费者" ", "hello"
  13.  
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
  14.  
     
  15.  
    // 2 订阅主题first,可以订阅多个topic。
  16.  
    ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
  17.  
    kafkaConsumer.subscribe(topics);
  18.  
     
  19.  
    // 3 消费数据
  20.  
    while (true){
  21.  
    //每一秒拉取一次数据。
  22.  
    ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  23.  
    for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
  24.  
    System.out.println(consumerRecord);
  25.  
    }
  26.  
    }
  27.  
    }
  28.  
    }
  29.  
     
学新通

2 消费一个分区

应用场景:当生产者将所有消息发往特定的某个主题分区。

消费first主题0号分区代码:

  1.  
    public class customConsumerPartition {
  2.  
    public static void main(String[ ] args) {
  3.  
    // 0 配置
  4.  
    Properties properties = new Properties();
  5.  
    //连接
  6.  
    properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
  7.  
    //反序列化
  8.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ CONFI6,StringDeserializer.class.getName());
  9.  
    //组id
  10.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
  11.  
    // 1 创建一个消费者
  12.  
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
  13.  
    r<>(properties);
  14.  
    // 2 订阅主题对应的分区
  15.  
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);
  16.  
     
  17.  
    // 3 消费数据
  18.  
    while (true){
  19.  
    ConsumerRecords<String,String> consumerReconds = kafkaConsumer.poll(Duration.ofSeconds(1));
  20.  
    for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
  21.  
    system.out.println(consumerRecord);
  22.  
    }
  23.  
    }
  24.  
    }
  25.  
    }
学新通

3 消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。

学新通

创建三个消费者对某一分区进行消费 。

将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。

  1. 生产者发送消息(方便阅读)
    学新通
    此时消息分布在0,1,2三个分区中。
  2. 消费结果发现,三个消费者每个消费者消费一个分区的数据。
    学新通
    学新通
    学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcgifj
系列文章
更多 icon
同类精品
更多 icon
继续加载