kafka-consumer-消费者代码
目录
1 消费一个主题
消费topic为first的消息。
-
public class ConsumerTest{
-
public void main(string[] args){
-
// 0 配置
-
Properties properties = new Properties();
-
//连接bootstrap . servers
-
properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092, hadoop103:9092");
-
//反序列化
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
-
//配置消费者组id,必须配置,没有也要配置,不然会抛出异常
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );
-
-
// 1 创建一个消费者" ", "hello"
-
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
-
-
// 2 订阅主题first,可以订阅多个topic。
-
ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
-
kafkaConsumer.subscribe(topics);
-
-
// 3 消费数据
-
while (true){
-
//每一秒拉取一次数据。
-
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
-
for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
-
System.out.println(consumerRecord);
-
}
-
}
-
}
-
}
-
2 消费一个分区
应用场景:当生产者将所有消息发往特定的某个主题分区。
消费first主题0号分区代码:
-
public class customConsumerPartition {
-
public static void main(String[ ] args) {
-
// 0 配置
-
Properties properties = new Properties();
-
//连接
-
properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
-
//反序列化
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ CONFI6,StringDeserializer.class.getName());
-
//组id
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
-
// 1 创建一个消费者
-
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
-
r<>(properties);
-
// 2 订阅主题对应的分区
-
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);
-
-
// 3 消费数据
-
while (true){
-
ConsumerRecords<String,String> consumerReconds = kafkaConsumer.poll(Duration.ofSeconds(1));
-
for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
-
system.out.println(consumerRecord);
-
}
-
}
-
}
-
}
3 消费者组案例
测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。
创建三个消费者对某一分区进行消费 。
将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。
- 生产者发送消息(方便阅读)
此时消息分布在0,1,2三个分区中。 - 消费结果发现,三个消费者每个消费者消费一个分区的数据。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcgifj
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13