在消费端,可以消费一个或多个流(Stream)。Redis Streams 提供读取命令,允许从已知流内容内的任意位置(随机访问)以及流端以外的任意位置消费流,以消费新的流记录。
在底层,RedisConnection 提供了 xRead 和 xReadGroup 方法,分别映射 Redis 命令以在消费者组内进行读取。
Redis 中的订阅命令可能会被阻塞。也就是说,在连接上调用 xRead 会导致当前线程在开始等待消息时阻塞。只有当读取命令超时或收到消息时,线程才会被释放。
要使用流消息,可以轮询应用程序代码中的消息,或者通过消息监听容器异步接收。每次新记录到达时,容器都会通知应用程序代码。
虽然流消费通常与异步处理相关,但也可以同步从流中消费消息。重载的 StreamOperations.read(…) 方法提供了此功能。在同步接收期间,调用线程可能会阻塞,直到消息可用为止。StreamReadOptions 属性用来指定接收方在放弃等待消息之前应等待多长时间。
示例1:通过 RedisTemplate 同步读取消息。
(1)配置我们的 RedisTemplate,代码如下:
@Bean public RedisTemplate<String,String> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String,String> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(factory); // 设置键序列化方式 redisTemplate.setKeySerializer(new StringRedisSerializer()); // 设置简单类型值的序列化方式 redisTemplate.setValueSerializer(new StringRedisSerializer()); // 设置 hash 类型键的序列化方式 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); // 设置 hash 类型值的序列化方式 redisTemplate.setHashValueSerializer(new StringRedisSerializer()); // 设置默认序列化方式 redisTemplate.setDefaultSerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; }
(2)直接从 Redis Stream 中读取消息,代码如下:
@Autowired private RedisTemplate<String,String> redisTemplate; @Test public void syncRecMessage() { // 获取 StreamOperations 对象 StreamOperations<String,String,String> ops = redisTemplate.opsForStream(); // 从 myStream 流中读取消息 List<MapRecord<String,String,String>> messages = ops.read( StreamReadOptions.empty(), StreamOffset.fromStart("myStream")); // 将获取到的消息输出到控制台 for(MapRecord<String,String,String> message : messages) { System.out.println("id=" + message.getId()); System.out.println("stream=" + message.getStream()); System.out.println("value=" + JSONObject.toJSONString(message.getValue())); } }
运行示例,输出如下:
id=1666331433947-0 stream=myStream value={"version":"v1.0.0","title":"Redis Streams","summary":"Redis Streams model a log data structure in an abstract approach."} ...
其中,StreamOperations.read() 方法定义如下:
List<MapRecord<K,HK,HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams)
使用一个消费者组从一个或多个 StreamOffsets 读取记录。readOptions 表示读取参数,上面创建了一个空的读取参数,streams 表示要读取的数据流。
(3)通过消费者组消费消息,代码如下:
@Autowired private RedisTemplate<String,String> redisTemplate; @Test public void syncRecMsgFromConsumerGroup() { StreamOperations<String,String,String> ops = redisTemplate.opsForStream(); // 在消费组 myGroup 中创建一个名为 myConsumer 的消费者 // 消费者将从名为 myStream 的 Stream 中消费消息 List<MapRecord<String,String,String>> messages = ops.read( Consumer.from("myGroup", "myConsumer"), StreamReadOptions.empty(), StreamOffset.create("myStream", ReadOffset.lastConsumed())); for(MapRecord<String,String,String> message : messages) { System.out.println("id=" + message.getId()); System.out.println("stream=" + message.getStream()); System.out.println("value=" + JSONObject.toJSONString(message.getValue())); } }
运行示例,输出如下:
id=1666331433947-0 stream=myStream value={"version":"v1.0.0","title":"Redis Streams","summary":"Redis Streams model a log data structure in an abstract approach."} ...
注意,使用消费组消费消息时,需要先使用 xgroup create 命令去创建消费组,否则代码执行异常。如下:
127.0.0.1:6379[1]> xgroup create myStream myGroup $
上面创建了一个名为 myGroup 的消费组,在创建消费组之前,需要先去创建名为 myStream 的 Stream,否则命令执行报错。
在创建消费组的命令中我们必须指定一个 ID,在示例中使用了 $。这是必需的,因为消费者组必须知道在第一个消费者连接时接下来要提供哪条消息。$ 表示从现在开始到达流中的新消息才会提供给组中的消费者。我们也可以指定一个有效 ID,会提供给消费者大于指定 ID 的消息。
xgroup create 命令还支持自动创建流,如果流不存在,使用可选的 mkstream 子命令作为最后一个参数可以自动创建对应的流,如下:
127.0.0.1:6379[1]> xgroup create myStream2 myGroup2 $ mkstream OK
由于其阻塞特性,低级轮询没有吸引力,因为它需要对每个使用者进行连接和线程管理。为了缓解这个问题,Spring Data 提供了消息侦听器,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现熟悉的概念,因为它被设计为尽可能接近 Spring Framework 及其消息驱动 POJO(MDP)中的支持。
Spring Data 提供了两种针对所用编程模型定制的实现:
StreamMessageListenerContainer 充当命令式编程模型的消息侦听器容器。它用于消费来自 Redis Stream 的记录,并驱动注入其中的 StreamListener 实例。
StreamReceiver 提供了消息侦听器的响应式变体。它用于将来自 Redis Stream 的消息作为潜在的无限流使用,并通过 Flux 发出流消息。
StreamMessageListenerContainer 和 StreamReceiver 负责将消息接收和发送的所有线程处理到侦听器中进行处理。消息侦听器容器/接收器是 MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这让您作为应用程序开发人员可以编写与接收消息相关的业务逻辑,并将样板 Redis 基础设施关注点委托给框架。
这两个容器都允许运行时配置更改,以便您可以在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用 RedisConnection。如果所有侦听器都被取消订阅,那么它会自动执行清理,并释放线程。
与 EJB 世界中的消息驱动 Bean(MDB)类似,流驱动 POJO(SDP)充当流消息的接收器。SDP 的一个限制是它必须实现 org.springframework.data.redis.stream.StreamListener 接口。还请注意,如果 POJO 在多个线程上接收消息,那么确保实现是线程安全的非常重要。
示例:实现一个自己的 StreamListener,代码如下:
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.stream.StreamListener; /** * 实现消息流监听 * @author hxstrive.com 2022/10/28 */ public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> { @Override public void onMessage(MapRecord<String, String, String> message) { System.out.println("MessageId: " + message.getId()); System.out.println("Stream: " + message.getStream()); System.out.println("Body: " + message.getValue()); } }
实现 StreamListener 后,就可以创建消息侦听器容器并将消息监听器注册到容器,代码如下:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.time.Duration; /** * 测试实例 * @author hxstrive.com 2022/10/28 */ @Component public class ExampleStreamListenerDemo { @Autowired private RedisConnectionFactory redisConnectionFactory; @PostConstruct public void init() { System.out.println("启动监听器..."); // 1.流监听器 StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener(); // 2.消息监听容器选项 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder().pollTimeout(Duration.ofSeconds(10)).build(); // 3.消息流监听容器 StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions); // 4.注册监听器 container.receive(StreamOffset.fromStart("myStream"), streamListener); // 5.启动消息流监听容器 container.start(); } }
运行示例,启动 Spring Boot 程序,然后再使用 redis-cli 向名为 myStream 的流中发送消息进行观察。命令如下:
127.0.0.1:6379> xadd myStream * field1 value1 "1666935084843-0"
收到消息如下:
MessageId: 1666935084843-0 Stream: myStream Body: {field1=value1}
流数据源的反应式消费通常通过事件或消息的 Flux 发生。反应式接收器的实现由StreamReceiver及其重载的receive(...)消息提供。与StreamMessageListenerContainer 相比,反应式方法需要更少的基础设施资源,如线程,因为它是利用驱动提供的线程资源。接收流是一个需求驱动的 StreamMessage 的发布者。
示例:通过一个简单的 Flux 来演示反应式流接收器
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamReceiver; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import javax.annotation.PostConstruct; import java.time.Duration; import java.util.function.Consumer; /** * 测试实例 * @author hxstrive.com 2022/10/28 */ @Component public class ExampleStreamListenerDemo { @Autowired private ReactiveRedisConnectionFactory redisConnectionFactory; @PostConstruct public void init() { System.out.println("启动监听器..."); StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiver.StreamReceiverOptions.builder() .pollTimeout(Duration.ofSeconds(100)).build(); StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(redisConnectionFactory, options); // 从名为 myStream 的流中读取 Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("myStream")); messages.doOnNext(new Consumer<MapRecord<String, String, String>>() { @Override public void accept(MapRecord<String, String, String> message) { // 输出接收到的消息 System.out.println("MessageId: " + message.getId()); System.out.println("Stream: " + message.getStream()); System.out.println("Body: " + message.getValue()); } }) // 订阅这个Flux .subscribe(); } }
注意:需求驱动的消费使用背压信号来激活和停用轮询。如果需求得到满足,StreamReceiver 订阅将暂停轮询,直到订户发出进一步需求的信号。根据 ReadOffset 策略,这可能会导致消息被跳过。
当您通过 Consumer Group(消费组)读取消息时,服务器将记住给定的消息已发送,并将其添加到 Pending Entries List(PEL)中。已发送但尚未确认的消息列表。
必须通过 StreamOperations 确认消息。确认,以便从待定条目列表中删除,如下面的代码段所示。
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ... // 从名为 my-group 的消费组的 my-consumer 中消费消息,接收到的消息不确认。 container.receive(Consumer.from("my-group", "my-consumer"), StreamOffset.create("my-stream", ReadOffset.lastConsumed()), msg -> { // 消息处理后,进行消息确认 redisTemplate.opsForStream().acknowledge("my-group", msg); });
流读取操作接受读取偏移量规范以使用上给定偏移量的消息。ReadOffset 表示读取偏移量规格。Redis 支持 3 种不同的偏移量,具体取决于您是独立使用流还是在消费者组内使用流:
ReadOffset.latest() 读取最新消息
ReadOffset.from(…) 读取特定消息 Id 之后的消息
ReadOffset.lastconsume() 读取最后使用的消息 Id 之后的消息(仅针对消费者组)
在基于消息容器的消费上下文中,我们需要在消费消息时提前(或增加)读取偏移量。前进取决于请求的 ReadOffset 和消费模式(有/无消费组)。以下矩阵解释了容器如何推进 ReadOffset:
读取偏移量(ReadOffset) | 非消费组 | 消费组 |
latest() | 读取最新消息 | 读取最新消息 |
from() | 将上次看到的消息用作下一个MessageId | 将上次看到的消息用作下一个MessageId |
lastconsume() | 将上次看到的消息用作下一个MessageId | 按消费群体计算的最后消费消息 |
读取特定消息 id 和上次使用的消息可以被视为安全操作,以确保使用附加到流中的所有消息。使用最新消息进行读取可以跳过轮询操作处于停滞状态时添加到流中的消息。轮询引入了死区时间,消息可以在各个轮询命令之间到达。流消耗不是线性连续读取,而是拆分为重复的 XREAD 调用。