• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

武飞扬头像
AutoEE_Double
帮助1

网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。

需求内容

        单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。

注意点

1、如果1秒钟生产1000条数据,消费者处理时,每条数据需要500毫秒,则消费者每次拉取数据的条数最好能控制在500条以上,这样1秒内的数据可以拉取两次,每次使用500个线程进行处理,每次耗时500ms,

        2*500ms=1秒,基本可以保证1000条数据能够在1秒内处理完成。

如果消费者每100ms拉取一次,每次拉取100条数据,消费者使用100个线程处理这100条数据,耗时500ms,第二次再拉取100条,耗时500ms...这样处理完1秒内的1000条数据将一共需要

        10次*500ms=5秒钟,出现较大延迟。

        同时,还要注意,一批数据中存在相同的accNum(客户账号)的情况,如果存在2条相同的accNum,因为需要顺序执行,一条执行需要500ms,两条顺序执行完成将花费1秒,这批数据的整体完成时间将变为1秒。

        注意这三个参数的调整:

        // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
        // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // max.poll.records: 一次拉取的最大条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);

        注意消费者的拉取延迟时间:

        tKafkaConsumer.poll(500);

2、每批次数据处理时,创建的线程数,会根据每次拉取的数据条数自动调整,最大线程数为消费者每次允许拉取的最大数据条数。这样系统可以根据数据量大小自动调整创建的线程数,线程池中的空闲线程可以在一定时间后自动释放。可以保证不同accNum(客户账号)的数据每次都分配一个线程单独处理,从而保证处理的时间(500ms)。

第一种使用纯线程方式(Thread Callable FutureTask)

