学新通技术网

Kafka 基本使用方法和介绍

juejin 9 1
Kafka 基本使用方法和介绍

Kafka概述

Apache Kafka 是一个开源分布式事件流平台,基于发布/订阅模式,广泛应用于大数据实时处理领域

Kafka的应用场景

  1. 异步处理

    1. 一般调用第三方接口(短信,邮件)的rt都比较长,这时候来个消息队列是极好的
  2. 系统解耦

    1. 把系统之间的直接调用,变为通过消息中间件,降低耦合度,提高程序的扩展性
  3. 削峰填谷

    1. 当大量请求短时间涌入时,消息中间件就是首选啦,返回用户正在处理中,处理完毕再返回处理完毕的信息
  4. 事件驱动的数据来源,一般和flume配合使用,收集日志,交由spark或flink处理

Kafa集群搭建

  1. 安装包下载地址

  2. 本文以3.0.0为例,列出详细步骤

    1. 目录以实际为准

      1. mkdir -p /opt/{module,software}
      2. cd /opt/software
      3. wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz
      4. tar -xvf kafka_2.12-3.0.0.tgz -C ../module/
      5. cd ../module/
      6. mv kafka_2.12-3.0.0/ kafka-3.0.0
      7. cd kafka-3.0.0/
      8. mkdir mykfk-logs			# 存放kafaka数据的目录
      9. cd config/
      10. vim server.properties
      
    2. 修改server.properties内容如下

      1. 21行, broker.id=1 #每个broker的id需要是唯一的
      2. 31行
      	# 允许外部端口连接                                            
      	listeners=PLAINTEXT://0.0.0.0:9092  
      	# 外部代理地址                                                
      	advertised.listeners=PLAINTEXT://server1:9092
      3. 60行, log.dirs=/opt/module/kafka-3.0.0/mykfk-logs
      4. 126行,  zookeeper.connect=server1:2181,server2:2181,server3:2181
      
    3. 配置环境变量: vim /etc/profile, 编辑完成后使环境变量生效: source /etc/profile

      # Kafka Home
      export KAFKA_HOME=/opt/module/kafka-3.0.0
      export PATH=$KAFKA_HOME/bin:$KAFKA_HOME/sbin:$PATH
      
    4. 将文件进行分发

      1. xsync /opt/module/kafka-3.0.0/
      2. xsync /etc/profile
      3. 修改server.properties中的broker的id值,listeners对应的值server2,server3
  3. Zookeeper安装和启动:

  4. Kafka集群一键启动/关闭: vim /bin/mykfk.sh, 然后授予执行权限: chmod +x /bin/mykfk.sh

    #!/bin/bash
    
    export CLU_SERVER="server1 server2 server3"
    if [ $# -lt 1 ]
    then
     echo "Pls Input Args Like 'start','stop'"
     exit ;
    fi
    case $1 in
    "start")
    echo " =================== 启动 kafka 集群 ==================="
     for i in $CLU_SERVER
      do
        echo " --------------- 启动 $i ---------------"
        ssh $i "source /etc/profile;export JMX_PORT=9988;nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >/dev/null 2>&1 &"
      done
    ;;
    "stop")
     echo " =================== 关闭 kafka 集群 ==================="
     for i in $CLU_SERVER
      do
        echo " --------------- 关闭 $i ---------------"
        ssh $i "source /etc/profile;jps | grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
      done
    ;;
    *)
     echo "Input Args Error..."
    ;;
    esac
    

Kafka基本操作

  1. 创建主题(类似于文件系统中的文件夹或是mysql中的表的概念)

    kafka-topics.sh --create --bootstrap-server localhost:9092  --replication-factor 1 --partitions 2 --topic quickstart-events
    
  2. 查看主题

    kafka-topics.sh --bootstrap-server localhost:9092  --list
    

    在这里插入图片描述

  3. 生产与消费数据-简单模式

    1. 控制台生产信息

      kafka-console-producer.sh --broker-list  localhost:9092  --topic quickstart
      
    2. 控制台从起始位置消费

      kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart --from-beginning
      
  4. kafka自带压测工具(抛砖引玉)
    在这里插入图片描述

  5. 实际生产用一般使用Kafka Tool操作Kafka(抛砖引玉)

