• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka 消费者组位移重设的几种方式

武飞扬头像
Pseudocode
帮助4

相关:Kafka 中的消费者组位移主题:Kafka 的消费者组是怎么保存消费位移的?

之前介绍过,Kafka 的消费者可以手动提交位移,并且可以提交并非当前位置的位移,这样可以实现跳过或者重新消费消息。这主要是因为 Kafka 是一个基于日志结构的消息系统,而并非「队列」结构。

简而言之,位移数据是消费者控制的。要注意的是,消费者只能控制位移,不能控制消息,消息对于消费者来说,永远都是只读的。

实际上,Kafka 为消费者提供了丰富的重设位移的方式,大致可以分为针对位置重设和根据时间重设。

根据位置重设

根据位置的位移重设有以下几种

Earliest

把位移重设到当前最早位移处。

这里要注意,因为 Kafka 会删除较早的日志,因此,最早的位置不一定是 0。如果你想重新消费主题中现有的所有消息,可以使用这个策略。

Latest

把位移重设到当前最新位移处。

如果你想跳过所有的历史消息,从最新的消息开始消费,那么就是用这个策略。

Current

把位移重设到当前最新提交的位移处。这个策略的使用场景不多。

Specified-Offset

把位移重设到一个指定的位移处。

有时候,消费者会从消息系统中拉取到无法消费的消息,比如消息的格式错误,或者消费过程中报错,再或者某些与业务相关的原因,导致消息不能消费。此时,可以使用这种策略跳过,消费它之后的消息。

Shift-By-N

把位移重设到一个与当前位置相对的位置上(当前位置 N)。

Specified-Offset 可以直接指定要重设的位移位置,而 Shift-By-N 可以指定相对于当前位置的位移。比如 N 是 5 的时候,相当于跳过 5 个消息,这里的 N 也可以是负数,这样就会向回跳。

Specified-Offset 和 Shift-By-N 可以理解为绝对位置相对位置

根据时间重设

根据时间的位移重设有两种

DateTime

把位移重设为指定时间之后第一个位置。

Duration

把位移重设为与当前时间相对的一个时间点之后的第一个位置。

DateTime 和 Duration 也可以理解为绝对时间相对时间

如何操作

了解了这些策略,下面介绍一下具体怎么操作。

比如,要将消费者组的位移重设到当前最早的位置,可以使用一下命令:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest –execute

Latest 和 Current 策略与之类似。

Specified-Offset 和 Shift-By-N 则需要在命令中提供具体的值,格式分别是 --to-offset <offset>--shift-by <offset_N>

对于 DateTime 策略的位移重设,需要提供一个具体的时间:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 –execute

对于 Duration 策略,需要提供一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S –execute

这里的 PT0H30M0S 代表 30 分钟。

以上这些命令在执行之后,命令行都会提示新的位移信息。

如果要在消费者程序中重设位移,Kafka 也提供了相应的消费者 API,一下是 Java API:

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhffcgkc
系列文章
更多 icon
同类精品
更多 icon
继续加载