Spring Data Redis 中,Set 类型的响应式接口为 ReactiveSetOperations,该接口定义的方法和 SetOperations、BoundSetOperations 接口定义的方法非常类似。
我们可以通过 ReactiveRedisTemplate 的 opsForSet() 方法获取获取,代码如下:
ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet();
reactor.core.publisher.Mono<Long> add(K key, V... values) 将给定的 values 添加到 key 对应的集合。
示例:
@Test public void add() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.members("set-key").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println(Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key 成功添加 3 元素 [value1, value3, value2]
reactor.core.publisher.Mono<Boolean> delete(K key) 删除给定的键。
示例:
@Test public void delete() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); // 删除 ops.delete("set-key") .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { queue.add(aBoolean ? "删除 set-key 成功" : "删除 set-key 失败"); } }); System.out.println(queue.take()); // 验证key是否存在 ops.members("set-key").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println(Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key 成功添加 3 元素 删除 set-key 成功 []
reactor.core.publisher.Flux<V> difference(K key, Collection<K> otherKeys) 计算 key 和 otherKeys 对应集合的差异,返回 key 集合中没有在 otherKeys 集合中存在的元素列表
reactor.core.publisher.Flux<V> difference(K key, K otherKey) 计算 key 和 otherKey 对应集合的差异,返回 key 集合中没有在 otherKeys 集合中存在的元素列表
reactor.core.publisher.Mono<Long> differenceAndStore(K key, Collection<K> otherKeys, K destKey) 计算 key 和 otherKeys 对应集合的差异,并且将 key 集合中没有在 otherKeys 集合中存在的元素列表存储到 destKey 集合
reactor.core.publisher.Mono<Long> differenceAndStore(K key, K otherKey, K destKey) 计算 key 和 otherKeys 对应集合的差异,并且将 key 集合中没有在 otherKey 集合中存在的元素列表存储到 destKey 集合
示例:
@Test public void difference() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key1", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key1 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.add("set-key2", "value1", "value2", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key2 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); // 计算两个key的 ops.difference("set-key1", "set-key2") .collectList().subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println("difference:" + Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key1 成功添加 3 元素 向 set-key2 成功添加 3 元素 difference:[value3]
reactor.core.publisher.Flux<V> distinctRandomMembers(K key, long count) 随机从 key 对应的集合中获取多个元素,这些元素不能相同。
示例:
@Test public void distinctRandomMembers() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4", "value5") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.distinctRandomMembers("set-key", 2).collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println(Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key 成功添加 5 元素 [value4, value2]
reactor.core.publisher.Flux<V> intersect(K key, Collection<K> otherKeys) 计算 key 和 otherKeys 集合所有元素的交集
reactor.core.publisher.Flux<V> intersect(K key, K otherKey) 计算 key 和 otherKey 集合所有元素的交集
reactor.core.publisher.Mono<Long> intersectAndStore(K key, Collection<K> otherKeys, K destKey) 计算 key 和 otherKeys 集合所有元素的交集,并且将结果存储到 destKey 集合
reactor.core.publisher.Mono<Long> intersectAndStore(K key, K otherKey, K destKey) 计算 key 和 otherKeys 集合所有元素的交集,并且将结果存储到 destKey 集合
示例:
@Test public void intersect() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key1", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key1 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.add("set-key2", "value1", "value2", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key2 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.add("set-key3", "value1", "value2", "value5") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key3 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); // 计算两个key的 ops.intersect("set-key1", Arrays.asList("set-key2", "set-key3")) .collectList().subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println("intersect:" + Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key1 成功添加 3 元素 向 set-key2 成功添加 3 元素 向 set-key3 成功添加 3 元素 intersect:[value1, value2]
reactor.core.publisher.Mono<Boolean> isMember(K key, Object o) 检查指定的值 o 是否是 key 对应集合中的成员。如果是,则发返回 true。如果不是,则返回 false。
示例:
@Test public void isMember() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.isMember("set-key", "value3") .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { queue.add(aBoolean ? "value3 存在" : "value3 不存在"); } }); System.out.println(queue.take()); ops.isMember("set-key", "value5") .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { queue.add(aBoolean ? "value5 存在" : "value5 不存在"); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 value3 存在 value5 不存在
reactor.core.publisher.Flux<V> members(K key) 返回指定 key 对应集合的所有元素
示例:
@Test public void members() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.members("set-key").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println(Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key 成功添加 3 元素 [value1, value3, value2]
reactor.core.publisher.Mono<Boolean> move(K sourceKey, V value, K destKey) 将指定 value 从 sourceKey 移动到 destKey 集合
示例:
@Test public void move() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); // 将 value2 从 set-key 移动到 set-key2 ops.move("set-key", "value2", "set-key2"); ops.members("set-key").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { queue.add(Arrays.toString(strings.toArray())); } }); System.out.println(queue.take()); ops.members("set-key2").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { queue.add(Arrays.toString(strings.toArray())); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 [value4, value1, value3, value2] []
reactor.core.publisher.Mono<V> pop(K key) 从 key 对应集合中随机移除且返回一个元素
reactor.core.publisher.Flux<V> pop(K key, long count) 从 key 对应集合中随机移除且返回 count 个元素
示例:
@Test public void pop() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.pop("set-key").subscribe(new Consumer<String>() { @Override public void accept(String s) { queue.add("pop: " + s); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 pop: value3
reactor.core.publisher.Mono<V> randomMember(K key) 从 key 对应的集合中随机返回一个元素
reactor.core.publisher.Flux<V> randomMembers(K key, long count) 从 key 对应的集合中随机返回 count 元素
示例:
@Test public void randomMember() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.randomMember("set-key").subscribe(new Consumer<String>() { @Override public void accept(String s) { queue.add(s); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 value1
reactor.core.publisher.Mono<Long> remove(K key, Object... values) 从 key 对应的集合中移除 values 多个元素
示例:
@Test public void remove() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.remove("set-key", "value2", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("从 set-key 删除" + aLong + "个元素"); } }); System.out.println(queue.take()); ops.members("set-key").collectList() .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { queue.add(Arrays.toString(strings.toArray())); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 从 set-key 删除2个元素 [value1, value3]
reactor.core.publisher.Mono<Long> size(K key) 返回 key 对应集合的大小
示例:
@Test public void size() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key", "value1", "value2", "value3", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.size("set-key").subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("set-key 有" + aLong + "个元素"); } }); System.out.println(queue.take()); }
运行示例,输出结果如下:
向 set-key 成功添加 4 元素 set-key 有4个元素
reactor.core.publisher.Flux<V> union(K key, Collection<K> otherKeys) 合并 key 和 otherKeys 集合中的元素
reactor.core.publisher.Flux<V> union(K key, K otherKey) 合并 key 和 otherKey 集合中的元素
reactor.core.publisher.Mono<Long> unionAndStore(K key, Collection<K> otherKeys, K destKey) 合并 key 和 otherKeys 集合中的元素,并且将合并结果存储到 destKey 集合
reactor.core.publisher.Mono<Long> unionAndStore(K key, K otherKey, K destKey) 合并 key 和 otherKeys 集合中的元素,并且将合并结果存储到 destKey 集合
示例:
@Test public void union() throws Exception { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); ReactiveSetOperations<String,String> ops = reactiveRedisTemplate.opsForSet(); ops.add("set-key1", "value1", "value2", "value3") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key1 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); ops.add("set-key2", "value1", "value2", "value4") .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { queue.add("向 set-key2 成功添加 " + aLong + " 元素"); } }); System.out.println(queue.take()); // 计算两个key的 ops.union("set-key1", "set-key2") .collectList().subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { System.out.println("union:" + Arrays.toString(strings.toArray())); } }); }
运行示例,输出结果如下:
向 set-key1 成功添加 3 元素 向 set-key2 成功添加 3 元素 union:[value3, value2, value1, value4]