Redis Stream(流)是 Redis 5.0 版本引入的一个新的数据类型。
Stream 以更抽象的方式模拟日志数据结构,但日志仍然是完整的:就像一个日志文件,通常实现为以只附加模式打开的文件,Redis流主要是一个仅附加数据结构。
本文将介绍怎样通过 jedis 库操作 Redis 的 Streams 数据类型。
注意,低版本的 jedis 不支持流,jedis 至少需要 3.1.0 版本以上,本文将使用最新版本 4.3.1,依赖如下:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.3.1</version> </dependency>
用来将指定的数据添加到流中,返回条目ID,方法定义如下:
byte[] xadd(byte[] key, XAddParams params, Map<byte[],byte[]> hash)
StreamEntryID xadd(String key, StreamEntryID id, Map<String,String> hash) 对应的 Redis 命令为 XADD key ID field string [field string ...]
StreamEntryID xadd(String key, XAddParams params, Map<String,String> hash)
示例:
// 添加一个条目到流 Map<String,String> map = new HashMap<>(); map.put("id", "1000"); map.put("name", "Helen"); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); System.out.println(id); // 1679289623159-0 // 注意:流条目内容为 {"id":"1000","name":"Helen"} // 添加一个条目到流,指定最大长度,以及是否近乎精确 for(int i = 0; i < 5; i++) { map = new HashMap<>(); map.put("id", Integer.valueOf(2000 + i).toString()); map.put("name", "Bill-" + i); XAddParams xAddParams = new XAddParams(); xAddParams.maxLen(2); // 最大长度 xAddParams.approximateTrimming(); // 近乎精确分割 //xAddParams.exactTrimming(); // 精确分割 xAddParams.id(StreamEntryID.NEW_ENTRY); // ID策略,即 * id = jedis.xadd("mystream", xAddParams, map); System.out.println(id); }
其中,StreamEntryID 用来指定流 ID 的生成策略,取值如下:
StreamEntryID.LAST_ENTRY 应该只与 XGROUP CREATE 一起使用,如:XGROUP CREATE mystream -group-name $
StreamEntryID.NEW_ENTRY 应该只与 XADD 一起使用,如: XADD mystream * field1 value1
StreamEntryID.UNRECEIVED_ENTRY 应该只与 XREADGROUP 一起使用,如: XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
用来从流中读取条目,方法定义如下:
List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xread(XReadParams xReadParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
示例:
// 准备数据 List<StreamEntryID> idList = new ArrayList<>(); for(int i = 0; i < 5; i++) { Map<String, String> map = new HashMap<>(); map.put("uuid", UUID.randomUUID().toString()); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); idList.add(id); } System.out.println(Arrays.toString(idList.toArray())); // [1679375078037-0, 1679375078038-0, 1679375078038-1, 1679375078038-2, 1679375078039-0] // 从 mystream 流中获取条目 ID 大于 1679375078038-1 的两个条目 Map<String,StreamEntryID> params = new HashMap<>(); params.put("mystream", idList.get(2)); XReadParams xReadParams = new XReadParams(); xReadParams.count(2); // 最大去读个数 xReadParams.block(1000); // 阻塞时间,毫秒 List<Map.Entry<String, List<StreamEntry>>> result = jedis.xread(xReadParams, params); for(Map.Entry<String, List<StreamEntry>> entry : result) { String key = entry.getKey(); List<StreamEntry> value = entry.getValue(); System.out.println("key = " + key); // key = mystream for(StreamEntry streamEntry : value) { System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields())); } // 1679375078038-2 = {"uuid":"d79024c2-e60d-42c5-9beb-4da6c302d7fc"} // 1679375078039-0 = {"uuid":"16c1caf4-6048-4add-8b79-a6ddbc10ec8b"} }
用来读取指定两个消息ID(开始ID,结束ID)之间的消息,方法定义如下:
List<byte[]> xrange(byte[] key, byte[] start, byte[] end)
List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count)
List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end) 对应的 Redis 命令为 XRANGE key start end
List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count) 对应的 Redis 命令为 XRANGE key start end COUNT count
List<StreamEntry> xrange(String key, String start, String end)
List<StreamEntry> xrange(String key, String start, String end, int count)
示例:
// 准备数据,添加十条数据 List<StreamEntryID> idList = new ArrayList<>(); for(int i = 0; i < 5; i++) { Map<String, String> map = new HashMap<>(); map.put("uuid", UUID.randomUUID().toString()); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); idList.add(id); } System.out.println(Arrays.toString(idList.toArray())); // [1679373956962-0, 1679373956963-0, 1679373956964-0, 1679373956964-1, 1679373956964-2] // 根据ID范围获取数据 StreamEntryID startId = idList.get(0); StreamEntryID endId = idList.get(idList.size() - 1); List<StreamEntry> streamEntryList = jedis.xrange("mystream", startId, endId, 5); for(StreamEntry streamEntry : streamEntryList) { System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields())); } // 1679373956962-0 = {"uuid":"5beca5dc-a1b8-4b59-a5a5-c7357baf287c"} // 1679373956963-0 = {"uuid":"82227f64-eddf-470e-86ab-de8fcf1ac1c3"} // 1679373956964-0 = {"uuid":"2647c5a4-8e77-454f-a1d1-a1772688aa4f"} // 1679373956964-1 = {"uuid":"d5c17352-8622-4d00-b7c0-2143aaae0ce7"} // 1679373956964-2 = {"uuid":"a3400f40-ae40-46a0-bb9c-1a907795fd13"}
用来获取指定流的长度,即条目个数,方法定义如下:
Long xlen(String key)
示例:
// 准备数据,添加十条数据 List<StreamEntryID> idList = new ArrayList<>(); for(int i = 0; i < 5; i++) { Map<String, String> map = new HashMap<>(); map.put("uuid", UUID.randomUUID().toString()); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); idList.add(id); } System.out.println(Arrays.toString(idList.toArray())); // [1679374066012-0, 1679374066018-0, 1679374066019-0, 1679374066019-1, 1679374066021-0] // 获取流条目数 Long len = jedis.xlen("mystream"); System.out.println("len = " +len); // len = 5
用来,方法定义如下:
long xtrim(byte[] key, long maxLen, boolean approximateLength)
long xtrim(byte[] key, XTrimParams params)
long xtrim(String key, long maxLen, boolean approximateLength) 对应的 Redis 命令 XTRIM key MAXLEN [~] count
long xtrim(String key, XTrimParams params)
示例:
// 准备数据,添加十条数据 List<StreamEntryID> idList = new ArrayList<>(); for(int i = 0; i < 5; i++) { Map<String, String> map = new HashMap<>(); map.put("uuid", UUID.randomUUID().toString()); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); idList.add(id); } System.out.println(Arrays.toString(idList.toArray())); // [1679633388548-0, 1679633388550-0, 1679633388550-1, 1679633388551-0, 1679633388551-1] long len = jedis.xlen("mystream"); System.out.println("old len=" + len); // old len=5 // 第三个参数 approximateLength 用来指定是否采用近乎准确的格式 // 如果 approximateLength = true,则采用近乎精确的方式,即指定 2 不一定会裁剪到只剩下2个消息 // 如果 approximateLength = false,则采用精确的方式,即指定 2 则流会被裁剪到长度为 2 jedis.xtrim("mystream", 2, false); len = jedis.xlen("mystream"); System.out.println("new len=" + len); // new len=2
用来从流中删除一个或多个消息,方法定义如下:
long xdel(String key, StreamEntryID... ids) 对应的 Redis 命令 XDEL key ID [ID ...]
实例:
// 准备数据 Map<String, String> map = new HashMap<>(); map.put("uuid", UUID.randomUUID().toString()); StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map); System.out.println("id=" + id); // id=1679633800258-0 // 删除流中指定条目ID的条目 long size = jedis.xdel("mystream", id); System.out.println("size=" + size); // size=1
用来在指定的流中创建指定名称的消费组或者删除消费组,方法定义如下:
String xgroupCreate(String key, String groupname, StreamEntryID id, boolean makeStream) 对应 Redis 的命令 XGROUP CREATE
long xgroupDestroy(String key, String groupname) 对应 Redis 的命令为 XGROUP DESTROY
实例:
// 在 mystream 流中创建名为 mygroup 的消费组 // makeStream 参数用来指定当流不存在时是否自动创建流,true-自动创建,false-不自动创建 // 如果流不存在,且 makeStream = false,则将抛出异常 // ERR The XGROUP subcommand requires the key to exist. // Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically. String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true); System.out.println("str=" + str); // str=OK // 删除消费组 long ln = jedis.xgroupDestroy("mystream", "mygroup"); System.out.println("ln=" + ln); // ln=1
用来在指定的流中的消费组中创建消费者,或者删除消费者,方法定义如下:
long xgroupDelConsumer(byte[] key, byte[] groupName, byte[] consumerName)
long xgroupDelConsumer(String key, String groupName, String consumerName) 对应 Redis 的命令为 XGROUP DELCONSUMER
注意:jedis 没有提供单独创建消费者的方法,而是通过 readGroup() 方法,当我们读取消息时,将会自动创建消费者。
示例:
// 创建消费组 String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true); System.out.println("str=" + str); // str=OK // 从消费组读取消息,将自动创建消费者 Map<String,StreamEntryID> params = new HashMap<>(); params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息 XReadGroupParams xReadGroupParams = new XReadGroupParams(); xReadGroupParams.count(5); // 限制最大读取5个 xReadGroupParams.block(1000); // 阻塞1秒 xReadGroupParams.noAck(); // 自动确认 List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup( "mygroup", "myconsumer", xReadGroupParams, params); System.out.println("result=" + JSONObject.toJSONString(result)); // 删除消费者 myconsumer long val = jedis.xgroupDelConsumer("mystream", "mygroup", "myconsumer"); System.out.println("删除消费者 val=" + val); // 删除消费者 val=0
用来通过消费组的方式从流中消费消息,方法定义如下:
List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
示例:
// 创建消费组 String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true); System.out.println("str=" + str); // str=OK // 从消费组读取消息,将自动创建消费者 Map<String,StreamEntryID> params = new HashMap<>(); params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息 boolean run = true; while(run) { // 用来从流的消费组中读取消息,自动创建 myconsumer 消费者 // 每次从流中最多读取 5 条消息,如果没有可用的消息则阻塞 500 毫秒 XReadGroupParams xReadGroupParams = new XReadGroupParams(); xReadGroupParams.count(5); // 限制最大读取5个 xReadGroupParams.block(500); // 阻塞1秒 xReadGroupParams.noAck(); // 自动确认 List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup( "mygroup", "myconsumer", xReadGroupParams, params); if(null != result && result.size() > 0) { for(Map.Entry<String, List<StreamEntry>> entry : result) { List<StreamEntry> value = entry.getValue(); for(StreamEntry streamEntry : value) { String val = streamEntry.getFields().get("message"); System.out.println("message = " + val); // 退出消息接受 if("exit".equalsIgnoreCase(val)) { run = false; } } } } } // 演示,先运行上面代码,等待消费消息 // 然后,使用 redis-cli 连接到 redis 服务,执行下面命令添加消息到流 // xadd mystream * message value1 // xadd mystream * message value2 // xadd mystream * message exit 则退出 while 循环