Kafka 基本使用方法和介绍
Kafka概述
Apache Kafka 是一个开源分布式事件流平台,基于发布/订阅模式,广泛应用于大数据实时处理领域
Kafka的应用场景
-
异步处理
- 一般调用第三方接口(短信,邮件)的rt都比较长,这时候来个消息队列是极好的
-
系统解耦
- 把系统之间的直接调用,变为通过消息中间件,降低耦合度,提高程序的扩展性
-
削峰填谷
- 当大量请求短时间涌入时,消息中间件就是首选啦,返回用户正在处理中,处理完毕再返回处理完毕的信息
-
事件驱动的数据来源,一般和flume配合使用,收集日志,交由spark或flink处理
Kafa集群搭建
-
本文以3.0.0为例,列出详细步骤
-
目录以实际为准
1. mkdir -p /opt/{module,software} 2. cd /opt/software 3. wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz 4. tar -xvf kafka_2.12-3.0.0.tgz -C ../module/ 5. cd ../module/ 6. mv kafka_2.12-3.0.0/ kafka-3.0.0 7. cd kafka-3.0.0/ 8. mkdir mykfk-logs # 存放kafaka数据的目录 9. cd config/ 10. vim server.properties
-
修改server.properties内容如下
1. 21行, broker.id=1 #每个broker的id需要是唯一的 2. 31行 # 允许外部端口连接 listeners=PLAINTEXT://0.0.0.0:9092 # 外部代理地址 advertised.listeners=PLAINTEXT://server1:9092 3. 60行, log.dirs=/opt/module/kafka-3.0.0/mykfk-logs 4. 126行, zookeeper.connect=server1:2181,server2:2181,server3:2181
-
配置环境变量:
vim /etc/profile
, 编辑完成后使环境变量生效:source /etc/profile
# Kafka Home export KAFKA_HOME=/opt/module/kafka-3.0.0 export PATH=$KAFKA_HOME/bin:$KAFKA_HOME/sbin:$PATH
-
xsync /opt/module/kafka-3.0.0/
xsync /etc/profile
- 修改server.properties中的broker的id值,listeners对应的值server2,server3
-
-
Kafka集群一键启动/关闭:
vim /bin/mykfk.sh
, 然后授予执行权限:chmod x /bin/mykfk.sh
#!/bin/bash export CLU_SERVER="server1 server2 server3" if [ $# -lt 1 ] then echo "Pls Input Args Like 'start','stop'" exit ; fi case $1 in "start") echo " =================== 启动 kafka 集群 ===================" for i in $CLU_SERVER do echo " --------------- 启动 $i ---------------" ssh $i "source /etc/profile;export JMX_PORT=9988;nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >/dev/null 2>&1 &" done ;; "stop") echo " =================== 关闭 kafka 集群 ===================" for i in $CLU_SERVER do echo " --------------- 关闭 $i ---------------" ssh $i "source /etc/profile;jps | grep Kafka |cut -d' ' -f1 |xargs kill -s 9" done ;; *) echo "Input Args Error..." ;; esac
Kafka基本操作
-
创建主题(类似于文件系统中的文件夹或是mysql中的表的概念)
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic quickstart-events
-
查看主题
kafka-topics.sh --bootstrap-server localhost:9092 --list
-
生产与消费数据-简单模式
-
控制台生产信息
kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart
-
控制台从起始位置消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart --from-beginning
-
-
kafka自带压测工具(抛砖引玉)
-
实际生产用一般使用Kafka Tool操作Kafka(抛砖引玉)
Kafka API
-
以下示例均参考Kafka官方文档
-
创建maven项目并引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.tyvek</groupId> <artifactId>kafkademo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- kafka客户端工具 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <!-- 工具类 <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <!-- SLF桥接LOG4J日志 <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.6</version> </dependency> <!-- LOG4J日志 <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
-
创建生产者
public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.123:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); // 3. 调用send发送1-100消息到指定Topic test for(int i = 0; i < 100; i) { try { // 获取返回值Future,该对象封装了返回值 Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>( "quickstart", Integer.toString(i), Integer.toString(i))); // 调用一个Future.get()方法等待响应 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 5. 关闭生产者 producer.close(); } }
-
创建消费者
public class KafkaConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.0.123:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("quickstart")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
Kafka 基本概念
-
Broker: 一个broker就是一个kafka进程
- Kafka集群通常由多个broker组成
- 目前有ZK管理和协调,并且存储了Kafka的MetaData
-
Producer(生产者)
- 生产者负责将数据推送给broker的topic
-
Consumer(消费者)
- 消费者负责从broker的topic中拉取数据,并进行实际的业务逻辑
-
Consumer Group(消费者组)
- 一个消费者组有一个唯一的ID(group id)
- 组内的消费者一起消费主题的所有分区数据
-
Partioions(分区)
- 一个主题可以被分为多个分区
- 一个分区只会让一个消费者组中的一个消费者消费
-
Replicas(副本)
- kafka的高可用机制,就是把源数据进行备份的数量
-
Topic(主题)
- 类似于数据库中的表,同一类信息的一个集合
-
Offset(偏移量)
- 可以理解为是一个标记,用来标识Consumer消费的位置
- 默认的offset是存储在Zookeeper中的,也可以存放在mysql或者redis中
Kafka 生产者幂等性与事务
-
幂等性
-
是否会发生这样的情况,同一个消息由于某种原因被重复发送了两次
-
是否同一条数据被重复消费
-
Kafaka幂等性的实现原理,通过Producer ID和Sequence Number的概念
- PID: 每个Producer在初始化时,都会分配一个唯一的PID
- SN: 针对每个生产者对应有相应的PID,发送到指定主题分区的消息都对应一个从0开始递增的SN.
-
-
事务
- initTransactions: 初始化事务
- beginTransaction: 开始事务
- sendOffsetsToTransaction: 提交偏移量
- commitTransaction: 提交事务
- abortTransaction: 取消事务
Kafka的更多细节
-
自定义分区策略
1. 自定义分区器public class MyPartitioner implements Partitioner { private Random r; @Override public void configure(Map<String, ?> configs) { r = new Random(); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // cluster.partitionCountForTopic 表示获取指定topic的分区数量 return r.nextInt(1000) % cluster.partitionCountForTopic(topic); } @Override public void close() { } }
-
在Kafka生产者配置中,使用自定义分区的类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
-
-
副本机制
- 就是把原始数据进行备份的数量
- acks设置为0时标识,不等待broker确认,直接发送下一条数据
- acks设置为all或者-1时,等待所有副本已经将数据同步后,才会发送下一条数据
- acks设置为1等待leader副本确认接收后,才会发送下一条数据
-
功能增强
- 我想从指定的位置读取数据并且自定义分区,自定义副本可以吗?
- 自行探究一下
-
分区的leader与follower
- leader负责写数据,follwer负责数据的同步
- follower也有备胎转正的机会
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanejab
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24