• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

kafka 生产者客户端重要参数

武飞扬头像
InTheCage
帮助13


前言

在 KafkaProducer 中,除了 bootstrap.servers、key.serializer、key.serializer 这三个必要的参数配置,大部分的参数都有合理的默认值,一般不需要修改它们,但是了解这些参数可以使我们更合理地使用生产者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练的掌握它们,可以让我们在编写相关程序时能够更好的进行性能调优与故障排查。
下面挑选一些重要的参数进行详解。


一、acks

  • acks = 1。默认值即为 1。

生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。

如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。

如果消息写入 leader 副本并返回成功给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。

acks 设置为 1,是消息可靠性和吞吐量之间的折中方案。

  • acks = 0。

生产者发送消息之后不需要等待任何服务端的响应。

如果在消息从发送到写入 Kafka 的过程中出现了某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。

在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

  • acks = -1 或 acks = all。

生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。

在其他配置环境相同的情况下,acks 设置为 -1 可以达到最强的可靠性。

但是并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks = 1 的情况。

要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。

  • 需要注意 acks 参数配置的字是一个 String 类型,例如:acks 参数设置为 1。
properties.put("acks", "1");
//或者
properties.put(ProducerConfig.ACKS_CONFIG, "1");

二、max.request.size

  • 这个参数用来限制生产者客户端能够发送的消息的最大值,默认值为 1048576 B,即 1 MB。
  • 一般情况下,这个默认值就可以满足大多数的应用场景了。
  • 不建议盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。
  • 因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的一场。
  • 比如讲 broker 端端 message.max.bytes 参数配置为 10, 而 max.request.size 参数配置为 20,那么当我们发送一条消息大小为 15 的消息时,生产者客户端就会报出异常:
  • org.apache.kafka.common.errors.RecordTooLargeException: The reqeust included a message larger than the max message size the server will accept.

三、retries 和 retry.backoff.ms

  • retries 参数用来配置生产者重试的次数,默认值为 0,即发生异常的时候不进行任何的重试动作。
  • 消息在从生产者发出道成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、Leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味的将异常抛给生产者的应用程序。
  • 如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
  • 不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不行了。
  • 重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
  • 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
  • Kafka 可以保证同一个分区中的消息时有序的。
  • 如果生产者按照一定的顺序发送消息,那么这些消息也会顺序的写入分区,进而消费者也可以按照同样的顺序消费它们。
  • 对于某些应用来说,顺序性非常重要,比如 Mysql 的 binlog 传输,如果出现错误就会造成非常严重的后果。
  • 如果讲 retries 参数设置为非零值,并且 max.in.flight.requests.per.connection 参数配置为大于 1 的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。
  • 一般而言,在需要保证顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为 1,而不是把 retries 配置为 0. 不过这样也会影响整体的吞吐。

四、compression.type

  • 这个参数用来指定消费的压缩方式,默认值为 “none”,即默认情况下,消息不会被压缩。
  • 该参数还可以配置为 “gzip”,“snappy”,“lz4”。
  • 对消息进行压缩可以极大地减少网络传输量、降低网络 I/O ,从而提高整体的性能。
  • 消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

五、connection.max.idle.ms

  • 这个参数用来指定在多久之后关闭闲置的连接,默认值时 540000 ms,即 9 分钟。

六、linger.ms

  • 这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为 0。
  • 生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。
  • 增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
  • 这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

七、receive.buffer.bytes

  • 这个参数用来设置 Socket 接受消息缓冲区(SO_RECBUF)的大小,默认值为 32768(B),即 32 KB。
  • 如果设置为 -1,则使用操作系统的默认值。
  • 如果 Producer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。

八、send.buffer.bytes

  • 这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072 (B),即 128 KB。
  • 与 receive.buffer.bytes 参数一样,如果设置为 -1 ,则使用操作系统默认值。

九、request.timeout.ms

  • 这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000 ms。
  • 请求超时之后可以选择进行重试。
  • 注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

