• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

使用redis的zset实现延迟队列

武飞扬头像
威威财神
帮助1

最近在写项目时候,因为一套代码要兼容两套版本,一个是activiMq一个是kafka,因为kafka不支持延迟队列,所以领导要求更换技术,一开始想到的是使用redis基于内存,效率高,一下是一些代码大家伙可以看看下,不过划重点,我们最后是直接改成定时任务扫数据库。


1、创建工具类RedisUtils

  1.  
    /**
  2.  
    * 使用Zset存储数据
  3.  
    *
  4.  
    * @param delayTaskKey 延迟key
  5.  
    * @param id 任务
  6.  
    * @param l 时间
  7.  
    */
  8.  
    public void zsetAdd(String delayTaskKey, String id, long l) {
  9.  
    ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
  10.  
    zSet.add(delayTaskKey, id, l);
  11.  
    }
  12.  
     
  13.  
    /**
  14.  
    * 删除Zset中对应key的value
  15.  
    *
  16.  
    * @param delayTaskKey 延迟key
  17.  
    * @param id 任务
  18.  
    */
  19.  
    public void zsetRemove(String delayTaskKey, Object id) {
  20.  
    ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
  21.  
    zSet.remove(delayTaskKey, id);
  22.  
    }
  23.  
     
  24.  
    /**
  25.  
    * 获取RedisZSetCommands的集合。元组,其中分数在排序集的最小值和最大值之间
  26.  
    *
  27.  
    * @param delayTaskKey 延迟key
  28.  
    * @param i 最小值
  29.  
    * @param currentTimeMillis 最大值
  30.  
    * @return RedisZSetCommands的集合
  31.  
    */
  32.  
    public Set<ZSetOperations.TypedTuple<Object>> zsetRangeList(String delayTaskKey, int i, long currentTimeMillis) {
  33.  
    ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
  34.  
    return zSet.rangeByScoreWithScores(delayTaskKey, i, currentTimeMillis);
  35.  
    }
学新通

2、使用xxljob定时扫描那些过期的values

  1.  
    package com.cttq.apc.jobhandler;
  2.  
     
  3.  
    import com.alibaba.fastjson.JSON;
  4.  
    import com.cttq.apc.service.business.batchsave.BatchSaveService;
  5.  
    import com.cttq.apc.service.common.dto.config.dto.MqMeassageDto;
  6.  
    import com.cttq.apc.service.common.utils.RedisUtils;
  7.  
    import com.cttq.apc.service.common.utils.ThreadLocalUtils;
  8.  
    import com.xxl.job.core.biz.model.ReturnT;
  9.  
    import com.xxl.job.core.handler.IJobHandler;
  10.  
    import com.xxl.job.core.handler.annotation.JobHander;
  11.  
    import lombok.extern.slf4j.Slf4j;
  12.  
    import org.apache.commons.collections4.CollectionUtils;
  13.  
    import org.apache.commons.lang3.exception.ExceptionUtils;
  14.  
    import org.springframework.data.redis.core.ZSetOperations;
  15.  
    import org.springframework.stereotype.Component;
  16.  
     
  17.  
    import javax.annotation.Resource;
  18.  
    import java.util.Date;
  19.  
    import java.util.List;
  20.  
    import java.util.Set;
  21.  
     
  22.  
    /**
  23.  
    * @author 张威威
  24.  
    * @date 2023/1/5 周四
  25.  
    *
  26.  
    * 延时任务,也是异步任务,延时任务达到时效之后处理业务,并将延时任务从redis zSet删除
  27.  
    */
  28.  
    @JobHander("pollingMessageDelayJobHandler")
  29.  
    @Component
  30.  
    @Slf4j
  31.  
    public class PollingMessageDelayJobHandler extends IJobHandler {
  32.  
     
  33.  
    public static final String DELAY_TASK_KEY = "delayTask:";
  34.  
     
  35.  
    public static final String SAVE = "save";
  36.  
     
  37.  
    @Resource
  38.  
    private RedisUtils redisUtils;
  39.  
     
  40.  
    @Resource
  41.  
    private BatchSaveService batchSaveService;
  42.  
     
  43.  
    @Override
  44.  
    public ReturnT<String> execute(String... params) throws Exception {
  45.  
    Set<ZSetOperations.TypedTuple<Object>> ids = redisUtils.zsetRangeList(
  46.  
    DELAY_TASK_KEY,
  47.  
    0,
  48.  
    //延时任务score最小值
  49.  
    System.currentTimeMillis()
  50.  
    //延时任务score最大值(当前时间)
  51.  
    );
  52.  
    if (!CollectionUtils.isEmpty(ids)) {
  53.  
    for (ZSetOperations.TypedTuple<Object> id : ids) {
  54.  
    log.info("任务:{}, 超时被自动关闭, 关闭时间:{}", id.getValue(), JSON.toJSONString(new Date()));
  55.  
    try {
  56.  
    String value = (String) id.getValue();
  57.  
    List<String> list = JSON.parseObject(value, List.class);
  58.  
    MqMeassageDto mqMeassageDto = new MqMeassageDto();
  59.  
    mqMeassageDto.setIds(list);
  60.  
    mqMeassageDto.setUserInfo(ThreadLocalUtils.getThreadLocal());
  61.  
    mqMeassageDto.setIsImmediatelyEffective(Boolean.FALSE);
  62.  
    mqMeassageDto.setOperationType(SAVE);
  63.  
    batchSaveService.batchSaveByIds(mqMeassageDto);
  64.  
    System.out.println("看看我执行了吗????");
  65.  
    } catch (Exception e) {
  66.  
    log.error("解析xml时候出现异常了,详情:{}", ExceptionUtils.getStackTrace(e));
  67.  
    } finally {
  68.  
    redisUtils.zsetRemove(DELAY_TASK_KEY, id.getValue());
  69.  
    }
  70.  
    }
  71.  
    }
  72.  
    return ReturnT.SUCCESS;
  73.  
    }
  74.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfiehaf
系列文章
更多 icon
同类精品
更多 icon
继续加载