Kafka 消息的生产和消费
原文链接:
Kafka Tutorial: Creating a Kafka Producer in Java (cloudurable.com)
Kafka Tutorial: Creating a Kafka Consumer in Java (cloudurable.com)
承接着上文我们配置好了Kafka之后,我们现在用java 来实现向Kafka里发送message 和消费message。
首先Kafka的依赖我们需要加一下:
添加kafka-clients依赖就可以了。其他的jar,会自动download 下来。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.2</version> </dependency> |
第一步就是要创建一个Producer.创建一个Producer,我们需要知道我们要连的kafka 的bootstrap server和要往哪个topic里面发送消息。这里假设我们的服务器有3台:"localhost:9092,localhost:9093,localhost:9094"
Topic我们来mock一个:"my-example-topic"
下面是我们创建KafkaProducer的代码。
private static Producer<Long, String> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); return new org.apache.kafka.clients.producer.KafkaProducer<>(props); } |
这里用到了序列化,这里Kafka 的key 和value 都需要序列化,这样才能持久化到硬盘。这里我们使用的序列化类是:StringSerializer和LongSerializer.
第二步就是我们开始往Topic里发送消息。
使用如下的代码:
static void runProducer(final int sendMessageCount) throws Exception { final Producer<Long, String> producer = createProducer(); long time = System.currentTimeMillis(); try { for (long index = time; index < time sendMessageCount; index ) { final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "Hello World " index); RecordMetadata metadata = producer.send(record).get(); long elapsedTime = System.currentTimeMillis() - time; System.out.printf("sent record(key=%s value=%s " "meta(partition=%d, offset=%d) time=%d \n", record.key(),record.value(),metadata.partition(), metadata.offset(),elapsedTime); LOGGER.info("sent record(key=" record.key() ", value=%s " record.value() "), meta(partition= " metadata.partition() ", offset=" metadata.offset() ", time= " elapsedTime ")"); } } finally { producer.flush(); producer.close(); } } |
然后run一个main 方法就可以运行了。
接下来我们来写消费端。
第一步:同样的,我们也要有一个KafkaConsumer。定义如下:
private static Consumer<Long, String> createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Create the consumer using props. final Consumer<Long, String> consumer = new KafkaConsumer<>(props); // Subscribe to the topic. consumer.subscribe(Collections.singletonList(TOPIC)); return consumer; } |
跟生产端一样,这里我们使用了StringDeserializer和LongDeserializer来反序列化Value和Key。
第二步:有了KafkaConsumer,我们就可以消费消息了。
static void runConsumer() { final Consumer<Long,String> consumer = createConsumer(); final int giveUp = 100; int noRecordsCount = 0; while (true) { final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); if(consumerRecords.count() == 0) { noRecordsCount ; if(noRecordsCount > giveUp) break; else continue; } // System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset()); consumerRecords.forEach(record -> { LOGGER.info("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset()); }); consumer.commitSync(); } consumer.close(); System.out.printf("Done."); } |
这样我们有了生产者,消费者,也有了Kafka Server.我们就可以做一个回路测试了。
消费者:
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgaggjf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13