Jedis 操作 Streams(流)

Redis Stream(流)是 Redis 5.0 版本引入的一个新的数据类型。

Stream 以更抽象的方式模拟日志数据结构,但日志仍然是完整的:就像一个日志文件,通常实现为以只附加模式打开的文件,Redis流主要是一个仅附加数据结构。

本文将介绍怎样通过 jedis 库操作 Redis 的 Streams 数据类型。

注意,低版本的 jedis 不支持流,jedis 至少需要 3.1.0 版本以上,本文将使用最新版本 4.3.1,依赖如下:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.1</version>
</dependency>

添加条目

用来将指定的数据添加到流中,返回条目ID,方法定义如下:

  • byte[] xadd(byte[] key, XAddParams params, Map<byte[],byte[]> hash)

  • StreamEntryID xadd(String key, StreamEntryID id, Map<String,String> hash)   对应的 Redis 命令为 XADD key ID field string [field string ...]

  • StreamEntryID xadd(String key, XAddParams params, Map<String,String> hash)

示例:

// 添加一个条目到流
Map<String,String> map = new HashMap<>();
map.put("id", "1000");
map.put("name", "Helen");
StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
System.out.println(id); // 1679289623159-0
// 注意:流条目内容为 {"id":"1000","name":"Helen"}

// 添加一个条目到流,指定最大长度,以及是否近乎精确
for(int i = 0; i < 5; i++) {
   map = new HashMap<>();
   map.put("id", Integer.valueOf(2000 + i).toString());
   map.put("name", "Bill-" + i);

   XAddParams xAddParams = new XAddParams();
   xAddParams.maxLen(2); // 最大长度
   xAddParams.approximateTrimming(); // 近乎精确分割
   //xAddParams.exactTrimming(); // 精确分割
   xAddParams.id(StreamEntryID.NEW_ENTRY); // ID策略,即 *
   id = jedis.xadd("mystream", xAddParams, map);
   System.out.println(id);
}

其中,StreamEntryID 用来指定流 ID 的生成策略,取值如下:

  • StreamEntryID.LAST_ENTRY  应该只与 XGROUP CREATE 一起使用,如:XGROUP CREATE mystream -group-name $

  • StreamEntryID.NEW_ENTRY  应该只与 XADD 一起使用,如: XADD mystream * field1 value1

  • StreamEntryID.UNRECEIVED_ENTRY  应该只与 XREADGROUP 一起使用,如: XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >

读取条目

用来从流中读取条目,方法定义如下:

  • List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[],byte[]>... streams)

  • List<Map.Entry<String,List<StreamEntry>>> xread(XReadParams xReadParams, Map<String,StreamEntryID> streams)  对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)

  • List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams)  对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

示例:

// 准备数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679375078037-0, 1679375078038-0, 1679375078038-1, 1679375078038-2, 1679375078039-0]

// 从 mystream 流中获取条目 ID 大于 1679375078038-1 的两个条目
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", idList.get(2));

XReadParams xReadParams = new XReadParams();
xReadParams.count(2); // 最大去读个数
xReadParams.block(1000); // 阻塞时间,毫秒

List<Map.Entry<String, List<StreamEntry>>> result = jedis.xread(xReadParams, params);
for(Map.Entry<String, List<StreamEntry>> entry : result) {
   String key = entry.getKey();
   List<StreamEntry> value = entry.getValue();
   System.out.println("key = " + key); // key = mystream
   for(StreamEntry streamEntry : value) {
       System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields()));
   }
   // 1679375078038-2 = {"uuid":"d79024c2-e60d-42c5-9beb-4da6c302d7fc"}
   // 1679375078039-0 = {"uuid":"16c1caf4-6048-4add-8b79-a6ddbc10ec8b"}
}

范围读取

用来读取指定两个消息ID(开始ID,结束ID)之间的消息,方法定义如下:

  • List<byte[]> xrange(byte[] key, byte[] start, byte[] end)

  • List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count)

  • List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end)  对应的 Redis 命令为 XRANGE key start end

  • List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count)  对应的 Redis 命令为 XRANGE key start end COUNT count

  • List<StreamEntry> xrange(String key, String start, String end)

  • List<StreamEntry> xrange(String key, String start, String end, int count)

示例:

// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679373956962-0, 1679373956963-0, 1679373956964-0, 1679373956964-1, 1679373956964-2]

// 根据ID范围获取数据
StreamEntryID startId = idList.get(0);
StreamEntryID endId = idList.get(idList.size() - 1);
List<StreamEntry> streamEntryList = jedis.xrange("mystream", startId, endId, 5);
for(StreamEntry streamEntry : streamEntryList) {
   System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields()));
}
// 1679373956962-0 = {"uuid":"5beca5dc-a1b8-4b59-a5a5-c7357baf287c"}
// 1679373956963-0 = {"uuid":"82227f64-eddf-470e-86ab-de8fcf1ac1c3"}
// 1679373956964-0 = {"uuid":"2647c5a4-8e77-454f-a1d1-a1772688aa4f"}
// 1679373956964-1 = {"uuid":"d5c17352-8622-4d00-b7c0-2143aaae0ce7"}
// 1679373956964-2 = {"uuid":"a3400f40-ae40-46a0-bb9c-1a907795fd13"}