因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。
  1.  
    package com.autoee.demo.kafka.main;
  2.  
     
  3.  
    import ch.qos.logback.classic.Level;
  4.  
    import ch.qos.logback.classic.LoggerContext;
  5.  
    import cn.hutool.core.date.DateUtil;
  6.  
    import cn.hutool.core.date.TimeInterval;
  7.  
    import cn.hutool.core.map.MapUtil;
  8.  
    import cn.hutool.json.JSONUtil;
  9.  
    import com.autoee.demo.riskmonitor.BusiDataEntity;
  10.  
    import org.apache.kafka.clients.consumer.*;
  11.  
    import org.apache.kafka.common.TopicPartition;
  12.  
    import org.apache.kafka.common.serialization.StringDeserializer;
  13.  
    import org.slf4j.Logger;
  14.  
    import org.slf4j.LoggerFactory;
  15.  
     
  16.  
    import java.util.*;
  17.  
    import java.util.concurrent.Callable;
  18.  
    import java.util.concurrent.ExecutionException;
  19.  
    import java.util.concurrent.FutureTask;
  20.  
    import java.util.concurrent.atomic.AtomicInteger;
  21.  
     
  22.  
    /**
  23.  
    * Title: <br>
  24.  
    * Desc: <br>
  25.  
    * Date: 2022-8-19 <br>
  26.  
    * @author Double
  27.  
    * @version 1.0.0
  28.  
    */
  29.  
    public class KafkaConsumerMutiThreadsTest3_Callable_HashMap {
  30.  
     
  31.  
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest3_Callable_HashMap.class);
  32.  
     
  33.  
    // 设置main方法执行时的日志输出级别
  34.  
    static {
  35.  
    LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  36.  
    List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
  37.  
    loggerList.forEach(logger -> {
  38.  
    logger.setLevel(Level.INFO);
  39.  
    });
  40.  
    }
  41.  
     
  42.  
    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
  43.  
     
  44.  
    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
  45.  
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条
  46.  
     
  47.  
     
  48.  
    public static void main(String[] args) throws InterruptedException {
  49.  
     
  50.  
    Properties props = new Properties();
  51.  
    // bootstrap.servers:kafka集群地址
  52.  
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  53.  
    // 消费者组id
  54.  
    props.put("group.id", "test_consumer_group"); //消费者组
  55.  
    // key.deserializer:key的反序列化器
  56.  
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  57.  
    // value.deserializer:value的反序列化器
  58.  
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  59.  
    // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
  60.  
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  61.  
    // fetch.max.bytes:一次拉取的最大数据量:50M
  62.  
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  63.  
    // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  64.  
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  65.  
    // max.poll.records: 一次拉取的最大条数
  66.  
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  67.  
    // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  68.  
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  69.  
    // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  70.  
    // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  71.  
    // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  72.  
    // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  73.  
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  74.  
    // enable.auto.commit:是否允许自动提交offset,默认是。
  75.  
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  76.  
    // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  77.  
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  78.  
    // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  79.  
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  80.  
    // session.timeout.ms:session过期时间,默认10秒。
  81.  
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  82.  
    // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  83.  
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  84.  
    // partition.assignment.strategy:分区分配策略,默认5分钟。
  85.  
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  86.  
     
  87.  
    KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
  88.  
    tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  89.  
     
  90.  
    HashMap<String, List<BusiDataEntity>> hashMap;
  91.  
    while (true) {
  92.  
    TimeInterval timer = DateUtil.timer();
  93.  
    logger.info("[开始]-consumer拉取数据");
  94.  
    ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
  95.  
    int dataCount = records.count();
  96.  
    AtomicInteger tAtomicInteger = new AtomicInteger();
  97.  
    logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  98.  
    // 拉取的数据条数大于0时,才进行处理操作
  99.  
    timer = DateUtil.timer();
  100.  
    if (dataCount > 0) {
  101.  
    // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  102.  
    // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  103.  
    // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  104.  
    // [线程执行完成]消费者线程:consumer-thread-VV0039-已处理数据数量=3-已处理的所有客户账号=VV0039,VV0039,VV0039,
  105.  
    // [线程执行完成]消费者线程:consumer-thread-AG0097-已处理数据数量=2-已处理的所有客户账号=AG0097,AG0097,
  106.  
    // [线程执行完成]消费者线程:consumer-thread-ID0045-已处理数据数量=1-已处理的所有客户账号=ID0045,
  107.  
    int arrListCapacity = dataCount * 2;
  108.  
    hashMap = new HashMap<>(arrListCapacity);
  109.  
    // 将拉取的数据按客户号码分散到HashMap中
  110.  
    for (ConsumerRecord<String, String> record : records) {
  111.  
    Object value = record.value();
  112.  
    String jsonStr = JSONUtil.toJsonStr(value);
  113.  
    // logger.info("[获取]-传入报文=[{}]", jsonStr);
  114.  
    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
  115.  
    String accNum = busiDataEntity.getAccNum();
  116.  
     
  117.  
    if (hashMap.containsKey(accNum)) {
  118.  
    hashMap.get(accNum).add(busiDataEntity);
  119.  
    } else {
  120.  
    List<BusiDataEntity> newList = new ArrayList<>();
  121.  
    newList.add(busiDataEntity);
  122.  
    hashMap.put(accNum, newList);
  123.  
    }
  124.  
    }
  125.  
     
  126.  
    ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
  127.  
    // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  128.  
    int num = 0;
  129.  
    hashMap.forEach((k, v) -> {
  130.  
    List<BusiDataEntity> busiDataEntities = v;
  131.  
    String threadName = "";
  132.  
    if (busiDataEntities.size() > 0) {
  133.  
    threadName = "consumer-thread-" k;
  134.  
    // 使用Callable执行一组数据
  135.  
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  136.  
    @Override
  137.  
    public String call() {
  138.  
    String threadName = Thread.currentThread().getName();
  139.  
    // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  140.  
    String allAccNum = "";
  141.  
    for (BusiDataEntity busiDataEntity : busiDataEntities) {
  142.  
    allAccNum = allAccNum busiDataEntity.getAccNum() ",";
  143.  
    try {
  144.  
    // 模拟业务处理时间,默认500ms
  145.  
    Thread.sleep(500);
  146.  
    } catch (InterruptedException e) {
  147.  
    e.printStackTrace();
  148.  
    }
  149.  
    }
  150.  
    return "消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum;
  151.  
    }
  152.  
    });
  153.  
     
  154.  
    // 启动一个线程执行一组数据
  155.  
    new Thread(futureTask, threadName).start();
  156.  
    // 将每个线程的futureTask都放入同一个ArrayList中
  157.  
    tFutureTaskArrayList.add(futureTask);
  158.  
    }
  159.  
    });
  160.  
    // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
  161.  
    // 因为每次处理都创建新的线程,大量线程同时创建和销毁,线程数波动剧烈,考虑通过线程池进行优化
  162.  
    for (int i = 0; i < tFutureTaskArrayList.size(); i ) {
  163.  
    try {
  164.  
    String returnStr = tFutureTaskArrayList.get(i).get();
  165.  
    logger.info("[线程执行完成]" returnStr);
  166.  
    } catch (ExecutionException e) {
  167.  
    e.printStackTrace();
  168.  
    }
  169.  
    }
  170.  
    }
  171.  
     
  172.  
    //同步提交offset
  173.  
    // tKafkaConsumer.commitSync();
  174.  
    //异步提交
  175.  
    tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  176.  
    @Override
  177.  
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  178.  
    if (exception != null) {
  179.  
    logger.error("[失败]-提交offset失败!" offsets);
  180.  
    } else {
  181.  
    logger.info("[成功]-提交offset成功!");
  182.  
    }
  183.  
    }
  184.  
    });
  185.  
     
  186.  
    logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  187.  
    }
  188.  
     
  189.  
    }
  190.  
    }
