
Kafka: how the consumer pulls messages


How the consumer pulls messages is closely tied to Kafka's index and log files: the .index file maps offsets to positions in the log, and the .timeindex file maps timestamps to offsets. Seen from that angle, there are two main ways for a consumer to pull: by offset and by timestamp. The default is to resume from the offset recorded for the consumer group on the broker; code generally does not need to specify it, because the broker keeps the group's consumption progress on record.
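Before the full examples, here is a minimal side-by-side sketch of the two client entry points (this sketch is not from the original code; the class name, topic, partition, offset, and timestamp values are placeholders):

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class kafkaPullPreview {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "demo-preview");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition tp = new TopicPartition("test_topic", 0);
        consumer.assign(Collections.singletonList(tp));

        // 1) by offset: jump straight to a known offset (the broker resolves it via .index)
        consumer.seek(tp, 400L);

        // 2) by timestamp: resolve the timestamp to an offset first (via .timeindex), then seek
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(tp, 1661254128065L));

        consumer.close();
    }
}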

producer

package com.tsj.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class kafkaProducer01 {
    public static final String brokerList = "127.0.0.1:9092";
    public static final String topic = "test_topic";

    private static Properties initProperties() {

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        return properties;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(initProperties());

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key " + i, "hello world");
            try {
                producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}

consumer

1. Pull by offset

(1) Default offset pull

A new consumer group starts pulling from the beginning of the log (offset = 0, since auto.offset.reset is earliest in the code below). From then on, the broker records the group's consumed position, and every subsequent pull resumes from the latest offset the broker has recorded for the group. The sketch after the example below shows how to inspect that recorded offset.

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class kafkaConsumer01 {

    public static final String brokerList = "127.0.0.1:9092";
    public static final String topic = "test_topic";
    public static final String groupId = "demo05";

    private static Properties initProperties() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        // maximum number of records a single poll() may return
        properties.put("max.poll.records", Integer.MAX_VALUE);
        // latest = start from the newest messages, earliest = start from the oldest;
        // this only applies when the group has no committed offset yet
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initProperties());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + " offset:" + record.offset()
                        + " key:" + record.key() + " value:" + record.value() + " time:" + record.timestamp());
            }
        }
    }
}

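To see the position the broker has recorded for a group, the committed offset can be read back from a consumer. A minimal sketch, not from the original post (the class name is made up; broker, topic, and group match the example above):

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Properties;

public class kafkaCommittedOffsetCheck {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "demo05");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            TopicPartition tp = new TopicPartition("test_topic", 0);
            // the single-partition committed() (deprecated in newer clients in favor of the
            // Set overload) returns null when the group has never committed for this partition;
            // that is exactly the case where auto.offset.reset decides the starting point
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println(committed == null
                    ? "no committed offset yet -> auto.offset.reset applies"
                    : "broker-recorded offset: " + committed.offset());
        }
    }
}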

(2) Pull from a specified offset

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class kafkaConsumer02 {

    public static final String brokerList = "127.0.0.1:9092";
    public static final String topic = "test_topic";
    public static final String groupId = "demo06";

    private static Properties initProperties() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        // maximum number of records a single poll() may return
        properties.put("max.poll.records", Integer.MAX_VALUE);
        // latest = start from the newest messages, earliest = start from the oldest;
        // this only applies when the group has no committed offset yet
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        // 2. Pull by offset: start consuming from offset = 400
        KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(initProperties());
        // Committing offset 400 for partition 0 up front sets the broker-recorded position
        // of this (new) group to 400, so the next poll resumes from there.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(400L));
        consumer1.commitSync(offsets);
        consumer1.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer1.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + " offset:" + record.offset()
                        + " key:" + record.key() + " value:" + record.value() + " time:" + record.timestamp());
            }
        }
    }
}

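An alternative to committing the offset up front: when group subscription is not needed, assign() the partition directly and seek() to the target offset. A minimal sketch, not from the original post (class and group names are made up):

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class kafkaSeekConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "demo06-seek");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition tp = new TopicPartition("test_topic", 0);
        // assign() skips the group subscription/rebalance, so seek() can be called right away
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, 400L); // start reading at offset 400

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + " offset:" + record.offset()
                        + " key:" + record.key() + " value:" + record.value() + " time:" + record.timestamp());
            }
        }
    }
}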

2. Pull by timestamp

First resolve the timestamp to an offset with offsetsForTimes(), then position the consumer at that offset with seek(), and only then start pulling and consuming.

package com.tsj.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class kafkaConsumer03 {

    public static final String brokerList = "127.0.0.1:9092";
    public static final String topic = "test_topic";
    public static final String groupId = "demo07";

    private static Properties initProperties() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        // maximum number of records a single poll() may return
        properties.put("max.poll.records", Integer.MAX_VALUE);
        // latest = start from the newest messages, earliest = start from the oldest;
        // this only applies when the group has no committed offset yet
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        // 3. Pull by timestamp: start consuming from timestamp = 1661254128065
        long time = 1661254128065L;
        KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(initProperties());

        // look up all partitions of the topic and map each one to the target timestamp
        Map<TopicPartition, Long> tts = new HashMap<>();
        List<PartitionInfo> partitionInfos = consumer2.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());

        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            tts.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), time);
        }

        consumer2.assign(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> offtime_map = consumer2.offsetsForTimes(tts);

        for (TopicPartition partition : topicPartitions) {
            // use the offset returned for this partition to reposition the consumer via seek()
            OffsetAndTimestamp offsetAndTimestamp = offtime_map.get(partition);
            if (offsetAndTimestamp == null) {
                // the partition has no message at or after the requested timestamp
                continue;
            }
            long offset = offsetAndTimestamp.offset();
            System.out.println("partition:" + partition + " time:" + offsetAndTimestamp.timestamp() + " offset:" + offset);
            // set the offset from which to start reading
            consumer2.seek(partition, offset);
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + " offset:" + record.offset()
                        + " key:" + record.key() + " value:" + record.value() + " time:" + record.timestamp());
            }
        }
    }
}

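A practical note for testing the timestamp path: offsetsForTimes() only returns an offset for a partition that contains a message at or after the requested timestamp (hence the null check above). Here is a minimal producer sketch, not from the original post (the class name is made up), that stamps records with explicit timestamps so the lookup has something to hit:

package com.tsj.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class kafkaTimestampProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", "127.0.0.1:9092");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        long base = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            // the five-argument constructor sets an explicit timestamp per record
            // (partition 0, timestamps spaced one second apart)
            producer.send(new ProducerRecord<>("test_topic", 0, base + i * 1000L, "key " + i, "hello world"));
        }
        producer.close();
    }
}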
