• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka 生产者和消费者

武飞扬头像
发量不足
帮助1

目录

一、基于命令行使用Kafka

二、创建一个名为“itcasttopic”的主题

①、创建生产者

②、创建消费者

③、测试发送数据

三、基于Java API方式使用Kafka

①、创建工程添加依赖

②、编写生产者客户端

③ 、配置环境

④、编写消费者客户端

⑤、再运行KafkaConsumerTest程序 编辑编辑

⑥、再回到KafkaProducerTest.java运行该程序


一、基于命令行使用Kafka

    类似scala,mysql等,命令行是初学者操作Kafka的基本方式,kafka的模式是生产者消费者模式,他们之间通讯是通过,一个公共频道完成

二、创建一个名为“itcasttopic”的主题

kafka-topics.sh --create --topic itcasttopic  --partitions 3  --replication-factor 2  --zookeeper master:2181,slave1:2181,slave2:2181

--create --topic itcasttopic:  创建主题名称是 itcasttopic

--partitions 3  : 分区数是3

--replication-factor 2:副本数是 2

--zookeeper master:2181,slave1:2181,slave2:2181 : zookeeper:服务的IP地址和端口

学新通

##删除主题##

$ bin/kafka-topics.sh --delete -zookeeper master:2181,slave1:2181,slave2:2181 --topic itcasttopic

①、创建生产者

kafka-console-producer.sh  --broker-list master:9092,slave1:9092,slave2:9092 --topic itcasttopic

学新通

(上面是等待输入光标在闪烁)

学新通

转换到slave1

、创建消费者

kafka-console-consumer.sh  --from-beginning --topic itcasttopic --bootstrap-server master:90

学新通

③、测试发送数据

生产发送数据

学新通

消费接收数据

学新通

三、基于Java API方式使用Kafka

学新通

学新通

修改配置:

学新通

 学新通

①、创建工程添加依赖

在工程里面的pom.xml文件添加Kafka依赖

(Kafka依赖需要与虚拟机安装的Kafka版本保持一致)

  1.  
    <properties>
  2.  
    <scala.version>2.11.8</scala.version>
  3.  
    <hadoop.version>2.7.4</hadoop.version>
  4.  
    <spark.version>2.3.2</spark.version>
  5.  
    </properties>
  6.  
     
  7.  
     
  8.  
     
  9.  
    <build>
  10.  
        <plugins>
  11.  
            <plugin>
  12.  
                <groupId>org.apache.maven.plugins</groupId>
  13.  
                <artifactId>maven-compiler-plugin</artifactId>
  14.  
                <configuration>
  15.  
                    <source>1.8</source>
  16.  
                    <target>1.8</target>
  17.  
                </configuration>
  18.  
            </plugin>
  19.  
        </plugins>
  20.  
    </build>
  21.  
     
  22.  
     
  23.  
     
  24.  
    <!--kafka-->
  25.  
    <dependency>
  26.  
        <groupId>org.apache.kafka</groupId>
  27.  
        <artifactId>kafka-clients</artifactId>
  28.  
        <version>2.0.0</version>
  29.  
    </dependency>
  30.  
     
  31.  
    <dependency>
  32.  
        <groupId>org.apache.kafka</groupId>
  33.  
        <artifactId>kafka-streams</artifactId>
  34.  
        <version>2.0.0</version>
  35.  
    </dependency>

②、编写生产者客户端

在工程的java目录下创建KafkaProducerTest文件

学新通

  1.  
    import org.apache.kafka.clients.producer.KafkaProducer;
  2.  
    import org.apache.kafka.clients.producer.ProducerRecord;
  3.  
    import java.util.Properties;
  4.  
     
  5.  
    public class KafkaProducerTest {
  6.  
        public static void main(String[] args){
  7.  
            Properties props = new Properties();
  8.  
            //
  9.  
            props.put("bootstrap.servers","master:9092,slave1:9092,slave2:9092");
  10.  
            //
  11.  
            props.put("acks","all");
  12.  
            //
  13.  
            props.put("retries",0);
  14.  
            //
  15.  
            props.put("batch.size",16384);
  16.  
            //
  17.  
            props.put("linger.ms",1);
  18.  
            //
  19.  
            props.put("buffer.memory",33554432);
  20.  
            //
  21.  
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  22.  
            //
  23.  
            props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  24.  
            //
  25.  
            KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
  26.  
            for (int i=0; i<50; i ){
  27.  
                producer.send(new ProducerRecord<String, String>("itcasttopic",Integer.toString(i),"hello world [2] -" i));
  28.  
            }
  29.  
            producer.close();
  30.  
        }


}学新通

Slave1上出现的结果

学新通

③ 、配置环境

学新通

学新通

④、编写消费者客户端

  1.  
    import org.apache.kafka.clients.consumer.ConsumerRecord;
  2.  
    import org.apache.kafka.clients.consumer.ConsumerRecords;
  3.  
    import org.apache.kafka.clients.consumer.KafkaConsumer;
  4.  
    import org.apache.kafka.clients.producer.Callback;
  5.  
    import org.apache.kafka.clients.producer.KafkaProducer;
  6.  
    import org.apache.kafka.clients.producer.ProducerRecord;
  7.  
    import org.apache.kafka.clients.producer.RecordMetadata;
  8.  
     
  9.  
    import java.util.Arrays;
  10.  
    import java.util.Properties;
  11.  
     
  12.  
    public class KafkaConsumerTest {
  13.  
        public static void main(String[] args) {
  14.  
            // 1、准备配置文件
  15.  
            Properties props = new Properties();
  16.  
            // 2、指定Kafka集群主机名和端口号
  17.  
            props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
  18.  
            // 3、指定消费者组ID,在同一时刻同一消费组中只有一个线程可以去消费一个分区数据,不同的消费组可以去消费同一个分区的数据。
  19.  
            props.put("group.id", "itcasttopic");
  20.  
            // 4、自动提交偏移量
  21.  
            props.put("enable.auto.commit", "true");
  22.  
            // 5、自动提交时间间隔,每秒提交一次
  23.  
            props.put("auto.commit.interval.ms", "1000");
  24.  
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  25.  
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  26.  
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
  27.  
            // 6、订阅数据,这里的topic可以是多个
  28.  
            kafkaConsumer.subscribe(Arrays.asList("itcasttopic"));
  29.  
            // 7、获取数据
  30.  
            while (true) {
  31.  
                //每隔100ms就拉去一次
  32.  
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  33.  
                for (ConsumerRecord<String, String> record : records) {
  34.  
                    System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n", record.topic(), record.offset(), record.key(), record.value());
  35.  
                }
  36.  
            }
  37.  
        }
  38.  
     
  39.  
    }

运行KafkaP roducerTest程序

学新通

⑤、再运行KafkaConsumerTest程序 学新通

⑥、再回到KafkaProducerTest.java运行该程序

(查看KafkaConsumerTest的运行框)由以下图可以看出生产者生产消息成功被终端消费

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfifbcc
系列文章
更多 icon
同类精品
更多 icon
继续加载