Spring Data Redis 中,Hash 类型的响应式接口为 ReactiveHashOperations,该接口定义的方法和 HashOperations、BoundHashOperations 接口定义的方法非常类似。
我们可以通过 ReactiveRedisTemplate 的 opsForHash() 方法获取该接口的实例,代码如下:
// ReactiveHashOperations takes three type arguments <H, HK, HV>: Redis key, hash field and hash value.
ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();
reactor.core.publisher.Mono<Boolean> put(H key, HK hashKey, HV value) 设置哈希 hashKey 的值。
reactor.core.publisher.Mono<Boolean> putAll(H key, Map<? extends HK,? extends HV> map) 使用 map 中提供的数据将多个哈希字段设置为多个值。
reactor.core.publisher.Mono<Boolean> putIfAbsent(H key, HK hashKey, HV value) 仅当 hashKey 不存在时才设置 Hash hashKey 的值。
示例:
/**
 * Stores one field with put, several fields with putAll, then reads the hash
 * back with scan and prints every entry followed by the entry count.
 */
@Test
public void put() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store a single field
    ops.put("key", "k1", "v1")
        .subscribe(ok -> queue.add(ok ? "添加k1成功" : "添加k1失败"));
    System.out.println(queue.take());

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k2", "v3");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k2、k3成功" : "添加k2、k3失败"));
    System.out.println(queue.take());

    // Read back everything stored under "key"
    ops.scan("key").collectList()
        .subscribe(found -> {
            found.forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));
            queue.add("key 大小=" + found.size());
        });
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1成功 添加k2、k3成功 k1=v1 k3=v3 k2=v3 key 大小=3
reactor.core.publisher.Mono<HV> get(H key, Object hashKey) 获取 key 对应 Hash 中 hashKey 的值
示例:
/**
 * Stores field k1 and then reads it back with get, printing the result of each step.
 */
@Test
public void get() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store a single field
    ops.put("key", "k1", "v1")
        .subscribe(ok -> queue.add(ok ? "添加k1成功" : "添加k1失败"));
    System.out.println(queue.take());

    // Fetch the value of k1
    ops.get("key", "k1")
        .subscribe(value -> queue.add("k1=" + value));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1成功 k1=v1
reactor.core.publisher.Mono<Boolean> hasKey(H key, Object hashKey) 确定 hashKey 是否存在
示例:
/**
 * Stores field k1 and then asks hasKey whether it exists, printing the result of each step.
 */
@Test
public void hasKey() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store a single field
    ops.put("key", "k1", "v1")
        .subscribe(ok -> queue.add(ok ? "添加k1成功" : "添加k1失败"));
    System.out.println(queue.take());

    // Check whether k1 exists
    ops.hasKey("key", "k1")
        .subscribe(present -> queue.add(present ? "k1存在" : "k1不存在"));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1成功 k1存在
reactor.core.publisher.Mono<Double> increment(H key, HK hashKey, double delta) 按给定增量递增哈希 hashKey 的值。
reactor.core.publisher.Mono<Long> increment(H key, HK hashKey, long delta) 按给定增量递增哈希 hashKey 的值。
示例:
/**
 * Stores the numeric string "100" in field k1, increments it by 50 and prints the new value.
 */
@Test
public void increment() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store a single field
    ops.put("key", "k1", "100")
        .subscribe(ok -> queue.add(ok ? "添加k1成功" : "添加k1失败"));
    System.out.println(queue.take());

    // Increase k1 by 50. The message must go through the queue: printing it inside
    // the callback left the queue empty, so the take() below never returned.
    ops.increment("key", "k1", 50)
        .subscribe(total -> queue.add("递增后,k1的值为" + total));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1成功 递增后,k1的值为150
reactor.core.publisher.Flux<Map.Entry<HK,HV>> entries(H key) 获取指定 key 存储的 Map.Entry 列表
示例:
/**
 * Stores three fields with putAll and then prints every entry returned by entries.
 */
@Test
public void entries() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Fetch every Map.Entry stored in the hash
    ops.entries("key").collectList()
        .subscribe(found -> {
            found.forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));
            // Empty message: only releases the waiting test thread below
            queue.add("");
        });
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1、k2、k3成功 k3=v3 k2=v2 k1=v1
reactor.core.publisher.Flux<HK> keys(H key) 获取指定 key 对应 Hash 中的所有 hashKey
示例:
/**
 * Stores three fields with putAll and then prints every hash field name returned by keys.
 */
