使用redis的zset实现延迟队列
最近在写项目时候,因为一套代码要兼容两套版本,一个是activiMq一个是kafka,因为kafka不支持延迟队列,所以领导要求更换技术,一开始想到的是使用redis基于内存,效率高,一下是一些代码大家伙可以看看下,不过划重点,我们最后是直接改成定时任务扫数据库。
1、创建工具类RedisUtils
-
/**
-
* 使用Zset存储数据
-
*
-
* @param delayTaskKey 延迟key
-
* @param id 任务
-
* @param l 时间
-
*/
-
public void zsetAdd(String delayTaskKey, String id, long l) {
-
ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
-
zSet.add(delayTaskKey, id, l);
-
}
-
-
/**
-
* 删除Zset中对应key的value
-
*
-
* @param delayTaskKey 延迟key
-
* @param id 任务
-
*/
-
public void zsetRemove(String delayTaskKey, Object id) {
-
ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
-
zSet.remove(delayTaskKey, id);
-
}
-
-
/**
-
* 获取RedisZSetCommands的集合。元组,其中分数在排序集的最小值和最大值之间
-
*
-
* @param delayTaskKey 延迟key
-
* @param i 最小值
-
* @param currentTimeMillis 最大值
-
* @return RedisZSetCommands的集合
-
*/
-
public Set<ZSetOperations.TypedTuple<Object>> zsetRangeList(String delayTaskKey, int i, long currentTimeMillis) {
-
ZSetOperations<Serializable, Object> zSet = redisTemplate.opsForZSet();
-
return zSet.rangeByScoreWithScores(delayTaskKey, i, currentTimeMillis);
-
}
2、使用xxljob定时扫描那些过期的values
-
package com.cttq.apc.jobhandler;
-
-
import com.alibaba.fastjson.JSON;
-
import com.cttq.apc.service.business.batchsave.BatchSaveService;
-
import com.cttq.apc.service.common.dto.config.dto.MqMeassageDto;
-
import com.cttq.apc.service.common.utils.RedisUtils;
-
import com.cttq.apc.service.common.utils.ThreadLocalUtils;
-
import com.xxl.job.core.biz.model.ReturnT;
-
import com.xxl.job.core.handler.IJobHandler;
-
import com.xxl.job.core.handler.annotation.JobHander;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.collections4.CollectionUtils;
-
import org.apache.commons.lang3.exception.ExceptionUtils;
-
import org.springframework.data.redis.core.ZSetOperations;
-
import org.springframework.stereotype.Component;
-
-
import javax.annotation.Resource;
-
import java.util.Date;
-
import java.util.List;
-
import java.util.Set;
-
-
/**
-
* @author 张威威
-
* @date 2023/1/5 周四
-
*
-
* 延时任务,也是异步任务,延时任务达到时效之后处理业务,并将延时任务从redis zSet删除
-
*/
-
-
-
-
public class PollingMessageDelayJobHandler extends IJobHandler {
-
-
public static final String DELAY_TASK_KEY = "delayTask:";
-
-
public static final String SAVE = "save";
-
-
-
private RedisUtils redisUtils;
-
-
-
private BatchSaveService batchSaveService;
-
-
-
public ReturnT<String> execute(String... params) throws Exception {
-
Set<ZSetOperations.TypedTuple<Object>> ids = redisUtils.zsetRangeList(
-
DELAY_TASK_KEY,
-
0,
-
//延时任务score最小值
-
System.currentTimeMillis()
-
//延时任务score最大值(当前时间)
-
);
-
if (!CollectionUtils.isEmpty(ids)) {
-
for (ZSetOperations.TypedTuple<Object> id : ids) {
-
log.info("任务:{}, 超时被自动关闭, 关闭时间:{}", id.getValue(), JSON.toJSONString(new Date()));
-
try {
-
String value = (String) id.getValue();
-
List<String> list = JSON.parseObject(value, List.class);
-
MqMeassageDto mqMeassageDto = new MqMeassageDto();
-
mqMeassageDto.setIds(list);
-
mqMeassageDto.setUserInfo(ThreadLocalUtils.getThreadLocal());
-
mqMeassageDto.setIsImmediatelyEffective(Boolean.FALSE);
-
mqMeassageDto.setOperationType(SAVE);
-
batchSaveService.batchSaveByIds(mqMeassageDto);
-
System.out.println("看看我执行了吗????");
-
} catch (Exception e) {
-
log.error("解析xml时候出现异常了,详情:{}", ExceptionUtils.getStackTrace(e));
-
} finally {
-
redisUtils.zsetRemove(DELAY_TASK_KEY, id.getValue());
-
}
-
}
-
}
-
return ReturnT.SUCCESS;
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfiehaf
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24