
Batch database writes under high concurrency in Java

Jack_hrx

This service uses a thread pool and Redis to handle data persistence under high concurrency: the latest record is written to Redis in real time, while database writes are accumulated and flushed in batches. To use it, replace the DTO, entity, and business service with your own objects. The module flushes a batch to the database whenever either of the following conditions is met (a sample configuration follows the list):
1. The configured timeout elapses.
2. The maximum batch size is reached.
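
The thresholds are read from Spring properties, with the defaults shown in the @Value annotations below (800 records, 100 threads, 3000 ms). A minimal application.properties sketch, assuming standard Spring Boot property binding; the values are simply the built-in defaults and should be tuned to your workload:

app.db.maxBatchCount=800
app.db.maxBatchThreads=100
app.db.batchTimeout=3000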

package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * <pre>
 *   Batch data persistence service
 * </pre>
 * Created by RuiXing Hou on 2021-08-05.
 *
 * @since 1.0
 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean
{
	/**
	 * Maximum batch size
	 */
	@Value("${app.db.maxBatchCount:800}")
	private int maxBatchCount;

	/**
	 * Maximum number of worker threads
	 */
	@Value("${app.db.maxBatchThreads:100}")
	private int maxBatchThreads;

	/**
	 * Batch timeout in milliseconds
	 */
	@Value("${app.db.batchTimeout:3000}")
	private int batchTimeout;

	/**
	 * Number of records in the current batch
	 */
	private int batchCount = 0;

	/**
	 * Current batch number
	 */
	private static long batchNo = 0;

	/**
	 * Number of CPU cores on the current machine
	 */
	public static final int cpuNum = Runtime.getRuntime().availableProcessors();

	/**
	 * Thread pool that runs the batch workers
	 */
	private ExecutorService executorService = null;

	/**
	 * In-process cache utility; its source code is provided below
	 */
	@Resource
	private CacheService cacheService;

	/**
	 * Business service that performs the batch insert
	 */
	@Resource
	private DeviceRealTimeService deviceRealTimeService;

	/**
	 * Redis utility class
	 */
	@Resource
	private RedisUtils redisUtils;

	@Override
	public void afterPropertiesSet() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		// Core pool size
		taskExecutor.setCorePoolSize(cpuNum);
		// Maximum pool size
		taskExecutor.setMaxPoolSize(cpuNum * 2);
		// Maximum queue capacity
		taskExecutor.setQueueCapacity(500);
		// Rejection policy used once more tasks are submitted than the queue can hold. Spring's built-in
		// policies rarely fit every business case; CallerRunsPolicy runs the task on the submitting thread,
		// which also throttles producers so the queue capacity is not exceeded.
		taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
		taskExecutor.setAwaitTerminationSeconds(60);
		taskExecutor.setThreadFactory(r -> {
			Thread thread = new Thread(r);
			if (r instanceof BatchWorker) {
				thread.setName("batch-worker-" + ((BatchWorker) r).batchKey);
			}
			return thread;
		});
		taskExecutor.initialize();
		executorService = taskExecutor.getThreadPoolExecutor();
	}

	/**
	 * Entry point for callers that need high-concurrency persistence; in my case the caller is a
	 * RabbitMQ consumer (a sketch of such a consumer follows this class).
	 *
	 * @param deviceRealTimeDTO real-time device record to persist
	 */
	public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
		final String failedCacheKey = "device:real_time:failed_records";

		try {

			String durationKey = "device:real_time:batchDuration" + batchNo;
			String batchKey = "device:real_time:batch" + batchNo;

			if (!cacheService.exists(durationKey)) {
				cacheService.put(durationKey, System.currentTimeMillis());
				new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
			}

			cacheService.lPush(batchKey, deviceRealTimeDTO);
			if (++batchCount >= maxBatchCount) {
				// The maximum batch size has been reached: flush the batch to the database
				dataStorage(durationKey, batchKey, failedCacheKey);
			}

		} catch (Exception ex) {
			log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: "   ex.getMessage()   ", DeviceRealTimeDTO: "   JSON.toJSONString(deviceRealTimeDTO), ex);
			cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
		} finally {
			updateRealTimeData(deviceRealTimeDTO);
		}
	}

	/**
	 * Update the real-time snapshot for the device in Redis
	 * @param deviceRealTimeDTO business DTO
	 */
	private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
		redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
	}

	/**
	 * Flush the current batch to the database on a worker thread.
	 *
	 * @param durationKey 		key marking when the batch was opened
	 * @param batchKey			key of the batch queue in the cache
	 * @param failedCacheKey	key of the failed-records queue
	 */
	private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
		batchNo++;
		batchCount = 0;
		cacheService.del(durationKey);
		if (batchNo >= Long.MAX_VALUE) {
			batchNo = 0;
		}
		executorService.execute(new BatchWorker(batchKey, failedCacheKey));
	}

	private class BatchWorker implements Runnable
	{

		private final String failedCacheKey;
		private final String batchKey;

		public BatchWorker(String batchKey, String failedCacheKey) {
			this.batchKey = batchKey;
			this.failedCacheKey = failedCacheKey;
		}
		
		@Override
		public void run() {
			final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
			try {
				DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
				while(deviceRealTimeDTO != null) {
					deviceRealTimeDTOList.add(deviceRealTimeDTO);
					deviceRealTimeDTO = cacheService.lPop(batchKey);
				}

				long timeMillis = System.currentTimeMillis();

				try {
					List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
					deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
				} finally {
					cacheService.del(batchKey);
					log.info("[DB:BATCH_WORKER] batch: " + batchKey + ", device records saved: " + deviceRealTimeDTOList.size() + ", took: " + (System.currentTimeMillis() - timeMillis) + "ms");
				}
			} catch (Exception e) {
				log.warn("[DB:FAILED] Batch insert of device report records failed: " + e.getMessage() + ", records in batch: " + deviceRealTimeDTOList.size(), e);
				for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
					cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
				}
			}
		}
    }

	class BatchTimeoutCommitThread extends Thread {

		private final String batchKey;
		private final String durationKey;
		private final String failedCacheKey;

		public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
			this.batchKey = batchKey;
			this.durationKey = durationKey;
			this.failedCacheKey = failedCacheKey;
			this.setName("batch-thread-" + batchKey);
		}

		public void run() {
			try {
				Thread.sleep(batchTimeout);
			} catch (InterruptedException e) {
				log.error("[DB] Interrupted while waiting, committing the batch immediately: " + e.getMessage());
			}

			if (cacheService.exists(durationKey)) {
				// The batch timeout has elapsed: flush the batch to the database
				dataStorage(durationKey, batchKey, failedCacheKey);
			}
		}

	}

}
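
The article notes that the caller is a RabbitMQ consumer but does not include one. Below is a minimal sketch of such a consumer, assuming Spring AMQP is on the classpath and that messages arrive as JSON; the listener class and the queue name device.real_time.queue are hypothetical and only meant to show where saveRealTimeData is called.

package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Hypothetical RabbitMQ consumer feeding the batch service; not part of the original article.
 */
@Component
public class DeviceRealTimeListener {

	@Resource
	private BatchDataStorageService batchDataStorageService;

	// The queue name is an assumption; bind it to your own queue.
	@RabbitListener(queues = "device.real_time.queue")
	public void onMessage(String payload) {
		DeviceRealTimeDTO deviceRealTimeDTO = JSON.parseObject(payload, DeviceRealTimeDTO.class);
		batchDataStorageService.saveRealTimeData(deviceRealTimeDTO);
	}
}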

The CacheService used above is a simple in-process cache utility; its full source follows.
package io.jack.service;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Scope("singleton")
public class CacheService implements InitializingBean {

    private Map<String, Object> objectCache = new ConcurrentHashMap<>();

    private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    @Override
    public void afterPropertiesSet() {
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    public long incr(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).incrementAndGet();
    }

    public long decr(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).decrementAndGet();
    }

    public long stat(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).get();
    }

    public <T> void put(String key, T object) {
        objectCache.put(key, object);
    }

    public <T> T get(String key) {
        return (T) objectCache.get(key);
    }

    public void remove(String key) {
        objectCache.remove(key);
    }

    public void hSet(String key, String subkey, Object value) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap == null) {
                submap = new HashMap<>();
                objectCache.put(key, submap);
            }
            submap.put(subkey, value);
        }
    }

    public <T> T hGet(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return (T) submap.get(subkey);
            }
            return null;
        }
    }

    public boolean hExists(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return submap.containsKey(subkey);
            }
            return false;
        }
    }

    public void lPush(String key, Object value) {
        synchronized (objectCache) {
            LinkedList queue = (LinkedList) objectCache.get(key);
            if (queue == null) {
                queue = new LinkedList();
                objectCache.put(key, queue);
            }
            queue.addLast(value);
        }
    }

    public <T> T lPop(String key) {
        synchronized (objectCache) {
            LinkedList queue = (LinkedList) objectCache.get(key);
            if (queue != null) {
                if (!queue.isEmpty()) {
                    return (T)queue.removeLast();
                }
                objectCache.remove(key);
            }
            return null;
        }
    }

    public void del(String key) {
        objectCache.remove(key);
    }

    public boolean exists(String key) {
        return objectCache.containsKey(key);
    }

    public void dump() {

    }
}
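
One thing to keep in mind when adapting this cache: lPush appends to the tail and lPop also removes from the tail, so records come back in LIFO order (which does not matter for a batch insert), and the key is only deleted once a pop finds the queue empty. A small illustrative sketch, purely hypothetical usage:

CacheService cache = new CacheService();
cache.lPush("demo", "a");
cache.lPush("demo", "b");
cache.lPop("demo");   // returns "b" (tail first); the key is still present
cache.lPop("demo");   // returns "a"
cache.lPop("demo");   // queue is now empty: returns null and removes the key
cache.exists("demo"); // false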