学新通

学新通

测试结果:


    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1637]

    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
    // 因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。

第二种使用Executors线程池(Executors Callable FutureTask)

通过线程池进行处理,线程数一直保持在2000个左右。
  1.  
    package com.autoee.demo.kafka.main;
  2.  
     
  3.  
    import ch.qos.logback.classic.Level;
  4.  
    import ch.qos.logback.classic.LoggerContext;
  5.  
    import cn.hutool.core.date.DateUtil;
  6.  
    import cn.hutool.core.date.TimeInterval;
  7.  
    import cn.hutool.json.JSONUtil;
  8.  
    import com.autoee.demo.riskmonitor.BusiDataEntity;
  9.  
    import org.apache.kafka.clients.consumer.*;
  10.  
    import org.apache.kafka.common.TopicPartition;
  11.  
    import org.apache.kafka.common.serialization.StringDeserializer;
  12.  
    import org.slf4j.Logger;
  13.  
    import org.slf4j.LoggerFactory;
  14.  
     
  15.  
    import java.util.*;
  16.  
    import java.util.concurrent.*;
  17.  
    import java.util.concurrent.atomic.AtomicInteger;
  18.  
     
  19.  
    /**
  20.  
    * Title: <br>
  21.  
    * Desc: <br>
  22.  
    * Date: 2022-8-19 <br>
  23.  
    * @author Double
  24.  
    * @version 1.0.0
  25.  
    */
  26.  
    public class KafkaConsumerMutiThreadsTest4_Executors_HashMap {
  27.  
     
  28.  
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest4_Executors_HashMap.class);
  29.  
     
  30.  
    // 设置main方法执行时的日志输出级别
  31.  
    static {
  32.  
    LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  33.  
    List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
  34.  
    loggerList.forEach(logger -> {
  35.  
    logger.setLevel(Level.INFO);
  36.  
    });
  37.  
    }
  38.  
     
  39.  
    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
  40.  
     
  41.  
    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
  42.  
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条
  43.  
     
  44.  
    public static void main(String[] args) throws InterruptedException {
  45.  
     
  46.  
    Properties props = new Properties();
  47.  
    // bootstrap.servers:kafka集群地址
  48.  
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  49.  
    // 消费者组id
  50.  
    props.put("group.id", "test_consumer_group"); //消费者组
  51.  
    // key.deserializer:key的反序列化器
  52.  
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  53.  
    // value.deserializer:value的反序列化器
  54.  
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  55.  
    // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
  56.  
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  57.  
    // fetch.max.bytes:一次拉取的最大数据量:50M
  58.  
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  59.  
    // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  60.  
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  61.  
    // max.poll.records: 一次拉取的最大条数
  62.  
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  63.  
    // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  64.  
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  65.  
    // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  66.  
    // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  67.  
    // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  68.  
    // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  69.  
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  70.  
    // enable.auto.commit:是否允许自动提交offset,默认是。
  71.  
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  72.  
    // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  73.  
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  74.  
    // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  75.  
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  76.  
    // session.timeout.ms:session过期时间,默认10秒。
  77.  
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  78.  
    // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  79.  
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  80.  
    // partition.assignment.strategy:分区分配策略,默认5分钟。
  81.  
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  82.  
     
  83.  
    KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
  84.  
    tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  85.  
     
  86.  
    // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
  87.  
    // corePoolSize=0
  88.  
    // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
  89.  
    // keepAliveTime=60秒
  90.  
    // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
  91.  
    // 可以自定义线程池进行优化
  92.  
    ExecutorService executorService = Executors.newCachedThreadPool();
  93.  
     
  94.  
    HashMap<String, List<BusiDataEntity>> busiDataHashMap;
  95.  
    while (true) {
  96.  
    TimeInterval timer = DateUtil.timer();
  97.  
    logger.info("[开始]-consumer拉取数据");
  98.  
    ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
  99.  
    int dataCount = records.count();
  100.  
    AtomicInteger tAtomicInteger = new AtomicInteger();
  101.  
    logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  102.  
    // 拉取的数据条数大于0时,才进行处理操作
  103.  
    timer = DateUtil.timer();
  104.  
    if (dataCount > 0) {
  105.  
    // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  106.  
    // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  107.  
    // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  108.  
    // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
  109.  
    // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
  110.  
    // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
  111.  
    int capacity = dataCount * 2;
  112.  
    busiDataHashMap = new HashMap<>(capacity);
  113.  
    // 将拉取的数据按客户号码分散到HashMap中
  114.  
    for (ConsumerRecord<String, String> record : records) {
  115.  
    Object value = record.value();
  116.  
    String jsonStr = JSONUtil.toJsonStr(value);
  117.  
    // logger.info("[获取]-传入报文=[{}]", jsonStr);
  118.  
    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
  119.  
    String accNum = busiDataEntity.getAccNum();
  120.  
     
  121.  
    if (busiDataHashMap.containsKey(accNum)) {
  122.  
    busiDataHashMap.get(accNum).add(busiDataEntity);
  123.  
    } else {
  124.  
    List<BusiDataEntity> newList = new ArrayList<>();
  125.  
    newList.add(busiDataEntity);
  126.  
    busiDataHashMap.put(accNum, newList);
  127.  
    }
  128.  
    }
  129.  
     
  130.  
    ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
  131.  
    // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  132.  
    int num = 0;
  133.  
    busiDataHashMap.forEach((k, v) -> {
  134.  
    List<BusiDataEntity> busiDataEntities = v;
  135.  
    String threadName = "";
  136.  
    if (busiDataEntities.size() > 0) {
  137.  
    threadName = k;
  138.  
    // 使用Callable执行同一个Key下的一组数据
  139.  
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  140.  
    @Override
  141.  
    public String call() {
  142.  
    String threadName = Thread.currentThread().getName();
  143.  
    // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  144.  
    String allAccNum = "";
  145.  
    for (BusiDataEntity busiDataEntity : busiDataEntities) {
  146.  
    allAccNum = allAccNum busiDataEntity.getAccNum() ",";
  147.  
    try {
  148.  
    // 模拟业务处理时间,默认500ms
  149.  
    Thread.sleep(500);
  150.  
    } catch (InterruptedException e) {
  151.  
    e.printStackTrace();
  152.  
    }
  153.  
    }
  154.  
    return "消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum;
  155.  
    }
  156.  
    });
  157.  
     
  158.  
    // 通过线程池进行任务处理
  159.  
    executorService.submit(futureTask);
  160.  
    // 将每个线程的futureTask都放入同一个ArrayList中
  161.  
    tFutureTaskArrayList.add(futureTask);
  162.  
    }
  163.  
    });
  164.  
    // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
  165.  
    // 使用线程池后,线程数一直保持在2000个左右。
  166.  
    for (int i = 0; i < tFutureTaskArrayList.size(); i ) {
  167.  
    try {
  168.  
    String returnStr = tFutureTaskArrayList.get(i).get();
  169.  
    logger.info("[线程执行完成]" returnStr);
  170.  
    } catch (ExecutionException e) {
  171.  
    e.printStackTrace();
  172.  
    }
  173.  
    }
  174.  
    }
  175.  
     
  176.  
    //同步提交offset
  177.  
    // tKafkaConsumer.commitSync();
  178.  
    //异步提交
  179.  
    tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  180.  
    @Override
  181.  
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  182.  
    if (exception != null) {
  183.  
    logger.error("[失败]-提交offset失败!" offsets);
  184.  
    } else {
  185.  
    logger.info("[成功]-提交offset成功!");
  186.  
    }
  187.  
    }
  188.  
    });
  189.  
     
  190.  
    logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  191.  
     
  192.  
    }
  193.  
     
  194.  
    }
  195.  
    }