@Test
public void keys() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Fetch all hash field names
    ops.keys("key").collectList()
        .subscribe(hashKeys -> {
            hashKeys.forEach(System.out::println);
            // Completion signal only; it is consumed below without being printed
            queue.add("done");
        });
    // Wait for the callback; without this the test can return before anything is printed.
    queue.take();
}
运行示例,输出结果如下:
添加k1、k2、k3成功 k3 k2 k1
reactor.core.publisher.Flux<HV> values(H key) 获取 Hash 中所有值
示例:
/**
 * Stores three fields with putAll and then prints every value returned by values.
 */
@Test
public void values() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Fetch all values stored in the hash
    ops.values("key").collectList()
        .subscribe(hashValues -> {
            hashValues.forEach(System.out::println);
            // Completion signal only; it is consumed below without being printed
            queue.add("done");
        });
    // Wait for the callback; without this the test can return before anything is printed.
    queue.take();
}
运行示例,输出结果如下:
添加k1、k2、k3成功 v3 v2 v1
reactor.core.publisher.Mono<List<HV>> multiGet(H key, Collection<HK> hashKeys) 从 Hash 中获取给定 hashKeys 的值。
示例:
/**
 * Stores three fields with putAll and then prints the values multiGet returns for k1, k2 and k3.
 */
@Test
public void multiGet() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Fetch the values of several hash fields in one call
    ops.multiGet("key", Arrays.asList("k1", "k2", "k3"))
        .subscribe(hashValues -> {
            hashValues.forEach(System.out::println);
            // Completion signal only; it is consumed below without being printed
            queue.add("done");
        });
    // Wait for the callback; without this the test can return before anything is printed.
    queue.take();
}
运行示例,输出结果如下:
添加k1、k2、k3成功 v1 v2 v3
reactor.core.publisher.Mono<Boolean> delete(H key) 移除给定的key
示例:
/**
 * Stores field k1 and then deletes the whole hash with delete, printing the result of each step.
 */
@Test
public void delete() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store a single field
    ops.put("key", "k1", "100")
        .subscribe(ok -> queue.add(ok ? "添加k1成功" : "添加k1失败"));
    System.out.println(queue.take());

    // Delete the key. The message goes through the queue so the test thread waits
    // for the callback instead of returning before it has run.
    ops.delete("key")
        .subscribe(ok -> queue.add(ok ? "删除key成功" : "删除key失败"));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1成功 删除key成功
reactor.core.publisher.Mono<Long> remove(H key, Object... hashKeys) 从 key 的散列中删除给定的 hashKeys。
示例:
/**
 * Stores three fields with putAll and then removes them with remove, printing how many were removed.
 */
@Test
public void remove() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Remove the three hash fields. The message goes through the queue so the test
    // thread waits for the callback instead of returning before it has run.
    ops.remove("key", "k1", "k2", "k3")
        .subscribe(removed -> queue.add("成功删除" + removed + "个元素"));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1、k2、k3成功 成功删除3个元素
reactor.core.publisher.Mono<Long> size(H key) 获取 key 对应 Hash 的大小
示例:
/**
 * Stores three fields with putAll and then prints the hash size reported by size.
 */
@Test
public void size() throws Exception {
    // Hand-off queue: reactive callbacks push a message, the test thread blocks on take().
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ReactiveHashOperations<String,String,String> ops = reactiveRedisTemplate.opsForHash();

    // Store several fields in one call
    Map<String,String> fields = new HashMap<>();
    fields.put("k1", "v1");
    fields.put("k2", "v2");
    fields.put("k3", "v3");
    ops.putAll("key", fields)
        .subscribe(ok -> queue.add(ok ? "添加k1、k2、k3成功" : "添加k1、k2、k3失败"));
    System.out.println(queue.take());

    // Query the size of the hash. The message goes through the queue so the test
    // thread waits for the callback instead of returning before it has run.
    ops.size("key")
        .subscribe(count -> queue.add("Hash大小为" + count));
    System.out.println(queue.take());
}
运行示例,输出结果如下:
添加k1、k2、k3成功 Hash大小为3