Kafka 使用java实现,极快入门
一、kafka的生产者和消费者
1. 生产者发送消息的流程
2. 消费者接收消息的流程
二、 java 代码实现
1. 添加依赖:
-
<dependency>
-
<groupId>org.apache.kafka</groupId>
-
<artifactId>kafka_2.12</artifactId>
-
</dependency>
2. 实现生产者
-
public class NormalProducer {
-
-
public static void main(String[] args) {
-
Properties properties = new Properties();
-
// 1.配置生产者启动的关键属性参数
-
-
// 1.1 BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
-
// 1.2 CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
-
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
-
// 1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
-
// Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
-
// A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
-
// 字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
-
// KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
// VALUE: 实际发送消息的内容
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
// 2.创建kafka生产者对象 传递properties属性参数集合
-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
-
for(int i = 0; i <10; i ) {
-
// 3.构造消息内容
-
User user = new User("00" i, "张三");
-
ProducerRecord<String, String> record =
-
// arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
-
new ProducerRecord<String, String>("quick_start",
-
JSON.toJSONString(user));
-
-
// 4.发送消息
-
producer.send(record);
-
}
-
-
-
// 5.关闭生产者
-
producer.close();
-
-
}
-
}
其中的 User 对象为:
-
public class User {
-
-
private String id;
-
-
private String name;
-
-
public User() {
-
}
-
-
public User(String id, String name) {
-
this.id = id;
-
this.name = name;
-
}
-
-
public String getId() {
-
return id;
-
}
-
-
public void setId(String id) {
-
this.id = id;
-
}
-
-
public String getName() {
-
return name;
-
}
-
-
public void setName(String name) {
-
this.name = name;
-
}
-
}
3. 实现消费者
-
public class NormalConsumer {
-
-
public static void main(String[] args) {
-
-
// 1. 配置属性参数
-
Properties properties = new Properties();
-
-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
-
-
// org.apache.kafka.common.serialization.StringDeserializer
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-
// 非常重要的属性配置:与我们消费者订阅组有关系
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
-
// 常规属性:会话连接超时时间
-
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-
// 消费者提交offset: 自动提交 & 手工提交,默认是自动提交
-
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
-
// 2. 创建消费者对象
-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
-
// 3. 订阅你感兴趣的主题:quick_start
-
consumer.subscribe(Collections.singletonList("quick_start"));
-
-
System.err.println("quickstart consumer started...");
-
-
try {
-
// 4.采用拉取消息的方式消费数据
-
while(true) {
-
// 等待多久拉取一次消息
-
// 拉取TOPIC_QUICKSTART主题里面所有的消息
-
// topic 和 partition是 一对多的关系,一个topic可以有多个partition
-
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
// 因为消息是在partition中存储的,所以需要遍历partition集合
-
for(TopicPartition topicPartition : records.partitions()) {
-
// 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
-
List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
-
// 获取TopicPartition对应的主题名称
-
String topic = topicPartition.topic();
-
// 获取当前topicPartition下的消息条数
-
int size = partitionRecords.size();
-
-
System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
-
topic,
-
topicPartition.partition(),
-
size));
-
-
for(int i = 0; i < size; i ) {
-
ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
-
// 实际的数据内容
-
String value = consumerRecord.value();
-
// 当前获取的消息偏移量
-
long offset = consumerRecord.offset();
-
// ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset 1
-
// 表示下一次从什么位置(offset)拉取消息
-
long commitOffser = offset 1;
-
System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
-
value, offset, commitOffser));
-
}
-
}
-
}
-
} finally {
-
consumer.close();
-
}
-
}
-
}
4. 测试结果
生产者发送的消息在消费者端可以正常接收:
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcgifh
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01