十、bootstrap.servers

  • 指定连接 Kafka 集群所需要的 broker 地址清单。
  • 具体的内容格式为 host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号进行隔开,此参数的默认值为 “”。
  • 注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息。
  • 不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。

十一、key.serializer 和 value.serializer

  • broker 端接受的消息必须以字节数组(byte[]) 的形式存在。
  • 在发往 broker 端之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。
  • key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。
  • 注意这里必须填写序列化器的全限定名,如:org.apache.kafka.common.serializeation.StringSerializer。

十二、buffer.memory

  • 生产者客户端用于缓存消息缓冲区大小。
  • 此参数默认值为 33554432 B,即 32 MB。
  • Record Accumulator 缓存的大小可以通过该参数来配置。

十三、batch.size

  • 用于指定 ProducerBatch 可以复用内存区域的大小。

  • 默认值为 16384 B,即 16 KB。

  • ProducerBatch 的大小和 batch.size 参数有着密切的关系。

  • 当一条消息(ProdcuerRecord)流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 Producer Record ,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch 。

  • 在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProdcuerBatch,这样在使用完这块内存区域之后,可以通过 BufferPool 的管理来进行复用。如果超过,那么就以评估的大小来创建 ProducerBatch ,这段内存区域不会被复用。

    十四、client.id

  • 用来设定 KafkaProducer 对应的客户端 id。

  • 默认值为 “”。

十五、max.block.ms

  • 此参数默认值为 60000,即 60 秒。
  • 用来控制 KafkaProducer 中 send() 方法和 partitionFor() 方法的阻塞时间。
  • 当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。
  • 比如:生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,取决于该参数的设置。

十六、partitioner.class

  • 默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • 用来指定分区器,需要实现 org.apache.kafka.clients.producer.Partitioner 接口。

十七、enable.idempotence

  • 默认值 false
  • 是否开启幂等性功能。
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 或者
properties.put("enable.idempotence", true);
  • 如果要确保幂等性功能正常,还需要确保生产者客户端的 retries、acks、max.in.flight.requests.per.connection 这几个参数不被配置错。实际上在使用幂等性功能的时候,完全可以不用配置,也不建议配置这几个参数。
  • 如果显式地指定了 retries 参数,那么这个参数的值必须大于 0。否则会报出 ConfigException
  • 如果没有显式指定 retries 参数,那么 KafkaProducer 会将它置为 Integer.MAX_VALUE。
  • 同时还需要保证 max.in.flight,requests.per.connection 参数的值不能大于 5,否则会报错 ConfigException。
  • 如果还显式指定了 acks 参数,那么还需要保证这个参数的值为 -1 或者 all,如果不为 -1 ,那么也会报错 ConfigException。
  • 如果没有显式指定 acks 参数,那么 KafkaProducer 会将它置为 -1。
  • 开启幂等性功能之后,生产者就可以如同未开启幂等时一样发送消息了。

十八、interceptor.classes

  • 默认值:“ ”
  • 用来设定生产者拦截器,需要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。

十九、max.in.flight.reqeusts.per.connection

  • 默认值 5
  • 限制每个连接(客户端与 Node 之间的连接)最多缓存的请求数。

二十、metadata.max.age.ms

  • 默认值 300000 ,即 5 分钟
  • 如果在这个时间内元数据没有更新的话会被强制更新。
  • 当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过 metadata.max.age.ms 时间没有更新更新元数据都会引起元数据的更新操作。
  • 元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。
  • 当需要更新元数据时,会先挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。
  • 这个更新操作是由 Sender 现场发起的,在创建完 Metadata Request 之后同一会存入 InFlightRequests,之后的步骤就和发送消息时类似。
  • 元数据虽然由 Sender 线程负责更新,但是主线程也需要读取到这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。

二十一、transactional.id

  • 默认值 null
  • 设置事务 id,必须唯一。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfibejg
系列文章
更多 icon
同类精品
更多 icon
继续加载