Kafka源码9-若网络没建立好会发送消息吗
欢迎大家关注 github.com/hsfxuebao/j… ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
上篇 Kafka源码分析8-网络设计分析了kafka的网络设计,本文分析如果网路没有建立好,生产者会发送消息吗?sender 线程代码如下:
{
// 省略。。
// create produce requests
/**
* 步骤五:
*
* 我们有可能要发送的partition有很多个,
* 很有可能有一些partition的leader partition是在同一台服务器上面。
* p0:leader:0
* p1:leader: 0
* p2:leader: 1
* p3:leader: 2
* 假设我们集群只有3台服务器
* 当我们的分区的个数大于集群的节点的个数的时候,一定会有多个leader partition在同一台服务器上面。
*
* 按照broker进行分组,同一个broker的partition为同一组
* 0:{p0,p1}
* 1:{p2}
* 2:{p3}
*/
// 如果网络没有建立好,这的代码是不执行的
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
//如果batches 空的话,这而的代码也就不执行了。
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
/**
* 步骤六:
* 对超时的批次是如何处理的?
*/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
/**
* 步骤七:
* 创建发送消息的请求
* 创建请求
* 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面
* ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁
* 我们要知道,我们集群里面网络资源是非常珍贵的。
* 会把发往同个broker上面partition的数据 组合成为一个请求。
* 然后统一个一个地次发送过去,这样子就减少了网络请求。
*/
//如果网络连接没有建立好 batches其实是为空。
//也就说其实这段代码也是不会执行。
sendProduceRequests(batches, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
/**
* 步骤八:
* 真正执行网络操作的都是这个NetWorkClient这个组件
* 包括:发送请求,接受响应(处理响应)
*/
// 我们猜这儿可能就是去建立连接。
this.client.poll(pollTimeout, now);
}
我们分析步骤五到步骤七,如果网络没有建立,是不会发送消息的。我们可以猜出来,网络连接最终是在步骤八建立的。这个我们下篇文章再细讲。
参考文档:
史上最详细kafka源码注释(kafka-0.10.2.0-src)
kafka技术内幕-图文详解Kafka源码设计与实现
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhffcjef
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13