Spring Data Redis 中的 ClusterOperations 操作接口针对特定于群集的操作的 Redis 操作。RedisClusterNode 可以从连接中获得,也可以使用 host 和 RedisNode.getPort() 或节点 ID 来构造。
由于 ClusterOperations 操作接口用来操作 Redis 集群,在正式介绍 ClusterOperations 具体用法前,我们需要先搭建一个 Redis 集群环境,为了测试方便,直接在本地 Windows 11 中搭建伪集群。集群如下:
127.0.0.1:6379 127.0.0.1:6378 127.0.0.1:6377 127.0.0.1:6376 127.0.0.1:6375 127.0.0.1:6374
该 Redis 集群总共有6个节点,3 个 Master,3 个 Slave。
Redis 集群模式和单机模式的配置文件有一定的区别,将简单的集群配置写入到 application.yml 文件,配置内容如下:
# Redis 集群配置 spring: redis: database: 1 cluster: nodes: 127.0.0.1:6379,127.0.0.1:6378,127.0.0.1:6377,127.0.0.1:6376,127.0.0.1:6375,127.0.0.1:6374
使用 bgSave() 方法可以对集群中指定的节点手动触发持久化,方法定义如下:
void bgSave(RedisClusterNode node) 在给定节点上开始后台保存数据。
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374); ops.bgSave(node);
执行上面代码后,观察 127.0.0.1:6374 节点控制台,控制台会输出如下信息:
... [11384] 24 Aug 13:05:00.304 # fork operation complete [11384] 24 Aug 13:05:00.326 * Background saving terminated with success
void save(RedisClusterNode node) 在服务器上同步保存当前数据库快照。
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374); ops.save(node);
执行上面代码后,观察 127.0.0.1:6374 节点控制台,控制台会输出如下信息:
... [11384] 24 Aug 13:26:45.044 * DB saved on disk
使用 flushDb() 方法可以手动刷新数据库,方法定义如下:
void flushDb(RedisClusterNode node) 在节点上刷新数据库。
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374); ops.flushDb(node);
上面代码手动刷新了 127.0.0.1:6374 节点。
使用 keys() 方法可以从指定节点获取键集合,方法定义如下:
Set<K> keys(RedisClusterNode node, K pattern) 获取位于给定节点中匹配 pattern 的所有键。
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); // 写入数据 ValueOperations<String,String> valueOps = redisTemplate.opsForValue(); valueOps.set("key1", "value1"); valueOps.set("key2", "value2"); valueOps.set("key3", "value3"); valueOps.set("key4", "value4"); // 获取每个节点所有的缓存 key for(int i = 6374; i <= 6379; i++) { RedisClusterNode node = new RedisClusterNode("127.0.0.1", i); Set<String> keys = ops.keys(node, "*"); System.out.println(node + " ==> " + Arrays.toString(keys.toArray())); }
运行示例,输出结果如下:
127.0.0.1:6374 ==> [key3, key2] 127.0.0.1:6375 ==> [key1] 127.0.0.1:6376 ==> [key4] 127.0.0.1:6377 ==> [key4] 127.0.0.1:6378 ==> [key2, key3] 127.0.0.1:6379 ==> [key1]
使用 ping() 方法可以 Ping 指定的节点,查看节点状态,方法定义如下:
String ping(RedisClusterNode node) Ping 给定的节点
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); // ping 每一个节点 for(int i = 6374; i <= 6379; i++) { RedisClusterNode node = new RedisClusterNode("127.0.0.1", i); String ping = ops.ping(node); System.out.println(node + " ==> " + ping); }
运行示例,输出结果如下:
127.0.0.1:6374 ==> PONG 127.0.0.1:6375 ==> PONG 127.0.0.1:6376 ==> PONG 127.0.0.1:6377 ==> PONG 127.0.0.1:6378 ==> PONG 127.0.0.1:6379 ==> PONG
注意:如果能够正常 Ping 通,则返回 “PONG” 字符串。如果不能 Ping 通,则抛出 “Unable to connect to 127.0.0.1:6378” 错误。
使用 randomKey() 方法我们可以从指定节点中随机返回一个键,方法定义如下:
K randomKey(RedisClusterNode node) 从给定节点所服务的范围中获取随机密钥。
示例:
// 准备数据 ValueOperations<String,String> valueOps = redisTemplate.opsForValue(); for(int i = 0; i < 20; i++) { valueOps.set("key" + i, "value" + i); } ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); // 从每个节点随机返回一个键 for(int i = 6374; i <= 6379; i++) { RedisClusterNode node = new RedisClusterNode("127.0.0.1", i); String key = ops.randomKey(node); System.out.println(node + " ==> " + key); }
运行示例,输出结果如下:
127.0.0.1:6374 ==> key7 127.0.0.1:6375 ==> key10 127.0.0.1:6376 ==> key16 127.0.0.1:6377 ==> key0 127.0.0.1:6378 ==> key3 127.0.0.1:6379 ==> key14
使用 shutdown() 方法可以手动关闭指定节点,方法定义如下:
void shutdown(RedisClusterNode node) 关闭给定的节点
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); // 从每个节点随机返回一个键 for(int i = 6374; i <= 6379; i++) { RedisClusterNode node = new RedisClusterNode("127.0.0.1", i); ops.shutdown(node); System.out.println(node + " ==> shutdown"); }
运行示例,输出结果如下:
127.0.0.1:6374 ==> shutdown 127.0.0.1:6375 ==> shutdown 127.0.0.1:6376 ==> shutdown 127.0.0.1:6377 ==> shutdown 127.0.0.1:6378 ==> shutdown 127.0.0.1:6379 ==> shutdown
执行上面代码后,观察所有节点控制台,控制台会输出如下信息:
... [12752] 24 Aug 13:40:39.312 # User requested shutdown... [12752] 24 Aug 13:40:39.312 # There is a child saving an .rdb. Killing it! [12752] 24 Aug 13:40:39.359 * Saving the final RDB snapshot before exiting. [12752] 24 Aug 13:40:39.437 * DB saved on disk [12752] 24 Aug 13:40:39.453 # Redis is now ready to exit, bye bye...
使用 meet() 方法可以将一个节点添加到 Redis 集群,forget() 方法将指定节点从 Redis 集群中移除。方法定义如下:
void forget(RedisClusterNode node) 从集群中移除指定的节点
void meet(RedisClusterNode node) 将指定的节点添加到集群中
Collection<RedisClusterNode> getSlaves(RedisClusterNode node) 获取节点对应的所有 Slave 节点
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); // 选择一个 Slave 节点 RedisClusterNode slaveNode = null; for(int i = 6374; i <= 6379; i++) { try { RedisClusterNode node = new RedisClusterNode("127.0.0.1", i); Collection<RedisClusterNode> slaves = ops.getSlaves(node); for (RedisClusterNode slave : slaves) { System.out.println("slave ==> " + slave); if (null == slaveNode) { slaveNode = slave; } } } catch (RedisSystemException e) {} } if(null == slaveNode) { System.err.println("很抱歉,没有 Slave 节点!"); return; } // 将节点从集群移除 ops.forget(slaveNode); System.out.println("从集群移除节点:" + slaveNode); // 将节点添加到集群 ops.meet(slaveNode); System.out.println("添加节点到集群:" + slaveNode);
运行示例,输出结果如下:
slave ==> 127.0.0.1:6378 slave ==> 127.0.0.1:6379 slave ==> 127.0.0.1:6377 从集群移除节点:127.0.0.1:6378 添加节点到集群:127.0.0.1:6378
注意:如果我们调用 forget() 或者 meet() 方法时,传递的节点不是 slave 节点,则会抛出如下错误信息:
Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Can't forget my master!; nested exception is org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Can't forget my master! 不能移除 master 类型节点
一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽,其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和,集群中的每个节点负责处理一部分插槽。
Spring Data Redis 中,可以使用 addSlots() 方法添加指定插槽到某个节点,方法定义如下:
void addSlots(RedisClusterNode node, int... slots) 将给定的插槽添加到节点
void addSlots(RedisClusterNode node, RedisClusterNode.SlotRange range) 将 RedisClusterNode.SlotRange 中的插槽添加到给定节点
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374); // 添加一个或多个卡槽到 Redis ops.addSlots(node, 1); // 将一个范围内的卡槽添加到 Redis RedisClusterNode.SlotRange slotRange = new RedisClusterNode.SlotRange(100, 200); ops.addSlots(node, slotRange);
使用 reshard() 方法可以将指定插槽以及插槽下面的键移动到指定的节点。方法定义如下:
void reshard(RedisClusterNode source, int slot, RedisClusterNode target) 将插槽分配从一个源移到目标节点,并复制与该插槽关联的关键字
示例:
ClusterOperations<String,String> ops = redisTemplate.opsForCluster(); RedisClusterNode node1 = new RedisClusterNode("127.0.0.1", 6374); RedisClusterNode node2 = new RedisClusterNode("127.0.0.1", 6379); ops.reshard(node1, 1, node2);
注意:进行卡槽移动必须在 Master 节点上面进行,否则抛出如下错误:
org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Please use SETSLOT only with masters.
AOF 是 Redis 的另一种持久化的方式。它是基于文本形式的,追加写入命令的方式来做持久化的。这个有些类似 Mysql 中的 binlog,它的持久化记录的粒度比 RDB 要更好,所以生产环境中一般会开启 AOF 持久化的。当然,在 Redis4 之后的版本,rdb 和 aof 是可以开启混合模式的。
在 Spring Data Redis 中,使用 bgReWriteAof() 方法手动执行 AOF 写入。方法定义如下:
void bgReWriteAof(RedisClusterNode node) 在给定节点上启动仅附加文件重写过程。