我们知道在 Spring Data Redis 中同时支持了 Jedis 客户端和 Lettuce 客户端。但是仅 Lettuce 客户端才支持 Reactive(响应式)方式的操作,所以如果你希望使用 Reactive 方式那你只能选择 Lettuce 客户端。
ReactiveRedisConnection 是 Redis 通信的核心,因为它处理与 Redis 后端的通信。它还会自动将底层驱动程序异常转换为Spring 一致的 DAO 异常层次结构,因此您可以在不更改任何代码的情况下切换连接器,因为操作语义保持不变。
ReactiveRedisConnectionFactory 创建活动的 ReactiveRedisConnection 实例。此外,工厂还充当PersistenceExceptionTranslator 实例,这意味着工厂一旦声明,就可以进行透明的异常转换。例如,通过使用 @Repository 注解和 AOP 进行异常转换。
ReactiveRedisTemplate 是实际的操作类,和 RedisTemplate 一样,提供了 opsFor***() 方法,使用这些方法可以获取 value、hash、set、zset、list、geo 等类型的响应式 Operations 操作接口。方法定义如下:
// 操作 geo ReactiveGeoOperations<K,V> opsForGeo() // 操作 hash <HK,HV> ReactiveHashOperations<K,HK,HV> opsForHash() // 操作 hyperloglog ReactiveHyperLogLogOperations<K,V> opsForHyperLogLog() // 操作 list ReactiveListOperations<K,V> opsForList() // 操作 set ReactiveSetOperations<K,V> opsForSet() // 操作 value ReactiveValueOperations<K,V> opsForValue() // 操作 zset ReactiveZSetOperations<K,V> opsForZSet()
和 RedisTemplate 类不同的是,该类获取的 Operations 操作接口均是响应式的,而 RedisTemplate 获取的操作接口是传统操作接口,非响应式。
下面是一个简单的 @Configuration 配置类,该配置类配置了一个 ReactiveRedisTemplate 响应式操作类,用来以响应式编程的方式去操作 Redis。如下:
package com.hxstrive.redis.config;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Redis配置
* @author hxstrive.com 2022/2/26
*/
@Configuration
public class RedisConfig {
@Bean
public ReactiveRedisTemplate<String,String> reactiveRedisTemplate(
ReactiveRedisConnectionFactory connectionFactory) {
RedisSerializationContext.RedisSerializationContextBuilder<String,String> builder =
RedisSerializationContext.newSerializationContext();
// 设置序列化方式
builder.key(new StringRedisSerializer());
builder.value(new StringRedisSerializer());
builder.hashKey(new StringRedisSerializer());
builder.hashValue(new StringRedisSerializer());
builder.string(new StringRedisSerializer());
RedisSerializationContext build = builder.build();
ReactiveRedisTemplate<String,String> template = new ReactiveRedisTemplate<String,String>(connectionFactory, build);
return template;
}
}下面将介绍怎样获取操作 Redis 的 value 类型的响应式接口,然后利用该接口设置和获取指定键的值。代码如下:
CountDownLatch countDownLatch = new CountDownLatch(1);
ReactiveValueOperations<String,String> ops = reactiveRedisTemplate.opsForValue();
// 异步添加一个值到 Redis
Mono<Boolean> mono = ops.set("value-key", "value1");
// 订阅数据流,消费数据
mono.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) {
if(aBoolean) {
// 获取刚刚添加的值
ops.get("value-key").subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("value=" + s);
countDownLatch.countDown();
}
});
} else {
System.err.println("添加键值到 Redis 失败");
}
}
});
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}运行结果如下:
value=value1
上面使用 CountDownLatch 计数器是为了等待异步调用执行完成,否则代码执行完直接退出了,没有机会执行 Consumer 中的代码。