• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java Springboot下redis用pipelining管道模式写入性能调优

武飞扬头像
技术很渣
帮助1

Springboot下redis写入pipelining管道模式性能调优实例

一、真实场景

生产真实项目过程中,需要将数据库的数据同步写入redis,此过程中遇到写入redis的瓶颈。每次启动项目都要将数据库数据重载到redis,这个过程耗费了大量的时间。

二、解决思路pipelining(管道)

Redis pipelined
Redis Pipelined是由Client提供的(是防止client端 阻塞的操作)一种请求redis的方式。Redis本身具有很高的吞吐量,因此性能最大的考察便是网络状况,如果应用到redis的网络状况不好,每次请求都将会出现轻微的 阻塞和延迟,这种延迟对于批量请求是很可怕的,譬如要进行数千次数据插入,或是批量获取数据时,我们就需要用到Pipelined。
Pipelined可以将多个请求无 阻塞的发出并按顺序将请求结果“打包”返回,这有点类似于并发请求,可以有效地利用等待结果的 阻塞时间。
注意,Pipelined并不能保证原子性,即pipelined执行的内容可能会被其他客户端或是线程的指令"插队",若想要原子性操作,需要使用事务。

基于RedisTemplate的pipelined
使用RedisTemplate可以轻松实现pipelined,需要依靠原生的RedisConnection对象实现相关操作!

pipelining(管道)
Pipeline:redis的管道命令,允许client将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可,可以改善性能.
pipeline不是原子操作

为何用?
减少请求次数,将多条请求命令合成一次请求通过管道发给redis server,再通过回调函数一次性接收多个命令的结果,减少网络IO次数,在高并发情况下可带来明显性能提升。注意的是,redis server是单线程,多个命令合成一次请求到达redis server依然还是顺序一个个执行的,仅仅只是减少了请求IO次数。
如何用?
RedisCallback和SessionCallBack:
1.作用: 让RedisTemplate进行回调,通过他们可以在同一条连接中一次执行多个redis命令。
2.SessionCalback提供了良好的封装,优先使用它。
3.RedisCallback使用的是原生RedisConnection,用起来比较麻烦,可读性差,但原生api提供的功能比较齐全。

三、论据性能对比相较于传统模式

<dependency>
    <groupId>redis.clients</groupId> #maven引入
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
   compile('redis.clients:jedis:2.9.0')  #gradle引入
   compile('org.springframework.data:spring-data-redis:2.0.8.RELEASE')