学新通

学新通

测试结果:

    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1637]

    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
    // 使用线程池后,线程数一直保持在2000个左右

第三种使用Executors线程池(Executors Runnable CountDownLatch)

  1.  
    package com.autoee.demo.kafka.main;
  2.  
     
  3.  
    import ch.qos.logback.classic.Level;
  4.  
    import ch.qos.logback.classic.LoggerContext;
  5.  
    import cn.hutool.core.date.DateUtil;
  6.  
    import cn.hutool.core.date.TimeInterval;
  7.  
    import cn.hutool.json.JSONUtil;
  8.  
    import com.autoee.demo.riskmonitor.BusiDataEntity;
  9.  
    import org.apache.kafka.clients.consumer.*;
  10.  
    import org.apache.kafka.common.TopicPartition;
  11.  
    import org.apache.kafka.common.serialization.StringDeserializer;
  12.  
    import org.slf4j.Logger;
  13.  
    import org.slf4j.LoggerFactory;
  14.  
     
  15.  
    import java.util.*;
  16.  
    import java.util.concurrent.*;
  17.  
    import java.util.concurrent.atomic.AtomicInteger;
  18.  
     
  19.  
    /**
  20.  
    * Title: <br>
  21.  
    * Desc: <br>
  22.  
    * Date: 2022-8-19 <br>
  23.  
    * @author Double
  24.  
    * @version 1.0.0
  25.  
    */
  26.  
    public class KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch {
  27.  
     
  28.  
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch.class);
  29.  
     
  30.  
    // 设置main方法执行时的日志输出级别
  31.  
    static {
  32.  
    LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  33.  
    List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
  34.  
    loggerList.forEach(logger -> {
  35.  
    logger.setLevel(Level.INFO);
  36.  
    });
  37.  
    }
  38.  
     
  39.  
    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
  40.  
     
  41.  
    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
  42.  
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条
  43.  
     
  44.  
    // [开始]-consumer拉取数据
  45.  
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
  46.  
    // [成功]-提交offset成功!
  47.  
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
  48.  
    // [开始]-consumer拉取数据
  49.  
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
  50.  
    // [成功]-提交offset成功!
  51.  
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
  52.  
    // [开始]-consumer拉取数据
  53.  
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
  54.  
    // [成功]-提交offset成功!
  55.  
    // 【完成处理数据】-条数=[2000]-耗时=[1637]
  56.  
     
  57.  
    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
  58.  
    // 通过线程池进行处理,线程数非常平稳,而且只需要十个左右线程就能处理每次2000条的数据。
  59.  
     
  60.  
    public static void main(String[] args) throws InterruptedException {
  61.  
     
  62.  
    Properties props = new Properties();
  63.  
    // bootstrap.servers:kafka集群地址
  64.  
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  65.  
    // 消费者组id
  66.  
    props.put("group.id", "test_consumer_group"); //消费者组
  67.  
    // key.deserializer:key的反序列化器
  68.  
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  69.  
    // value.deserializer:value的反序列化器
  70.  
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  71.  
    // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
  72.  
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  73.  
    // fetch.max.bytes:一次拉取的最大数据量:50M
  74.  
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  75.  
    // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  76.  
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  77.  
    // max.poll.records: 一次拉取的最大条数
  78.  
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  79.  
    // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  80.  
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  81.  
    // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  82.  
    // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  83.  
    // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  84.  
    // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  85.  
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  86.  
    // enable.auto.commit:是否允许自动提交offset,默认是。
  87.  
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  88.  
    // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  89.  
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  90.  
    // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  91.  
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  92.  
    // session.timeout.ms:session过期时间,默认10秒。
  93.  
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  94.  
    // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  95.  
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  96.  
    // partition.assignment.strategy:分区分配策略,默认5分钟。
  97.  
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  98.  
     
  99.  
    KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
  100.  
    tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  101.  
     
  102.  
    // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
  103.  
    // corePoolSize=0
  104.  
    // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
  105.  
    // keepAliveTime=60秒
  106.  
    // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
  107.  
    // 可以自定义线程池进行优化
  108.  
    ExecutorService executorService = Executors.newCachedThreadPool();
  109.  
     
  110.  
    HashMap<String, List<BusiDataEntity>> busiDataHashMap;
  111.  
    while (true) {
  112.  
    TimeInterval timer = DateUtil.timer();
  113.  
    logger.info("[开始]-consumer拉取数据");
  114.  
    ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
  115.  
    int dataCount = records.count();
  116.  
    AtomicInteger tAtomicInteger = new AtomicInteger();
  117.  
    logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  118.  
    // 拉取的数据条数大于0时,才进行处理操作
  119.  
    timer = DateUtil.timer();
  120.  
    if (dataCount > 0) {
  121.  
    // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  122.  
    // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  123.  
    // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  124.  
    // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
  125.  
    // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
  126.  
    // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
  127.  
    int capacity = dataCount * 2;
  128.  
    busiDataHashMap = new HashMap<>(capacity);
  129.  
    // 将拉取的数据按客户号码分散到ArrayList中
  130.  
    for (ConsumerRecord<String, String> record : records) {
  131.  
    Object value = record.value();
  132.  
    String jsonStr = JSONUtil.toJsonStr(value);
  133.  
    // logger.info("[获取]-传入报文=[{}]", jsonStr);
  134.  
    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
  135.  
    String accNum = busiDataEntity.getAccNum();
  136.  
     
  137.  
    if (busiDataHashMap.containsKey(accNum)) {
  138.  
    busiDataHashMap.get(accNum).add(busiDataEntity);
  139.  
    } else {
  140.  
    List<BusiDataEntity> newList = new ArrayList<>();
  141.  
    newList.add(busiDataEntity);
  142.  
    busiDataHashMap.put(accNum, newList);
  143.  
    }
  144.  
    }
  145.  
     
  146.  
    // 循环ArrayList,每个下标中的List数据条数大于0时,开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  147.  
    int num = 0;
  148.  
    int busiDataHashMapSize = busiDataHashMap.keySet().size();
  149.  
    // 使用CountDownLatch判断是否所有子线程都已执行完成,子线程个数等于busiDataHashMap中key的个数
  150.  
    CountDownLatch tCountDownLatch = new CountDownLatch(busiDataHashMapSize);
  151.  
    busiDataHashMap.forEach((k, v) -> {
  152.  
    List<BusiDataEntity> busiDataEntities = v;
  153.  
    String threadName = "";
  154.  
    if (busiDataEntities.size() > 0) {
  155.  
    threadName = k;
  156.  
    // 使用Runnable执行同一个Key下的一组数据
  157.  
    Runnable runnableTask = new Runnable() {
  158.  
    @Override
  159.  
    public void run() {
  160.  
    String threadName = Thread.currentThread().getName();
  161.  
    // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  162.  
    String allAccNum = "";
  163.  
    String allBatchNo = "";
  164.  
    for (BusiDataEntity busiDataEntity : busiDataEntities) {
  165.  
    allAccNum = allAccNum busiDataEntity.getAccNum() ",";
  166.  
    allBatchNo = allBatchNo busiDataEntity.getBatchNo() ",";
  167.  
    try {
  168.  
    // 模拟业务处理时间,默认500ms
  169.  
    Thread.sleep(500);
  170.  
    } catch (InterruptedException e) {
  171.  
    e.printStackTrace();
  172.  
    }
  173.  
    }
  174.  
    logger.info("[线程执行完成]-消费者线程:" threadName "-已处理数据数量=" busiDataEntities.size() "-已处理的所有客户账号=" allAccNum "-已处理的所有批次号=" allBatchNo);
  175.  
    // 每个线程处理完成后,将tCountDownLatch减1
  176.  
    tCountDownLatch.countDown();
  177.  
    }
  178.  
    };
  179.  
     
  180.  
    // 通过线程池进行任务处理
  181.  
    executorService.submit(runnableTask);
  182.  
    }
  183.  
    });
  184.  
     
  185.  
    // 通过CountDownLatch阻塞等待,等待所有线程都执行完成,提交offset
  186.  
    tCountDownLatch.await();
  187.  
     
  188.  
    //同步提交offset
  189.  
    // tKafkaConsumer.commitSync();
  190.  
    //异步提交
  191.  
    tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  192.  
    @Override
  193.  
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  194.  
    if (exception != null) {
  195.  
    logger.error("[失败]-提交offset失败!" offsets);
  196.  
    } else {
  197.  
    logger.info("[成功]-提交offset成功!");
  198.  
    }
  199.  
    }
  200.  
    });
  201.  
     
  202.  
    logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  203.  
    logger.info("----------------------------------------------------------------------------------------------------------------------------------------");
  204.  
     
  205.  
    }
  206.  
    }
  207.  
    }
  208.  
    }
学新通

学新通

测试结果:

        和第二种的执行时间差不多,但是各项性能指标好像更加平稳了,但是很出现线程阻塞的情况。

如果对您有帮助,请我喝杯咖啡吧!

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcgikf
系列文章
更多 icon
同类精品
更多 icon
继续加载