java高并发下数据入库
java高并发下数据入库
该服务利用线程池和 Redis 来解决高并发下的数据入库问题,做到实时数据存入 Redis、业务数据按批次批量入库。使用的时候需要把其中的业务数据类型修改为自己的业务数据。当满足下面任意一个条件时,当前批次就会被提交入库:
1、达到设置的超时时间。
2、达到最大批次数量。
package io.jack.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* <pre>
* 数据批量入库服务
* </pre>
* Created by RuiXing Hou on 2021-08-05.
*
* @since 1.0
*/
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean
{
/**
* 最大批次数量
*/
@Value("${app.db.maxBatchCount:800}")
private int maxBatchCount;
/**
* 最大线程数
*/
@Value("${app.db.maxBatchThreads:100}")
private int maxBatchThreads;
/**
* 超时时间
*/
@Value("${app.db.batchTimeout:3000}")
private int batchTimeout;
/**
* 批次数量
*/
private int batchCount = 0;
/**
* 批次号
*/
private static long batchNo = 0;
/**
* 获取当前机器的核数
*/
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
/**
* 线程池定义接口
*/
private ExecutorService executorService = null;
/**
* 服务器缓存工具类,下面提供源码
*/
@Resource
private CacheService cacheService;
/**
* 业务接口
*/
@Resource
private DeviceRealTimeService deviceRealTimeService;
/**
* redis工具类
*/
@Resource
private RedisUtils redisUtils;
@Override
public void afterPropertiesSet() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心线程大小
taskExecutor.setCorePoolSize(cpuNum);
// 最大线程大小
taskExecutor.setMaxPoolSize(cpuNum * 2);
// 队列最大容量
taskExecutor.setQueueCapacity(500);
// 当提交的任务个数大于QueueCapacity,就需要设置该参数,但spring提供的都不太满足业务场景,可以自定义一个,也可以注意不要超过QueueCapacity即可
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.setThreadFactory(r -> {
Thread thread = new Thread(r);
if (r instanceof BatchWorker) {
thread.setName("batch-worker-" ((BatchWorker) r).batchKey);
});
taskExecutor.initialize();
executorService = taskExecutor.getThreadPoolExecutor();
}
/**
* 需要做高并发处理的类只需要调用该方法 (我用的是rabbitMq)
*
* @param deviceRealTimeDTO
*/
public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
final String failedCacheKey = "device:real_time:failed_records";
try {
String durationKey = "device:real_time:batchDuration" batchNo;
String batchKey = "device:real_time:batch" batchNo;
if (!cacheService.exists(durationKey)) {
cacheService.put(durationKey, System.currentTimeMillis());
new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
}
cacheService.lPush(batchKey, deviceRealTimeDTO);
if ( batchCount >= maxBatchCount) {
// 达到最大批次,执行入库逻辑
dataStorage(durationKey, batchKey, failedCacheKey);
}
} catch (Exception ex) {
log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " ex.getMessage() ", DeviceRealTimeDTO: " JSON.toJSONString(deviceRealTimeDTO), ex);
cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
} finally {
updateRealTimeData(deviceRealTimeDTO);
}
}
/**
* 更新实时数据
* @param deviceRealTimeDTO 业务POJO
*/
private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
redisUtils.set("real_time:" deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
}
/**
*
* @param durationKey 持续时间标识
* @param batchKey 批次标识
* @param failedCacheKey 错误标识
*/
private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
batchNo ;
batchCount = 0;
cacheService.del(durationKey);
if (batchNo >= Long.MAX_VALUE) {
batchNo = 0;
}
executorService.execute(new BatchWorker(batchKey, failedCacheKey));
}
private class BatchWorker implements Runnable
{
private final String failedCacheKey;
private final String batchKey;
public BatchWorker(String batchKey, String failedCacheKey) {
this.batchKey = batchKey;
this.failedCacheKey = failedCacheKey;
}
@Override
public void run() {
final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
try {
DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
while(deviceRealTimeDTO != null) {
deviceRealTimeDTOList.add(deviceRealTimeDTO);
deviceRealTimeDTO = cacheService.lPop(batchKey);
}
long timeMillis = System.currentTimeMillis();
try {
List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
} finally {
cacheService.del(batchKey);
log.info("[DB:BATCH_WORKER] 批次:" batchKey ",保存设备上报记录数:" deviceRealTimeDTOList.size() ", 耗时:" (System.currentTimeMillis() - timeMillis) "ms");
}
} catch (Exception e) {
log.warn("[DB:FAILED] 设备上报记录批量入库失败:" e.getMessage() ", DeviceRealTimeDTO: " deviceRealTimeDTOList.size(), e);
for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
}
}
}
}
class BatchTimeoutCommitThread extends Thread {
private final String batchKey;
private final String durationKey;
private final String failedCacheKey;
public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
this.batchKey = batchKey;
this.durationKey = durationKey;
this.failedCacheKey = failedCacheKey;
this.setName("batch-thread-" batchKey);
}
public void run() {
try {
Thread.sleep(batchTimeout);
} catch (InterruptedException e) {
log.error("[DB] 内部错误,直接提交:" e.getMessage());
}
if (cacheService.exists(durationKey)) {
// 达到最大批次的超时间,执行入库逻辑
dataStorage(durationKey, batchKey, failedCacheKey);
}
}
}
}
package io.jack.service;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * In-process cache offering plain key/value entries, named counters, and simple hash and list
 * structures stored under a key.
 *
 * <p>Hash and list operations synchronize on the backing map so that their
 * lookup-create-modify sequences are atomic with respect to each other.
 */