Kafka API

  1. 以下示例均参考Kafka官方文档

  2. 创建maven项目并引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.tyvek</groupId>
        <artifactId>kafkademo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- kafka客户端工具 
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <!-- 工具类 
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-io</artifactId>
                <version>1.3.2</version>
            </dependency>
    
            <!-- SLF桥接LOG4J日志 
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.6</version>
            </dependency>
    
            <!-- LOG4J日志 
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
  3. 创建生产者

    public class KafkaProducerTest {
       public static void main(String[] args) {
          // 1. 创建用于连接Kafka的Properties配置
          Properties props = new Properties();
          props.put("bootstrap.servers", "192.168.0.123:9092");
          props.put("acks", "all");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
          // 2. 创建一个生产者对象KafkaProducer
          KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
          // 3. 调用send发送1-100消息到指定Topic test
          for(int i = 0; i < 100; ++i) {
             try {
                // 获取返回值Future,该对象封装了返回值
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(
                        "quickstart", Integer.toString(i), Integer.toString(i)));
                // 调用一个Future.get()方法等待响应
                future.get();
             } catch (InterruptedException e) {
                e.printStackTrace();
             } catch (ExecutionException e) {
                e.printStackTrace();
             }
          }
    
          // 5. 关闭生产者
          producer.close();
       }
    }
    
  4. 创建消费者

    public class KafkaConsumerTest {
       public static void main(String[] args) {
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "192.168.0.123:9092");
          props.setProperty("group.id", "test");
          props.setProperty("enable.auto.commit", "true");
          props.setProperty("auto.commit.interval.ms", "1000");
          props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList("quickstart"));
          while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
             }
          }
       }
    }
    

Kafka 基本概念

  1. Broker: 一个broker就是一个kafka进程

    1. Kafka集群通常由多个broker组成
    2. 目前有ZK管理和协调,并且存储了Kafka的MetaData
  2. Producer(生产者)

    1. 生产者负责将数据推送给broker的topic
  3. Consumer(消费者)

    1. 消费者负责从broker的topic中拉取数据,并进行实际的业务逻辑
  4. Consumer Group(消费者组)

    1. 一个消费者组有一个唯一的ID(group id)
    2. 组内的消费者一起消费主题的所有分区数据
  5. Partioions(分区)

    1. 一个主题可以被分为多个分区
    2. 一个分区只会让一个消费者组中的一个消费者消费
  6. Replicas(副本)

    1. kafka的高可用机制,就是把源数据进行备份的数量
  7. Topic(主题)

    1. 类似于数据库中的表,同一类信息的一个集合
  8. Offset(偏移量)

    1. 可以理解为是一个标记,用来标识Consumer消费的位置
    2. 默认的offset是存储在Zookeeper中的,也可以存放在mysql或者redis中

Kafka 生产者幂等性与事务

  1. 幂等性

    1. 是否会发生这样的情况,同一个消息由于某种原因被重复发送了两次

    2. 是否同一条数据被重复消费

    3. Kafaka幂等性的实现原理,通过Producer ID和Sequence Number的概念

      1. PID: 每个Producer在初始化时,都会分配一个唯一的PID
      2. SN: 针对每个生产者对应有相应的PID,发送到指定主题分区的消息都对应一个从0开始递增的SN.
  2. 事务

    1. initTransactions: 初始化事务
    2. beginTransaction: 开始事务
    3. sendOffsetsToTransaction: 提交偏移量
    4. commitTransaction: 提交事务
    5. abortTransaction: 取消事务

Kafka的更多细节

  1. 自定义分区策略
    1. 自定义分区器

    public class MyPartitioner implements Partitioner {
    
        private Random r;
    
        @Override
        public void configure(Map<String, ?> configs) {
            r = new Random();
        }
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // cluster.partitionCountForTopic 表示获取指定topic的分区数量
            return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
        }
    
        @Override
        public void close() {
        }
    }
    
    1. 在Kafka生产者配置中,使用自定义分区的类名

      props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
      
  2. 副本机制

    1. 就是把原始数据进行备份的数量
    2. acks设置为0时标识,不等待broker确认,直接发送下一条数据
    3. acks设置为all或者-1时,等待所有副本已经将数据同步后,才会发送下一条数据
    4. acks设置为1等待leader副本确认接收后,才会发送下一条数据
  3. 功能增强

    1. 我想从指定的位置读取数据并且自定义分区,自定义副本可以吗?
    2. 自行探究一下
  4. 分区的leader与follower

    1. leader负责写数据,follwer负责数据的同步
    2. follower也有备胎转正的机会

本文出至:学新通技术网

标签: