• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka-4: Kafka重难点问题

武飞扬头像
AHA_WT
帮助1

前言

本文适合有一定 Kafka 基础的小伙伴阅读,如果你对 Kafka 的基础概念还不是很清楚,请移步至 Kafka-1: 安装与关键概念介绍;如果你还对 Spring Boot 操作 Kafka 有一些疑问,请移步至 Kafka-2:Spring for Kafka - 1 Kafka-3: Spring for Kafka - 2

kafka 的速度为什么这么快

  1. 零拷贝
  2. 页缓存
  3. 顺序读写

kafka 的消息是保存以 segment 的形式保存到磁盘上的,但是 kafka 的其中一个特点就是高吞吐,性能好,那么 kafka 是怎么样做到呢?

写入数据

kafka 会把消息写到磁盘中,这样它就不会丢失消息,为了优化写入速度 kafka 主要做了两点优化,顺序写入和 内存映射文件 MMAP: Memory Mapped Files

  1. 顺序写入:磁盘慢的主要原因便是需要先寻址然后在写入,寻址是一个机械动作是最耗时的,如果使用顺序写入磁盘在某些场景下是可以媲美内存的。所以 Kafka 采用了顺序写入的方式,每次写数据的时候就是在 segment 后面添加数据,这样有一个缺点,就是没有办法删除数据,所以 kafka 是不会删除数据的,只会将一个 segment 给删除。
  2. 即便使用顺序写入磁盘速度还是追不上内存的,所以 kafka 的数据并不是实时写入磁盘的,它是充分利用 MMAP 来提高 IO 的效率。内存映射文简称 MMAP: Memory Mapped Files,它的操作原理是利用操作系统的 page cache 分页缓存的技术,实现文件到物理内存的直接映射,这样你对物理内存的操作就会被同步到硬盘上,在操作系统适当的时候。通过这种方式就会省去用户空间到内核空间复制的开销 ps: 调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中 。但是这样就会有一个很大的问题就是不可靠,写到 mmap 中的数据没有被真正的写到磁盘中,只有在操作系统调用 flush 的时候才真正的写到磁盘。在配置文件中可以设置 log.flush.interval.messages=10000,log.flush.interval.ms=1000 多长时间或者消息达到多少条刷新到磁盘。

读取数据

使用零拷贝读取数据的速度。

传统模式下文件传输的时候需要:

  1. 调用 read 函数,把文件数据 copy 到内核缓冲区
  2. read 函数返回,把文件数据从内核缓冲区 copy 到用户缓冲区
  3. write 函数调用,将文件数据从用户缓冲区 copy 到内核与 socket 相关的缓冲区
  4. 最后将数据从 socket 缓冲区 copy 到相关协议引擎

从上面可以看出传统的 read/write 通过网络进行文件的传输需要四次的 copy ,硬盘 -> 内核缓冲区 -> 用户缓冲区 -> socket 缓冲区 -> 协议引擎。使用零拷贝的方式可以减少内核缓冲区到用户缓冲的拷贝,内核态和用户态的切换是十分耗时的。

批量压缩

在很多情况下,系统的瓶颈不是 CPU 或者磁盘而是网络 IOkafka 采用批量压缩的方式来减少网络 IO

小结:

Kafka 速度快主要归结于:它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗,通过 mmap 提高磁盘 I/O 速度并且采用顺序写入的方法;读取数据的时候配合零拷贝,减少用户缓冲区到内核缓冲区文件的拷贝。

怎么保证 Kafka 不丢消息

在讨论怎么保证 Kafka 不丢消息之前,我们先来了解一下生产者的 ACKS 机制消费者偏移量的提交方式。

生产者 ACKS 机制

参考 kafka官方文档配置项

ACKS 参数指定了要需要有多少个副本接收到消息,生产者才会认为消息是发送成功的,消息发送的可靠性,它主要有以下几种情况:

  1. acks=0:生产者不会等待服务器的任何确认只要将消息发送到 socket 缓冲区中就认为消息发送成功了。这种方式没有办法保证服务器真正收到消息,重试机制也不会生效,因为客户端不会接收到任何失败信息。每个记录返回的偏移量都会被设置成 -1。
  2. acks=1leader 副本接收到消息并且写入到本地日志文件中,但是不会等待所有的 follower 副本同步完成。 这种方式如果 leader 接收到消息之后,在 follower 同步之前就挂了,就会导致消息丢失的情况。
  3. acks=allleader 副本会等待所有的 ISR 副本同步完成之后进行确认。只要有一个 ISR 副本还存活就不会丢失消息。它等同于 acks = -1

消费者偏移量的提交方式

消费者把每个分区最后读取的消息偏移量提交保存在 ZookeeperKafka 的 topic 中,如果消费者关闭或重启,它的读取状态不会丢失,提交偏移量的方式主要有两种:

  1. 自动提交:

