• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka 使用java实现,极快入门

武飞扬头像
Java知者
帮助1

一、kafka的生产者和消费者

1. 生产者发送消息的流程

学新通

 2. 消费者接收消息的流程

学新通

 二、 java 代码实现

1. 添加依赖:

  1.  
    <dependency>
  2.  
    <groupId>org.apache.kafka</groupId>
  3.  
    <artifactId>kafka_2.12</artifactId>
  4.  
    </dependency>

2. 实现生产者

  1.  
    public class NormalProducer {
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
    Properties properties = new Properties();
  5.  
    // 1.配置生产者启动的关键属性参数
  6.  
     
  7.  
    // 1.1 BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
  8.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  9.  
    // 1.2 CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
  10.  
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
  11.  
    // 1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
  12.  
    // Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
  13.  
    // A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
  14.  
    // 字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
  15.  
    // KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
  16.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.  
    // VALUE: 实际发送消息的内容
  18.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  19.  
     
  20.  
    // 2.创建kafka生产者对象 传递properties属性参数集合
  21.  
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  22.  
     
  23.  
    for(int i = 0; i <10; i ) {
  24.  
    // 3.构造消息内容
  25.  
    User user = new User("00" i, "张三");
  26.  
    ProducerRecord<String, String> record =
  27.  
    // arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
  28.  
    new ProducerRecord<String, String>("quick_start",
  29.  
    JSON.toJSONString(user));
  30.  
     
  31.  
    // 4.发送消息
  32.  
    producer.send(record);
  33.  
    }
  34.  
     
  35.  
     
  36.  
    // 5.关闭生产者
  37.  
    producer.close();
  38.  
     
  39.  
    }
  40.  
    }
学新通

其中的 User 对象为:

  1.  
    public class User {
  2.  
     
  3.  
    private String id;
  4.  
     
  5.  
    private String name;
  6.  
     
  7.  
    public User() {
  8.  
    }
  9.  
     
  10.  
    public User(String id, String name) {
  11.  
    this.id = id;
  12.  
    this.name = name;
  13.  
    }
  14.  
     
  15.  
    public String getId() {
  16.  
    return id;
  17.  
    }
  18.  
     
  19.  
    public void setId(String id) {
  20.  
    this.id = id;
  21.  
    }
  22.  
     
  23.  
    public String getName() {
  24.  
    return name;
  25.  
    }
  26.  
     
  27.  
    public void setName(String name) {
  28.  
    this.name = name;
  29.  
    }
  30.  
    }
学新通

3. 实现消费者

  1.  
    public class NormalConsumer {
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
     
  5.  
    // 1. 配置属性参数
  6.  
    Properties properties = new Properties();
  7.  
     
  8.  
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  9.  
     
  10.  
    // org.apache.kafka.common.serialization.StringDeserializer
  11.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  12.  
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  13.  
     
  14.  
    // 非常重要的属性配置:与我们消费者订阅组有关系
  15.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
  16.  
    // 常规属性:会话连接超时时间
  17.  
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  18.  
    // 消费者提交offset: 自动提交 & 手工提交,默认是自动提交
  19.  
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  20.  
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  21.  
     
  22.  
    // 2. 创建消费者对象
  23.  
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  24.  
     
  25.  
    // 3. 订阅你感兴趣的主题:quick_start
  26.  
    consumer.subscribe(Collections.singletonList("quick_start"));
  27.  
     
  28.  
    System.err.println("quickstart consumer started...");
  29.  
     
  30.  
    try {
  31.  
    // 4.采用拉取消息的方式消费数据
  32.  
    while(true) {
  33.  
    // 等待多久拉取一次消息
  34.  
    // 拉取TOPIC_QUICKSTART主题里面所有的消息
  35.  
    // topic 和 partition是 一对多的关系,一个topic可以有多个partition
  36.  
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  37.  
    // 因为消息是在partition中存储的,所以需要遍历partition集合
  38.  
    for(TopicPartition topicPartition : records.partitions()) {
  39.  
    // 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
  40.  
    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
  41.  
    // 获取TopicPartition对应的主题名称
  42.  
    String topic = topicPartition.topic();
  43.  
    // 获取当前topicPartition下的消息条数
  44.  
    int size = partitionRecords.size();
  45.  
     
  46.  
    System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
  47.  
    topic,
  48.  
    topicPartition.partition(),
  49.  
    size));
  50.  
     
  51.  
    for(int i = 0; i < size; i ) {
  52.  
    ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
  53.  
    // 实际的数据内容
  54.  
    String value = consumerRecord.value();
  55.  
    // 当前获取的消息偏移量
  56.  
    long offset = consumerRecord.offset();
  57.  
    // ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset 1
  58.  
    // 表示下一次从什么位置(offset)拉取消息
  59.  
    long commitOffser = offset 1;
  60.  
    System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
  61.  
    value, offset, commitOffser));
  62.  
    }
  63.  
    }
  64.  
    }
  65.  
    } finally {
  66.  
    consumer.close();
  67.  
    }
  68.  
    }
  69.  
    }
学新通

4. 测试结果

生产者发送的消息在消费者端可以正常接收:

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcgifh
系列文章
更多 icon
同类精品
更多 icon
继续加载