Redis 提供了对管道(Pipeline)的支持,这涉及将多个命令发送到服务器,而无需等待服务器回复,然后在单个步骤中读取回复。当您需要在一行中发送多个命令时,管道可以提高性能,例如:向同一列表(List)中添加许多元素。
Spring Data Redis 提供了几种用于在管道中运行命令的 RedisTemplate 方法。如果您不关心管道操作的结果,可以使用标准的 execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) 方法,将 pipeline 参数设置为 true。
或者使用 executePipelined(RedisCallback<?> action) 或 executePipelined(SessionCallback<?> session) 方法,在回调接口的回调方法中进行管道操作。例如:
// 从队列(Queue)中弹出(Pop)指定数量的项 List<Object> results = stringRedisTemplate.executePipelined( new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection)connection; for(int i=0; i< batchSize; i++) { stringRedisConn.rPop("myqueue"); } return null; } });
前面的示例从管道中的队列中运行大量右弹(rPop,right pop)出项,结果列表(results 变量)包含所有弹出的项目。RedisTemplate 使用 key、hash-key 和 hash-value 的序列化器在返回之前对所有结果进行反序列化,因此前面的示例中返回的项是 String。还有其他 executePipelined 方法,允许您为流水线结果传递自定义序列化程序。如下:
List<Object> executePipelined(RedisCallback<?> action, RedisSerializer<?> resultSerializer)
List<Object> executePipelined(SessionCallback<?> session, RedisSerializer<?> resultSerializer)
请注意,RedisCallback 返回的值必须为空,因为为了返回管道命令的结果,该值被丢弃。如果 RedisCallback 的返回值不为 null,则抛出如下错误信息 “org.springframework.dao.InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline”。
Lettuce 驱动程序支持更细粒度的刷新控制,允许在命令出现时刷新命令、缓冲或在连接关闭时发送命令。例如:
// Lettuce 驱动链接 LettuceConnectionFactory factory = //... // 设置管道刷新策略:本地缓冲区在每3个命令后刷新 factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3));
提示:
从 1.1 版开始,RedisConnection 和 RedisTemplate 的 exec 方法进行了重要更改。以前,这些方法直接从连接器返回事务的结果。这意味着数据类型通常与 RedisConnection 方法返回的数据类型不同。例如,zAdd 返回一个布尔值,指示元素是否已添加到排序集。大多数连接器将此值作为 long 值返回,Spring Data Redis 执行转换。另一个常见的区别是,大多数连接器都会为诸如 set 之类的操作返回状态回复(通常是字符串 OK)。这些回复通常被 Spring Data Redis 丢弃。
在 1.1 之前,这些转换不会对 exec() 的结果执行。此外,结果没有在 RedisTemplate 中反序列化,因此它们通常包括原始字节数组。如果此更改破坏了应用程序,请在 RedisConnectionFactory 上将 convertPipelineAndTxResults() 设置为 false 以禁用此行为。
使用管道命令从 myqueue 列表中 rPop 多个元素,如下:
package com.hxstrive.redis.pipeline; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import java.util.Arrays; import java.util.List; import java.util.Set; /** * Spring Data Redis 管道 * @author hxstrive.com 2022/2/26 */ @SpringBootTest public class PipelineDemo { @Autowired private RedisTemplate<String,String> redisTemplate; @Test public void contextLoads() { // 向列表写入 50 个数据 ListOperations<String,String> ops = redisTemplate.opsForList(); for(int i = 0; i < 50; i ++) { ops.leftPush("myqueue", "value-" + i); } // 使用管道读取数据 List<Object> results = redisTemplate.executePipelined( new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { for(int i=0; i< 10; i++) { connection.rPop("myqueue".getBytes()); } // 必须返回 null,否则抛出异常 return null; } } ); System.out.println(Arrays.toString(results.toArray())); } }
运行结果如下:
[value-0, value-1, value-2, value-3, value-4, value-5, value-6, value-7, value-8, value-9]