package pipeline;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class BatchOperSet {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 6379;

    // 批量插入数据到Redis,正常使用
    public static void batchSetNotUsePipeline() throws Exception {
        Jedis jedis = new Jedis(HOST, PORT);
        String keyPrefix = "normal";
        long begin = System.currentTimeMillis();
        for (int i = 1; i < 10000; i  ) {
            String key = keyPrefix   "_"   i; 
            String value = String.valueOf(i);
            jedis.set(key, value);
        }
        jedis.close();
        long end = System.currentTimeMillis();
        System.out.println("not use pipeline batch set total time:"   (end - begin));
    }

    // 批量插入数据到Redis,使用Pipeline
    public static void batchSetUsePipeline() throws Exception {
        Jedis jedis = new Jedis(HOST, PORT);
        Pipeline pipelined = jedis.pipelined();
        String keyPrefix = "pipeline";
        long begin = System.currentTimeMillis();
        for (int i = 1; i < 10000; i  ) {
            String key = keyPrefix   "_"   i; 
            String value = String.valueOf(i);
            pipelined.set(key, value);
        }
        pipelined.sync();
        jedis.close();
        long end = System.currentTimeMillis();
        System.out.println("use pipeline batch set total time:"   (end - begin));
    }

    public static void main(String[] args) {    
        try {
            batchSetNotUsePipeline();
            batchSetUsePipeline();      
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

写入性能对比的运行结果如下:

not use pipeline batch get total time:2990
use pipeline batch get total time:41

结论:pipeline模式的写入性能更快

四、实际案例举例

真实案例的采用redis数据结构
Hash(散列)
基本概念:
Redis 散列可以存储多个键值对之间的映射。和字符串一样,散列存储的值既可以是字符串又可以是数值,并且用户同样可以对散列存储的数字值执行自增或自减操作。这个和 Java 的 HashMap 很像,每个 HashMap 有自己的名字,同时可以存储多个 k/v 对。
底层实现:
哈希对象的编码可以是 ziplist 或者 hashtable 。
哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节并且保存的键值对数量小于 512 个,使用ziplist 编码;否则使用 hashtable;
应用场景:
Hash 更适合存储结构化的数据,比如 Java 中的对象;其实 Java 中的对象也可以用 string 进行存储,只需要将 对象 序列化成 json 串就可以,但是如果这个对象的某个属性更新比较频繁的话,那么每次就需要重新将整个对象序列化存储,这样消耗开销比较大。可如果用 hash 来存储 对象的每个属性,那么每次只需要更新要更新的属性就可以。
购物车场景:可以以用户的 id 为 key ,商品的 id 为存储的 field ,商品数量为键值对的value,这样就构成了购物车的三个要素。

redis 127.0.0.1:6379> hset myhash field1 "zhang"            #给键值为myhash的键设置字段为field1,值为zhang。
(integer) 1
redis 127.0.0.1:6379> hget myhash field1             #获取键值为myhash,字段为field1的值。
"zhang"
redis 127.0.0.1:6379> hget myhash field2             #myhash键中不存在field2字段,因此返回nil。
(nil)
redis 127.0.0.1:6379> hset myhash field2 "san"                   #给myhash添加一个新的字段field2,其值为san。
(integer) 1
redis 127.0.0.1:6379> hlen myhash                    #hlen命令获取myhash键的字段数量。
(integer) 2
redis 127.0.0.1:6379> hexists myhash field1                  #判断myhash键中是否存在字段名为field1的字段,由于存在,返回值为1。
(integer) 1
redis 127.0.0.1:6379> hdel myhash field1                 #删除myhash键中字段名为field1的字段,删除成功返回1。
(integer) 1
redis 127.0.0.1:6379> hdel myhash field1                    #再次删除myhash键中字段名为field1的字段,由于上一条命令已经将其删除,因为没有删除,返回0。
(integer) 0
redis 127.0.0.1:6379> hexists myhash field1                     #判断myhash键中是否存在field1字段,由于上一条命令已经将其删除,因为返回0。
(integer) 0
redis 127.0.0.1:6379> hsetnx myhash field1 zhang            #通过hsetnx命令给myhash添加新字段field1,其值为zhang,因为该字段已经被删除,所以该命令添加成功并返回1。
(integer) 1
redis 127.0.0.1:6379> hsetnx myhash field1 zhang            #由于myhash的field1字段已经通过上一条命令添加成功,因为本条命令不做任何操作后返回0。
(integer) 0

学新通
我们可以用Redis Desktop Manager工具进去看redis数据库里面的内容。

五、Hash结构模式例子

Hash传统代码模式如下:

#按制定的大KEY值往redis放入全部map内容 
public static <T> void hPutAll(String hashKey, Map<String,T> map, long timeout){
        try{
            if(StringUtils.isBlank(hashKey)){
                return;
            }
            BoundHashOperations<String, String, String> stringObjectObjectBoundHashOperations = redisClient.redisTemplate.boundHashOps(hashKey);

            map.forEach((key,v)->{
                stringObjectObjectBoundHashOperations.put(key,gson.toJson(v));
            });
        }catch (Exception e){
            Logger.error("redis hputall异常",e);
        }
    }

Hash管道模式如下:

 public static <T> void hPutAll(String hashKey, Map<String,T> map, long timeout){
        try{
            if(StringUtils.isBlank(hashKey)){
                return;
            }
			redisClient.redisTemplate.executePipelined(new SessionCallback<Object>() {
				public Object execute(RedisOperations ro) {
					BoundHashOperations<String, String, String> hashOps = redisClient.redisTemplate
							.boundHashOps(hashKey);

					map.forEach((key, v) -> {
						hashOps.put(key, gson.toJson(v));

					});

					//redisClient.redisTemplate.expire(hashKey, defaultTimeOut, TimeUnit.SECONDS);
					return null;
				}
			});

六、SET结构模式例子

附加set数据结构用管道模式:

@Component
public class RedisTest {
 
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
 
    @PostConstruct
    public void init() {
        test1();
    }
 
    public void test1() {
        List<Object> pipelinedResultList = redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                ValueOperations<String, Object> valueOperations = (ValueOperations<String, Object>) operations.opsForValue();
 
                valueOperations.set("yzh1", "hello world");
                valueOperations.set("yzh2", "redis");
 
                valueOperations.get("yzh1");
                valueOperations.get("yzh2");
 
                // 返回null即可,因为返回值会被管道的返回值覆盖,外层取不到这里的返回值
                return null;
            }
        });
        System.out.println("pipelinedResultList="   pipelinedResultList);
    }
}

管道预热代码
有的系统对延迟要求很高,那么redis管道第一次请求很慢,就需要在系统启动时进行管道的预热,保证系统启动后每次请求的低延迟。

  @PostConstruct
    public void init() {
        long startTime = System.currentTimeMillis();
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                operations.hasKey((K) "");
                return null;
            }
        });
        log.info("redis初始化管道请求end,耗时:{}ms", System.currentTimeMillis() - startTime);
    }

七、LIST结构模式例子

附加LIST数据结构代码

public void redisPop(List<String> list) {
    List<Object> keys = redisTemplate.executePipelined(new SessionCallback<String>() {
        @Override
        public <K, V> String execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
            for(String str: list){
                for (int i = 0; i < 200; i  ) {
                    redisOperations.opsForList().rightPop((K) str);
                }
            }
            return null;
        }
    });
   
   
}

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcgkec
系列文章
更多 icon
同类精品
更多 icon
继续加载