Redis Stream 是 Redis 5.0 版本新增加的数据结构。
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化。如果出现网络断开、Redis 宕机等,消息就会被丢弃。简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
Redis Stream 提供了消息持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。每个 Redis Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
Redis Stream 的结构如下图:
上图中,维护了一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。其中:
消息正文(Message Content) 负载了每条消息的真实数据。
消费组(Consumer Group) 使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
游标(last_delivered_id) 每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
消费者(Consumer) 消息的最终消费者,从队列取消息进行消费。
消费者状态变量(pending_ids) 作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 确认,即未 ack (Acknowledge character)。
在介绍示例前,我们先了解几个简单的命令,分别如下:
XADD 添加消息到末尾,语法:注意,如果将 ID 设置为 *,则表示由 Redis 自动生成 ID。
XLEN 获取流包含的元素数量,即消息长度。语法:
XRANGE 获取消息列表,会自动过滤已经删除的消息。语法:注意,start 为 - 表示最小值,end 为 + 表示最大值。
(1)向 Redis Stream 中添加一个消息
127.0.0.1:6379> XADD mystream * name zhangsan age 20 "1666244653117-0"
(2)获取 key 为 mystream 的 Redis Stream 的长度
127.0.0.1:6379> XLEN mystream (integer) 1
(3)获取 key 为 mystream 的 Redis Stream 中的所有消息
127.0.0.1:6379> XRANGE mystream - + 1) 1) "1666244653117-0" 2) 1) "name" 2) "zhangsan" 3) "age" 4) "20"
到这里就简单的介绍了 Redis Stream,更多点击连接访问 Redis 官方文档(https://redis.io/docs/data-types/streams)
Redis Streams 以抽象的方式对日志数据结构进行建模。通常,日志是只能追加的数据结构,并且从头开始、在随机位置或通过流式传输新消息来使用。
Redis Streams 大致可分为两个功能:
追加记录(Appending records) 将消息追加到 Redis Stream 消息队列后面,等待被消费。
消费记录(Consuming records) 客户端通过主动轮询的方式去消费消息。
尽管这种模式与 Pub/Sub 模式相似,但主要的区别在于消息的持久性和使用它们的方式。
Pub/Sub 依赖于瞬时消息的广播(例如,如果你不监听,你就会错过一条消息),而 Redis Stream 使用持久的、只能追加的数据类型来持久化消息,直到 Stream 被修剪。消费的另一个区别是 Pub/Sub 注册一个服务器端订阅,然后主动推送消息到客户端,而 Redis Stream 需要主动轮询。
注意,Spring Data Redis 通过 org.springframework.data.redis.connection 和 org.springframework.data.redis.stream 包提供对 Redis Streams 的核心功能支持,后续章节将介绍怎样通过 API 去操作 Redis Stream。