如果 enable.auto.commit 被设置为 true,那么消费者会自动提交当前处理到的偏移量存入 Zookeeper,自动提交的时间间隔为 5s,通过 auto.commit.interval.ms 属性设置,自动提交是非常方便,但是自动提交会出现消息被重复消费的风险,可以通过减小自动提交的时间间隔来减小重复消费消息的可能性,但也仅仅是减小。

  1. 手动提交:

鉴于 Kafka 自动提交偏移量的不足(只能是按指定频率的提交,有重复消费消息的可能),Kafka 提供了手动提交偏移量的策略。将 enable.auto.commit 自动提交参数设置为 false 来关闭自动提交,手动提交偏移量示例如下:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

如何保证消息不丢失

了解完生产者的ACKS 机制和 消费者偏移量的提交方式接下来我们来说一下如何保证消息不丢失。

1. 设置 partition 的 replication 的数量 >= 3

设置系统参数 replication.factor >= 3

或者在创建 Topic 的时候指定 replication >= 3

2. 设置 min.insync.replicas > 1

min.insync.replicasISR 中最少有几个 replicatition,关于 ISR 的具体描述可以参考 Kafka-1: 安装与关键概念介绍 的相关章节,从 生产者 ACKS 机制 段落中也可以认识到当 acks = all 的时候这个参数的设置才有意义,当它大于 1 且 acks = all 时,说明消息至少同步了两个副本才会进行确认。

3. 设置 unclean.leader.election.enable = false

此参数的作用为:是否可以从 OSR 选取 leader 副本。这边应该设置成 false,防止数据缺失较多的副本成为 leader ,较低消息丢失的可能。

关于 OSR 的具体描述也可以参考 Kafka-1: 安装与关键概念介绍 的相关章节

4. 生产者添加异步回调,排查消息发送失败原因

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
                   ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

5. 生产者设置 acks = all

spring:
  kafka:
    producer:
    	acks: all

6. 生产者添加消息的重试次数

spring:
  kafka:
    producer:
    	retries: 3

7. 设置消费偏移量的提交方式为手动提交

spring:
  kafka:
  	consumer:
    	enable-auto-commit: false

注意:为了保证消息不丢失,上述的很多设置都是很影响性能的,所以可靠性与效率往往是不可兼得的,要根据实际应用场景来进行参数的设置。

Kafka 的重平衡

重平衡其实就是一个协议,它规定了消费者组下的消费者如何分配 topic 中的分区。

重平衡触发的条件

  1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
  2. 主题的分区数发生变更,kafka 目前只支持增加分区,当增加的时候就会触发重平衡
  3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

为什么要避免重平衡

重平衡过程中,消费者无法从 kafka 消费消息,这对 kafkaTPS 影响极大,而如果 kafka 集群内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间 kafka 基本处于不可用状态。所以在线上环境中,应该尽量避免重平衡发生。

怎么避免重平衡

上面章节介绍完重平衡触发的条件,那么避免重平衡自然是避开触发的条件即可。

对于主题分区数发生变更,订阅的主题发生变化和主动增加消费者这些情况都是主动控制的,在进行对应的操作的时候考虑会发生重平衡的情况即可。除了尽量避免主动触发重平衡的情况,我们更应该避免在不知情的情况下发生重平衡的情况。

排除消费者真的挂掉的情况不论,因为这种情况我们也无能为力,我们要做的优化就是防止 Kafka 误以为一个正常的消费者挂掉了。主要使用以下三个配置来调节:

  1. session.timout.ms:心跳检测的超时时间,建议设置成 6s。
  2. heartbeat.interval.ms:心跳检测的间隔时间,间隔越小越不容易误判,越消耗资源。建议设置成 2s。
  3. max.poll.interval.ms:消费者两次 poll 数据最大时间间隔,消费者在拉取数据之后需要对数据进行处理,然后再次去拉取数据,如果两个拉取数据的时间间隔超过这个参数设置的时间,消费者组就会把这个消费者剔除消费者组。默认是 5 分钟,根据实际处理消息的最长时间来定,一般比最长处理消息时间多一分钟即可。

重平衡的几种策略

  1. Range :基于单个 Topic 进行 Partition 的分配
  2. RoundRobin:基于多个 TopicPartition 的分配
  3. Sticky:在重平衡的过程中,在尽量不改变给原有消费者分配的 partition 的基础上,进行重平衡。

1 和 2 的区别就是如果有 5 个 topic 每个 topic 有 3 个分区,被一个有 15 个消费者的消费者组订阅;

如果使用第一种策略,它会看第一个 topic 有 5 个分区,平均分给前 5 个消费者,每个消费者一个分区,第 2 至 5 个 topic 也是这种情况,就会导致前 5 个消费者都有 3 个分区而后面的 10 个消费者是空闲的。

如果使用第二种策略,一下看 5 个 topic 一共有 15 个分区,15个消费者,直接每一个消费者分配一个分区。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhffcgbg
系列文章
更多 icon
同类精品
更多 icon
继续加载