• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka 消息的生产和消费

武飞扬头像
阳光宅男
帮助1

原文链接:
Kafka Tutorial: Creating a Kafka Producer in Java (cloudurable.com)
Kafka Tutorial: Creating a Kafka Consumer in Java (cloudurable.com)

承接着上文我们配置好了Kafka之后,我们现在用java 来实现向Kafka里发送message 和消费message。

首先Kafka的依赖我们需要加一下:
添加kafka-clients依赖就可以了。其他的jar,会自动download 下来。

<dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

            <version>0.11.0.2</version>

        </dependency>


第一步就是要创建一个Producer.创建一个Producer,我们需要知道我们要连的kafka 的bootstrap server和要往哪个topic里面发送消息。这里假设我们的服务器有3台:"localhost:9092,localhost:9093,localhost:9094"

Topic我们来mock一个:"my-example-topic"

下面是我们创建KafkaProducer的代码。

private static Producer<Long, String> createProducer() {

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        props.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducer");

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

        return new org.apache.kafka.clients.producer.KafkaProducer<>(props);

    }


这里用到了序列化,这里Kafka 的key 和value 都需要序列化,这样才能持久化到硬盘。这里我们使用的序列化类是:StringSerializer和LongSerializer.

第二步就是我们开始往Topic里发送消息。
使用如下的代码:

static void runProducer(final int sendMessageCount) throws Exception {

        final Producer<Long, String> producer = createProducer();

        long time = System.currentTimeMillis();

        try {

            for (long index =  time; index < time sendMessageCount; index ) {

                final ProducerRecord<Long, String> record =  new ProducerRecord<>(TOPIC, index, "Hello World " index);

                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime =  System.currentTimeMillis() -  time;

                System.out.printf("sent record(key=%s value=%s " "meta(partition=%d, offset=%d) time=%d \n",

                record.key(),record.value(),metadata.partition(), metadata.offset(),elapsedTime);

                LOGGER.info("sent record(key="  record.key() ", value=%s " record.value()  "), meta(partition= " metadata.partition() ", offset=" metadata.offset()   ", time= " elapsedTime ")");

            }

        } finally {

            producer.flush();

            producer.close();

        }

    }


然后run一个main 方法就可以运行了。


接下来我们来写消费端。

第一步:同样的,我们也要有一个KafkaConsumer。定义如下:

 private static Consumer<Long, String> createConsumer() {

        final Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                BOOTSTRAP_SERVERS);

        props.put(ConsumerConfig.GROUP_ID_CONFIG,

                "KafkaExampleConsumer");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                LongDeserializer.class.getName());

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class.getName());

        // Create the consumer using props.

        final Consumer<Long, String> consumer =

                new KafkaConsumer<>(props);

        // Subscribe to the topic.

        consumer.subscribe(Collections.singletonList(TOPIC));

        return consumer;

    }


跟生产端一样,这里我们使用了StringDeserializer和LongDeserializer来反序列化Value和Key。

第二步:有了KafkaConsumer,我们就可以消费消息了。

static void runConsumer() {

        final Consumer<Long,String> consumer = createConsumer();

        final int giveUp = 100;

        int noRecordsCount = 0;

        while (true) {

            final ConsumerRecords<Long, String> consumerRecords =

                    consumer.poll(1000);

            if(consumerRecords.count() == 0) {

                noRecordsCount ;

                if(noRecordsCount > giveUp) break;

                else continue;

            }

//            System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset());

            consumerRecords.forEach(record -> {

                LOGGER.info("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset());

            });

            consumer.commitSync();

        }

        consumer.close();

        System.out.printf("Done.");

    }

这样我们有了生产者,消费者,也有了Kafka Server.我们就可以做一个回路测试了。


学新通
消费者:

 学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaggjf
系列文章
更多 icon
同类精品
更多 icon
继续加载