kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池
网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。
需求内容
单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。
注意点
1、如果1秒钟生产1000条数据,消费者处理时,每条数据需要500毫秒,则消费者每次拉取数据的条数最好能控制在500条以上,这样1秒内的数据可以拉取两次,每次使用500个线程进行处理,每次耗时500ms,
2*500ms=1秒,基本可以保证1000条数据能够在1秒内处理完成。
如果消费者每100ms拉取一次,每次拉取100条数据,消费者使用100个线程处理这100条数据,耗时500ms,第二次再拉取100条,耗时500ms...这样处理完1秒内的1000条数据将一共需要
10次*500ms=5秒钟,出现较大延迟。
同时,还要注意,一批数据中存在相同的accNum(客户账号)的情况,如果存在2条相同的accNum,因为需要顺序执行,一条执行需要500ms,两条顺序执行完成将花费1秒,这批数据的整体完成时间将变为1秒。
注意这三个参数的调整:
// fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
// fetch.max.wait.ms:一次拉取的最大等待时间:500ms
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// max.poll.records: 一次拉取的最大条数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
注意消费者的拉取延迟时间:
tKafkaConsumer.poll(500);
2、每批次数据处理时,创建的线程数,会根据每次拉取的数据条数自动调整,最大线程数为消费者每次允许拉取的最大数据条数。这样系统可以根据数据量大小自动调整创建的线程数,线程池中的空闲线程可以在一定时间后自动释放。可以保证不同accNum(客户账号)的数据每次都分配一个线程单独处理,从而保证处理的时间(500ms)。
第一种使用纯线程方式(Thread Callable FutureTask)
因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。
-
package com.autoee.demo.kafka.main;
-
-
import ch.qos.logback.classic.Level;
-
import ch.qos.logback.classic.LoggerContext;
-
import cn.hutool.core.date.DateUtil;
-
import cn.hutool.core.date.TimeInterval;
-
import cn.hutool.core.map.MapUtil;
-
import cn.hutool.json.JSONUtil;
-
import com.autoee.demo.riskmonitor.BusiDataEntity;
-
import org.apache.kafka.clients.consumer.*;
-
import org.apache.kafka.common.TopicPartition;
-
import org.apache.kafka.common.serialization.StringDeserializer;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
import java.util.*;
-
import java.util.concurrent.Callable;
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.FutureTask;
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
/**
-
* Title: <br>
-
* Desc: <br>
-
* Date: 2022-8-19 <br>
-
* @author Double
-
* @version 1.0.0
-
*/
-
public class KafkaConsumerMutiThreadsTest3_Callable_HashMap {
-
-
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest3_Callable_HashMap.class);
-
-
// 设置main方法执行时的日志输出级别
-
static {
-
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-
List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
-
loggerList.forEach(logger -> {
-
logger.setLevel(Level.INFO);
-
});
-
}
-
-
// 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
-
-
// 测试极限情况:数据已存在大量积压,启动消费者进行消费
-
// 每次拉取都达到设置的单次可以拉取的最大条数:2000条
-
-
-
public static void main(String[] args) throws InterruptedException {
-
-
Properties props = new Properties();
-
// bootstrap.servers:kafka集群地址
-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-
// 消费者组id
-
props.put("group.id", "test_consumer_group"); //消费者组
-
// key.deserializer:key的反序列化器
-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// value.deserializer:value的反序列化器
-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
-
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
-
// fetch.max.bytes:一次拉取的最大数据量:50M
-
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
-
// fetch.max.wait.ms:一次拉取的最大等待时间:500ms
-
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
-
// max.poll.records: 一次拉取的最大条数
-
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
-
// max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
-
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
-
// auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
-
// earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
-
// latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
-
// none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
-
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
// enable.auto.commit:是否允许自动提交offset,默认是。
-
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
// auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
-
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
// heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
-
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
-
// session.timeout.ms:session过期时间,默认10秒。
-
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-
// max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
-
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
-
// partition.assignment.strategy:分区分配策略,默认5分钟。
-
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-
KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
-
tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
-
-
HashMap<String, List<BusiDataEntity>> hashMap;
-
while (true) {
-
TimeInterval timer = DateUtil.timer();
-
logger.info("[开始]-consumer拉取数据");
-
ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
-
int dataCount = records.count();
-
AtomicInteger tAtomicInteger = new AtomicInteger();
-
logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
// 拉取的数据条数大于0时,才进行处理操作
-
timer = DateUtil.timer();
-
if (dataCount > 0) {
-
// 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
-
// 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
-
// hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
-
// [线程执行完成]消费者线程:consumer-thread-VV0039-已处理数据数量=3-已处理的所有客户账号=VV0039,VV0039,VV0039,
-
// [线程执行完成]消费者线程:consumer-thread-AG0097-已处理数据数量=2-已处理的所有客户账号=AG0097,AG0097,
-
// [线程执行完成]消费者线程:consumer-thread-ID0045-已处理数据数量=1-已处理的所有客户账号=ID0045,
-
int arrListCapacity = dataCount * 2;
-
hashMap = new HashMap<>(arrListCapacity);
-
// 将拉取的数据按客户号码分散到HashMap中
-
for (ConsumerRecord<String, String> record : records) {
-
Object value = record.value();
-
String jsonStr = JSONUtil.toJsonStr(value);
-
// logger.info("[获取]-传入报文=[{}]", jsonStr);
-
BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
-
String accNum = busiDataEntity.getAccNum();
-
-
if (hashMap.containsKey(accNum)) {
-
hashMap.get(accNum).add(busiDataEntity);
-
} else {
-
List<BusiDataEntity> newList = new ArrayList<>();
-
newList.add(busiDataEntity);
-
hashMap.put(accNum, newList);
-
}
-
}
-
-
ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
-
// 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
-
int num = 0;
-
hashMap.forEach((k, v) -> {
-
List<BusiDataEntity> busiDataEntities = v;
-
String threadName = "";
-
if (busiDataEntities.size() > 0) {
-
threadName = "consumer-thread-" k;
-
// 使用Callable执行一组数据
-
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
-
-
public String call() {
-
String threadName = Thread.currentThread().getName();
-
// logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
-
String allAccNum = "";
-
for (BusiDataEntity busiDataEntity : busiDataEntities) {
-
allAccNum = allAccNum busiDataEntity.getAccNum() ",";
-
try {
-
// 模拟业务处理时间,默认500ms
-
Thread.sleep(500);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
return "消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum;
-
}
-
});
-
-
// 启动一个线程执行一组数据
-
new Thread(futureTask, threadName).start();
-
// 将每个线程的futureTask都放入同一个ArrayList中
-
tFutureTaskArrayList.add(futureTask);
-
}
-
});
-
// 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
-
// 因为每次处理都创建新的线程,大量线程同时创建和销毁,线程数波动剧烈,考虑通过线程池进行优化
-
for (int i = 0; i < tFutureTaskArrayList.size(); i ) {
-
try {
-
String returnStr = tFutureTaskArrayList.get(i).get();
-
logger.info("[线程执行完成]" returnStr);
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
//同步提交offset
-
// tKafkaConsumer.commitSync();
-
//异步提交
-
tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
-
-
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-
if (exception != null) {
-
logger.error("[失败]-提交offset失败!" offsets);
-
} else {
-
logger.info("[成功]-提交offset成功!");
-
}
-
}
-
});
-
-
logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
}
-
-
}
-
}
测试结果:
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1731]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1678]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1637]
// 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
// 因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。
第二种使用Executors线程池(Executors Callable FutureTask)
通过线程池进行处理,线程数一直保持在2000个左右。
-
package com.autoee.demo.kafka.main;
-
-
import ch.qos.logback.classic.Level;
-
import ch.qos.logback.classic.LoggerContext;
-
import cn.hutool.core.date.DateUtil;
-
import cn.hutool.core.date.TimeInterval;
-
import cn.hutool.json.JSONUtil;
-
import com.autoee.demo.riskmonitor.BusiDataEntity;
-
import org.apache.kafka.clients.consumer.*;
-
import org.apache.kafka.common.TopicPartition;
-
import org.apache.kafka.common.serialization.StringDeserializer;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
import java.util.*;
-
import java.util.concurrent.*;
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
/**
-
* Title: <br>
-
* Desc: <br>
-
* Date: 2022-8-19 <br>
-
* @author Double
-
* @version 1.0.0
-
*/
-
public class KafkaConsumerMutiThreadsTest4_Executors_HashMap {
-
-
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest4_Executors_HashMap.class);
-
-
// 设置main方法执行时的日志输出级别
-
static {
-
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-
List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
-
loggerList.forEach(logger -> {
-
logger.setLevel(Level.INFO);
-
});
-
}
-
-
// 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
-
-
// 测试极限情况:数据已存在大量积压,启动消费者进行消费
-
// 每次拉取都达到设置的单次可以拉取的最大条数:2000条
-
-
public static void main(String[] args) throws InterruptedException {
-
-
Properties props = new Properties();
-
// bootstrap.servers:kafka集群地址
-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-
// 消费者组id
-
props.put("group.id", "test_consumer_group"); //消费者组
-
// key.deserializer:key的反序列化器
-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// value.deserializer:value的反序列化器
-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
-
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
-
// fetch.max.bytes:一次拉取的最大数据量:50M
-
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
-
// fetch.max.wait.ms:一次拉取的最大等待时间:500ms
-
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
-
// max.poll.records: 一次拉取的最大条数
-
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
-
// max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
-
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
-
// auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
-
// earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
-
// latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
-
// none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
-
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
// enable.auto.commit:是否允许自动提交offset,默认是。
-
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
// auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
-
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
// heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
-
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
-
// session.timeout.ms:session过期时间,默认10秒。
-
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-
// max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
-
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
-
// partition.assignment.strategy:分区分配策略,默认5分钟。
-
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-
KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
-
tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
-
-
// 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
-
// corePoolSize=0
-
// maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
-
// keepAliveTime=60秒
-
// 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
-
// 可以自定义线程池进行优化
-
ExecutorService executorService = Executors.newCachedThreadPool();
-
-
HashMap<String, List<BusiDataEntity>> busiDataHashMap;
-
while (true) {
-
TimeInterval timer = DateUtil.timer();
-
logger.info("[开始]-consumer拉取数据");
-
ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
-
int dataCount = records.count();
-
AtomicInteger tAtomicInteger = new AtomicInteger();
-
logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
// 拉取的数据条数大于0时,才进行处理操作
-
timer = DateUtil.timer();
-
if (dataCount > 0) {
-
// 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
-
// 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
-
// hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
-
// [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
-
// [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
-
// [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
-
int capacity = dataCount * 2;
-
busiDataHashMap = new HashMap<>(capacity);
-
// 将拉取的数据按客户号码分散到HashMap中
-
for (ConsumerRecord<String, String> record : records) {
-
Object value = record.value();
-
String jsonStr = JSONUtil.toJsonStr(value);
-
// logger.info("[获取]-传入报文=[{}]", jsonStr);
-
BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
-
String accNum = busiDataEntity.getAccNum();
-
-
if (busiDataHashMap.containsKey(accNum)) {
-
busiDataHashMap.get(accNum).add(busiDataEntity);
-
} else {
-
List<BusiDataEntity> newList = new ArrayList<>();
-
newList.add(busiDataEntity);
-
busiDataHashMap.put(accNum, newList);
-
}
-
}
-
-
ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
-
// 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
-
int num = 0;
-
busiDataHashMap.forEach((k, v) -> {
-
List<BusiDataEntity> busiDataEntities = v;
-
String threadName = "";
-
if (busiDataEntities.size() > 0) {
-
threadName = k;
-
// 使用Callable执行同一个Key下的一组数据
-
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
-
-
public String call() {
-
String threadName = Thread.currentThread().getName();
-
// logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
-
String allAccNum = "";
-
for (BusiDataEntity busiDataEntity : busiDataEntities) {
-
allAccNum = allAccNum busiDataEntity.getAccNum() ",";
-
try {
-
// 模拟业务处理时间,默认500ms
-
Thread.sleep(500);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
return "消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum;
-
}
-
});
-
-
// 通过线程池进行任务处理
-
executorService.submit(futureTask);
-
// 将每个线程的futureTask都放入同一个ArrayList中
-
tFutureTaskArrayList.add(futureTask);
-
}
-
});
-
// 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
-
// 使用线程池后,线程数一直保持在2000个左右。
-
for (int i = 0; i < tFutureTaskArrayList.size(); i ) {
-
try {
-
String returnStr = tFutureTaskArrayList.get(i).get();
-
logger.info("[线程执行完成]" returnStr);
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
//同步提交offset
-
// tKafkaConsumer.commitSync();
-
//异步提交
-
tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
-
-
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-
if (exception != null) {
-
logger.error("[失败]-提交offset失败!" offsets);
-
} else {
-
logger.info("[成功]-提交offset成功!");
-
}
-
}
-
});
-
-
logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
-
}
-
-
}
-
}
测试结果:
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1731]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1678]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1637]
// 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
// 使用线程池后,线程数一直保持在2000个左右
第三种使用Executors线程池(Executors Runnable CountDownLatch)
-
package com.autoee.demo.kafka.main;
-
-
import ch.qos.logback.classic.Level;
-
import ch.qos.logback.classic.LoggerContext;
-
import cn.hutool.core.date.DateUtil;
-
import cn.hutool.core.date.TimeInterval;
-
import cn.hutool.json.JSONUtil;
-
import com.autoee.demo.riskmonitor.BusiDataEntity;
-
import org.apache.kafka.clients.consumer.*;
-
import org.apache.kafka.common.TopicPartition;
-
import org.apache.kafka.common.serialization.StringDeserializer;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
import java.util.*;
-
import java.util.concurrent.*;
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
/**
-
* Title: <br>
-
* Desc: <br>
-
* Date: 2022-8-19 <br>
-
* @author Double
-
* @version 1.0.0
-
*/
-
public class KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch {
-
-
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch.class);
-
-
// 设置main方法执行时的日志输出级别
-
static {
-
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-
List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
-
loggerList.forEach(logger -> {
-
logger.setLevel(Level.INFO);
-
});
-
}
-
-
// 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
-
-
// 测试极限情况:数据已存在大量积压,启动消费者进行消费
-
// 每次拉取都达到设置的单次可以拉取的最大条数:2000条
-
-
// [开始]-consumer拉取数据
-
// [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
-
// [成功]-提交offset成功!
-
// 【完成处理数据】-条数=[2000]-耗时=[1731]
-
// [开始]-consumer拉取数据
-
// [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
-
// [成功]-提交offset成功!
-
// 【完成处理数据】-条数=[2000]-耗时=[1678]
-
// [开始]-consumer拉取数据
-
// [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
-
// [成功]-提交offset成功!
-
// 【完成处理数据】-条数=[2000]-耗时=[1637]
-
-
// 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
-
// 通过线程池进行处理,线程数非常平稳,而且只需要十个左右线程就能处理每次2000条的数据。
-
-
public static void main(String[] args) throws InterruptedException {
-
-
Properties props = new Properties();
-
// bootstrap.servers:kafka集群地址
-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-
// 消费者组id
-
props.put("group.id", "test_consumer_group"); //消费者组
-
// key.deserializer:key的反序列化器
-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// value.deserializer:value的反序列化器
-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
// fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
-
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
-
// fetch.max.bytes:一次拉取的最大数据量:50M
-
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
-
// fetch.max.wait.ms:一次拉取的最大等待时间:500ms
-
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
-
// max.poll.records: 一次拉取的最大条数
-
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
-
// max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
-
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
-
// auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
-
// earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
-
// latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
-
// none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
-
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
// enable.auto.commit:是否允许自动提交offset,默认是。
-
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
// auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
-
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
// heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
-
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
-
// session.timeout.ms:session过期时间,默认10秒。
-
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-
// max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
-
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
-
// partition.assignment.strategy:分区分配策略,默认5分钟。
-
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-
KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
-
tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
-
-
// 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
-
// corePoolSize=0
-
// maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
-
// keepAliveTime=60秒
-
// 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
-
// 可以自定义线程池进行优化
-
ExecutorService executorService = Executors.newCachedThreadPool();
-
-
HashMap<String, List<BusiDataEntity>> busiDataHashMap;
-
while (true) {
-
TimeInterval timer = DateUtil.timer();
-
logger.info("[开始]-consumer拉取数据");
-
ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
-
int dataCount = records.count();
-
AtomicInteger tAtomicInteger = new AtomicInteger();
-
logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
// 拉取的数据条数大于0时,才进行处理操作
-
timer = DateUtil.timer();
-
if (dataCount > 0) {
-
// 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
-
// 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
-
// hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
-
// [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
-
// [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
-
// [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
-
int capacity = dataCount * 2;
-
busiDataHashMap = new HashMap<>(capacity);
-
// 将拉取的数据按客户号码分散到ArrayList中
-
for (ConsumerRecord<String, String> record : records) {
-
Object value = record.value();
-
String jsonStr = JSONUtil.toJsonStr(value);
-
// logger.info("[获取]-传入报文=[{}]", jsonStr);
-
BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
-
String accNum = busiDataEntity.getAccNum();
-
-
if (busiDataHashMap.containsKey(accNum)) {
-
busiDataHashMap.get(accNum).add(busiDataEntity);
-
} else {
-
List<BusiDataEntity> newList = new ArrayList<>();
-
newList.add(busiDataEntity);
-
busiDataHashMap.put(accNum, newList);
-
}
-
}
-
-
// 循环ArrayList,每个下标中的List数据条数大于0时,开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
-
int num = 0;
-
int busiDataHashMapSize = busiDataHashMap.keySet().size();
-
// 使用CountDownLatch判断是否所有子线程都已执行完成,子线程个数等于busiDataHashMap中key的个数
-
CountDownLatch tCountDownLatch = new CountDownLatch(busiDataHashMapSize);
-
busiDataHashMap.forEach((k, v) -> {
-
List<BusiDataEntity> busiDataEntities = v;
-
String threadName = "";
-
if (busiDataEntities.size() > 0) {
-
threadName = k;
-
// 使用Runnable执行同一个Key下的一组数据
-
Runnable runnableTask = new Runnable() {
-
-
public void run() {
-
String threadName = Thread.currentThread().getName();
-
// logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
-
String allAccNum = "";
-
String allBatchNo = "";
-
for (BusiDataEntity busiDataEntity : busiDataEntities) {
-
allAccNum = allAccNum busiDataEntity.getAccNum() ",";
-
allBatchNo = allBatchNo busiDataEntity.getBatchNo() ",";
-
try {
-
// 模拟业务处理时间,默认500ms
-
Thread.sleep(500);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
logger.info("[线程执行完成]-消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum "-已处理的所有批次号=" allBatchNo);
-
// 每个线程处理完成后,将tCountDownLatch减1
-
tCountDownLatch.countDown();
-
}
-
};
-
-
// 通过线程池进行任务处理
-
executorService.submit(runnableTask);
-
}
-
});
-
-
// 通过CountDownLatch阻塞等待,等待所有线程都执行完成,提交offset
-
tCountDownLatch.await();
-
-
//同步提交offset
-
// tKafkaConsumer.commitSync();
-
//异步提交
-
tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
-
-
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-
if (exception != null) {
-
logger.error("[失败]-提交offset失败!" offsets);
-
} else {
-
logger.info("[成功]-提交offset成功!");
-
}
-
}
-
});
-
-
logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
-
logger.info("----------------------------------------------------------------------------------------------------------------------------------------");
-
-
}
-
}
-
}
-
}
测试结果:
和第二种的执行时间差不多,但是各项性能指标好像更加平稳了,但是很出现线程阻塞的情况。
如果对您有帮助,请我喝杯咖啡吧!
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcgikf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01