流长度

用来获取指定流的长度,即条目个数,方法定义如下:

  • Long xlen(String key)

示例:

// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679374066012-0, 1679374066018-0, 1679374066019-0, 1679374066019-1, 1679374066021-0]

// 获取流条目数
Long len = jedis.xlen("mystream");
System.out.println("len = " +len); // len = 5

流裁剪

用来,方法定义如下:

  • long xtrim(byte[] key, long maxLen, boolean approximateLength)

  • long xtrim(byte[] key, XTrimParams params)

  • long xtrim(String key, long maxLen, boolean approximateLength)  对应的 Redis 命令 XTRIM key MAXLEN [~] count

  • long xtrim(String key, XTrimParams params)

示例:

// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679633388548-0, 1679633388550-0, 1679633388550-1, 1679633388551-0, 1679633388551-1]

long len = jedis.xlen("mystream");
System.out.println("old len=" + len); // old len=5

// 第三个参数 approximateLength 用来指定是否采用近乎准确的格式
// 如果 approximateLength = true,则采用近乎精确的方式,即指定 2 不一定会裁剪到只剩下2个消息
// 如果 approximateLength = false,则采用精确的方式,即指定 2 则流会被裁剪到长度为 2
jedis.xtrim("mystream", 2, false);
len = jedis.xlen("mystream");
System.out.println("new len=" + len); // new len=2

删除流

用来从流中删除一个或多个消息,方法定义如下:

  • long xdel(String key, StreamEntryID... ids)  对应的 Redis 命令 XDEL key ID [ID ...]

实例:

// 准备数据
Map<String, String> map = new HashMap<>();
map.put("uuid", UUID.randomUUID().toString());
StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
System.out.println("id=" + id); // id=1679633800258-0

// 删除流中指定条目ID的条目
long size = jedis.xdel("mystream", id);
System.out.println("size=" + size); // size=1

创建/删除消费组

用来在指定的流中创建指定名称的消费组或者删除消费组,方法定义如下:

  • String xgroupCreate(String key, String groupname, StreamEntryID id, boolean makeStream)  对应 Redis 的命令 XGROUP CREATE

  • long xgroupDestroy(String key, String groupname) 对应 Redis 的命令为 XGROUP DESTROY

实例:

// 在 mystream 流中创建名为 mygroup 的消费组
// makeStream 参数用来指定当流不存在时是否自动创建流,true-自动创建,false-不自动创建
// 如果流不存在,且 makeStream = false,则将抛出异常
// ERR The XGROUP subcommand requires the key to exist.
// Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK

// 删除消费组
long ln = jedis.xgroupDestroy("mystream", "mygroup");
System.out.println("ln=" + ln); // ln=1

创建/删除消费者

用来在指定的流中的消费组中创建消费者,或者删除消费者,方法定义如下:

  • long xgroupDelConsumer(byte[] key, byte[] groupName, byte[] consumerName)

  • long xgroupDelConsumer(String key, String groupName, String consumerName)  对应 Redis 的命令为 XGROUP DELCONSUMER

注意:jedis 没有提供单独创建消费者的方法,而是通过 readGroup() 方法,当我们读取消息时,将会自动创建消费者。

示例:

// 创建消费组
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK

// 从消费组读取消息,将自动创建消费者
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息

XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.count(5); // 限制最大读取5个
xReadGroupParams.block(1000); // 阻塞1秒
xReadGroupParams.noAck(); // 自动确认

List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup(
       "mygroup", "myconsumer", xReadGroupParams, params);
System.out.println("result=" + JSONObject.toJSONString(result));

// 删除消费者 myconsumer
long val = jedis.xgroupDelConsumer("mystream", "mygroup", "myconsumer");
System.out.println("删除消费者 val=" + val); // 删除消费者 val=0

消费消费组中消息

用来通过消费组的方式从流中消费消息,方法定义如下:

  • List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)

  • List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams)  对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

示例:

// 创建消费组
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK

// 从消费组读取消息,将自动创建消费者
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息

boolean run = true;
while(run) {
   // 用来从流的消费组中读取消息,自动创建 myconsumer 消费者
   // 每次从流中最多读取 5 条消息,如果没有可用的消息则阻塞 500 毫秒
   XReadGroupParams xReadGroupParams = new XReadGroupParams();
   xReadGroupParams.count(5); // 限制最大读取5个
   xReadGroupParams.block(500); // 阻塞1秒
   xReadGroupParams.noAck(); // 自动确认

   List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup(
           "mygroup", "myconsumer", xReadGroupParams, params);
   if(null != result && result.size() > 0) {
       for(Map.Entry<String, List<StreamEntry>> entry : result) {
           List<StreamEntry> value = entry.getValue();
           for(StreamEntry streamEntry : value) {
               String val = streamEntry.getFields().get("message");
               System.out.println("message = " + val);
               // 退出消息接受
               if("exit".equalsIgnoreCase(val)) {
                   run = false;
               }
           }
       }
   }
}

// 演示,先运行上面代码,等待消费消息
// 然后,使用 redis-cli 连接到 redis 服务,执行下面命令添加消息到流
// xadd mystream * message value1
// xadd mystream * message value2
// xadd mystream * message exit   则退出 while 循环
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号