我们知道在 Spring Data Redis 中同时支持了 Jedis 客户端和 Lettuce 客户端。但是仅 Lettuce 客户端才支持 Reactive(响应式)方式的操作,所以如果你希望使用 Reactive 方式那你只能选择 Lettuce 客户端。
ReactiveRedisConnection 是 Redis 通信的核心,因为它处理与 Redis 后端的通信。它还会自动将底层驱动程序异常转换为Spring 一致的 DAO 异常层次结构,因此您可以在不更改任何代码的情况下切换连接器,因为操作语义保持不变。
ReactiveRedisConnectionFactory 创建活动的 ReactiveRedisConnection 实例。此外,工厂还充当PersistenceExceptionTranslator 实例,这意味着工厂一旦声明,就可以进行透明的异常转换。例如,通过使用 @Repository 注解和 AOP 进行异常转换。
ReactiveRedisTemplate 是实际的操作类,和 RedisTemplate 一样,提供了 opsFor***() 方法,使用这些方法可以获取 value、hash、set、zset、list、geo 等类型的响应式 Operations 操作接口。方法定义如下:
// 操作 geo ReactiveGeoOperations<K,V> opsForGeo() // 操作 hash <HK,HV> ReactiveHashOperations<K,HK,HV> opsForHash() // 操作 hyperloglog ReactiveHyperLogLogOperations<K,V> opsForHyperLogLog() // 操作 list ReactiveListOperations<K,V> opsForList() // 操作 set ReactiveSetOperations<K,V> opsForSet() // 操作 value ReactiveValueOperations<K,V> opsForValue() // 操作 zset ReactiveZSetOperations<K,V> opsForZSet()
和 RedisTemplate 类不同的是,该类获取的 Operations 操作接口均是响应式的,而 RedisTemplate 获取的操作接口是传统操作接口,非响应式。
下面是一个简单的 @Configuration 配置类,该配置类配置了一个 ReactiveRedisTemplate 响应式操作类,用来以响应式编程的方式去操作 Redis。如下:
package com.hxstrive.redis.config; import com.alibaba.fastjson.parser.Feature; import com.alibaba.fastjson.serializer.SerializerFeature; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * Redis配置 * @author hxstrive.com 2022/2/26 */ @Configuration public class RedisConfig { @Bean public ReactiveRedisTemplate<String,String> reactiveRedisTemplate( ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext.RedisSerializationContextBuilder<String,String> builder = RedisSerializationContext.newSerializationContext(); // 设置序列化方式 builder.key(new StringRedisSerializer()); builder.value(new StringRedisSerializer()); builder.hashKey(new StringRedisSerializer()); builder.hashValue(new StringRedisSerializer()); builder.string(new StringRedisSerializer()); RedisSerializationContext build = builder.build(); ReactiveRedisTemplate<String,String> template = new ReactiveRedisTemplate<String,String>(connectionFactory, build); return template; } }
下面将介绍怎样获取操作 Redis 的 value 类型的响应式接口,然后利用该接口设置和获取指定键的值。代码如下:
CountDownLatch countDownLatch = new CountDownLatch(1); ReactiveValueOperations<String,String> ops = reactiveRedisTemplate.opsForValue(); // 异步添加一个值到 Redis Mono<Boolean> mono = ops.set("value-key", "value1"); // 订阅数据流,消费数据 mono.subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { if(aBoolean) { // 获取刚刚添加的值 ops.get("value-key").subscribe(new Consumer<String>() { @Override public void accept(String s) { System.out.println("value=" + s); countDownLatch.countDown(); } }); } else { System.err.println("添加键值到 Redis 失败"); } } }); try { countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); }
运行结果如下:
value=value1
上面使用 CountDownLatch 计数器是为了等待异步调用执行完成,否则代码执行完直接退出了,没有机会执行 Consumer 中的代码。