kafka--consumer拉取消息的方式
consumer拉取消息的方式与Kafka的索引文件和log文件有关:.index文件支持通过offset定位消息,.timeindex文件支持通过时间戳定位消息。从这个角度出发,consumer拉取消息的方式主要为按offset拉取和按时间戳拉取两种。默认是从broker服务器中记录的消费进度(offset)继续拉取,一般代码中不必显式指定,broker会为每个消费组记录其offset。
producer
package com.tsj.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class kafkaProducer01 {
public static final String brokerList = "127.0.0.1:9092";
public static final String topic = "test_topic";
private static Properties initProperties() {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
return properties;
}
public static void main(String[] args) {
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(initProperties());
for (int i = 0; i < 100; i ) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key " i, "hello world");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.close();
}
}
consumer
1.按offset方式拉取
(1)默认的offset拉取
新的group从offset=0开始拉取;之后broker会记录该group的消费位置,每次拉取都从broker中保存的最新消费进度offset处继续。
package com.tsj.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class kafkaConsumer01 {
public static final String brokerList = "127.0.0.1:9092";
public static final String topic = "test_topic";
public static final String groupId = "demo05";
private static Properties initProperties() {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
//调用返回的记录数
properties.put("max.poll.records", Integer.MAX_VALUE);
//可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)
properties.put("auto.offset.reset", "earliest");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put("group.id", groupId);
return properties;
}
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initProperties());
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition:" record.partition() " offset:" record.offset()
" key" record.key() " value" record.value() " time:" record.timestamp());
}
}
}
}
(2)指定offset拉取
package com.tsj.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class kafkaConsumer01 {
public static final String brokerList = "127.0.0.1:9092";
public static final String topic = "test_topic";
public static final String groupId = "demo06";
private static Properties initProperties() {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
//调用返回的记录数
properties.put("max.poll.records", Integer.MAX_VALUE);
//可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)
properties.put("auto.offset.reset", "earliest");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put("group.id", groupId);
return properties;
}
public static void main(String[] args) {
//2.根据offset拉取数据 指定从offset=400开始拉取消费
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(initProperties());
Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(400L));
consumer1.commitSync(hashMaps);
consumer1.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer1.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition:" record.partition() " offset:" record.offset() " key" record.key() " value" record.value() " time:" record.timestamp());
}
}
}
2.按时间戳方式拉取
先根据时间戳查询到对应的offset,然后通过seek定位到该offset位置,再开始拉取消费。
package com.tsj.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class kafkaConsumer01 {
public static final String brokerList = "127.0.0.1:9092";
public static final String topic = "test_topic";
public static final String groupId = "demo07";
private static Properties initProperties() {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
//调用返回的记录数
properties.put("max.poll.records", Integer.MAX_VALUE);
//可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)
properties.put("auto.offset.reset", "earliest");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put("group.id", groupId);
return properties;
}
public static void main(String[] args) {
//3.根据时间拉取数据 指定从时间戳=1661254128065开始拉取消费
Long time = 1661254128065L;
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(initProperties());
//获取分区
Map<TopicPartition, Long> tts = new HashMap<>();
List<PartitionInfo> partitionInfos = consumer2.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
tts.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), time);
}
consumer2.assign(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> offtime_map = consumer2.offsetsForTimes(tts);
for (TopicPartition partition : topicPartitions) {
//通过取回的offset数据,通过consumer的seek方法,修正自己的消费偏移
OffsetAndTimestamp offsetAndTimestamp = offtime_map.get(partition);
long offset = offsetAndTimestamp.offset();
System.out.println("partition:" partition " time:" offsetAndTimestamp.timestamp() " offset:" offset);
// 设置读取消息的偏移量
consumer2.seek(partition, offset);
}
while (true) {
ConsumerRecords<String, String> records = consumer2.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition:" record.partition() " offset:" record.offset() " key" record.key() " value" record.value() " time:" record.timestamp());
}
}
}
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhegafhh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01