@Component
@Scope("singleton")
public class CacheService implements InitializingBean {
    private final Map<String, Object> objectCache = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    /**
     * Registers the {@code terminals} and {@code connections} counters with value 0.
     */
    @Override
    public void afterPropertiesSet() {
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    /**
     * Returns the counter registered under {@code statName}, creating it at 0 if absent.
     * Uses an atomic create-if-absent so concurrent first uses share one counter; a separate
     * containsKey/put pair could replace a counter another thread had already updated.
     */
    private AtomicLong counter(String statName) {
        return statCache.computeIfAbsent(statName, name -> new AtomicLong(0));
    }

    /**
     * Increments the named counter.
     *
     * @param statName counter name
     * @return the value after incrementing
     */
    public long incr(String statName) {
        return counter(statName).incrementAndGet();
    }

    /**
     * Decrements the named counter.
     *
     * @param statName counter name
     * @return the value after decrementing
     */
    public long decr(String statName) {
        return counter(statName).decrementAndGet();
    }

    /**
     * Reads the named counter. A counter that does not exist yet is created at 0.
     *
     * @param statName counter name
     * @return the current value
     */
    public long stat(String statName) {
        return counter(statName).get();
    }

    /**
     * Stores {@code object} under {@code key}, replacing any previous entry.
     */
    public <T> void put(String key, T object) {
        objectCache.put(key, object);
    }

    /**
     * Returns the entry stored under {@code key}, or {@code null} if there is none. The value
     * is cast to the caller's expected type without a runtime check.
     */
    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        return (T) objectCache.get(key);
    }

    /**
     * Removes the entry stored under {@code key}, if any.
     */
    public void remove(String key) {
        objectCache.remove(key);
    }

    /**
     * Sets {@code subkey} to {@code value} in the hash stored under {@code key}, creating the
     * hash when the key is absent.
     */
    @SuppressWarnings("unchecked")
    public void hSet(String key, String subkey, Object value) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap == null) {
                submap = new HashMap<>();
                objectCache.put(key, submap);
            }
            submap.put(subkey, value);
        }
    }

    /**
     * Returns the value of {@code subkey} in the hash stored under {@code key}, or
     * {@code null} when the key is absent or the hash has no such sub-key.
     */
    @SuppressWarnings("unchecked")
    public <T> T hGet(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return (T) submap.get(subkey);
            }
            return null;
        }
    }

    /**
     * Tells whether the hash stored under {@code key} contains {@code subkey}.
     */
    @SuppressWarnings("unchecked")
    public boolean hExists(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return submap.containsKey(subkey);
            }
            return false;
        }
    }

    /**
     * Appends {@code value} to the end of the list stored under {@code key}, creating the list
     * when the key is absent.
     */
    @SuppressWarnings("unchecked")
    public void lPush(String key, Object value) {
        synchronized (objectCache) {
            LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key);
            if (queue == null) {
                queue = new LinkedList<>();
                objectCache.put(key, queue);
            }
            queue.addLast(value);
        }
    }

    /**
     * Removes and returns the element at the end of the list stored under {@code key}, i.e.
     * the most recently pushed one. When the list is found empty its key is removed.
     *
     * @return the removed element, or {@code null} when the key is absent or the list is empty
     */
    @SuppressWarnings("unchecked")
    public <T> T lPop(String key) {
        synchronized (objectCache) {
            LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key);
            if (queue != null) {
                if (!queue.isEmpty()) {
                    return (T) queue.removeLast();
                }
                objectCache.remove(key);
            }
            return null;
        }
    }

    /**
     * Removes the entry stored under {@code key}, if any. Same effect as {@link #remove}.
     */
    public void del(String key) {
        remove(key);
    }

    /**
     * Tells whether an entry is stored under {@code key}.
     */
    public boolean exists(String key) {
        return objectCache.containsKey(key);
    }

    /**
     * Currently does nothing.
     */
    public void dump() {
    }
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgbgkai
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01