Spring Data Redis 教程

Redis 消息订阅(MessageListener接口)

在消息接收端或消息消费端,Spring Data Redis 可以通过直接命名或使用模式匹配订阅一个或多个频道(Channel)。

模式匹配方式非常有用,因为它不仅允许使用一个命令创建多个订阅,还可以侦听订阅时尚未创建的频道(只要它们与模式匹配)。例如:

(1)订阅消息,模式为 *.hxstrive.com,可以匹配任何以 .hxstrive.com 结尾的模式,如下:

D:\server\redis-x64-5.0.14.1> redis-cli
127.0.0.1:6379> psubscribe *.hxstrive.com
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "*.hxstrive.com"
3) (integer) 1
1) "pmessage"
2) "*.hxstrive.com"
3) "www.hxstrive.com"
4) "hello, www.hxstrive.com"
1) "pmessage"
2) "*.hxstrive.com"
3) "doc.hxstrive.com"
4) "hello, doc.hxstrive.com"

(2)发送消息,分别给 www.hxstrive.com 和 doc.hxstrive.com 模式,如下:

D:\server\redis-x64-5.0.14.1> redis-cli
127.0.0.1:6379> publish www.hxstrive.com "hello, www.hxstrive.com"
(integer) 1
127.0.0.1:6379> publish doc.hxstrive.com "hello, doc.hxstrive.com"
(integer) 1
127.0.0.1:6379>

在底层,RedisConnection 提供了 subscribe 和 pSubscribe 方法,分别映射 Redis 按频道订阅 subscribe 命令,按模式订阅 psubscribe 命令。它们语法如下:

subscribe channel [channel ...]
psubscribe pattern [pattern ...]

其中:

  • channel 表示频道名称

  • pattern 表示模式字符串

请注意,可以将多个频道(Channel)或多个匹配模式用作参数。为了更改连接的订阅或查询它是否正在侦听,RedisConnection 提供了getSubscription() 和 isSubscribed() 方法。

Spring Data Redis 中的订阅命令是阻塞模式。也就是说,在连接上调用 subscribe 会导致当前线程阻塞,等待消息。只有当订阅被取消时,线程才会被释放,你可以在另一个线程上,对订阅消息的连接调用 unsubscribe 或 pUnsubscrribe 方法释放。

如前所述,一旦订阅,连接就会开始等待消息。此时,连接仅允许添加新的订阅、修改现有订阅、取消现有订阅的命令。如果调用 subscribe、pSubscribe、unsubscrribe 或 pUnsubscrube 之外的任何命令都会引发异常。

为了订阅消息,需要实现 MessageListener 回调。每次新消息到达时,都会调用回调,并通过 onMessage 方法执行用户业务代码。该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。

MessageListener 接口源码如下:

package org.springframework.data.redis.connection;

import org.springframework.lang.Nullable;

/**
 * Listener of messages published in Redis.
 * 在 Redis 上发布的消息的监听者。
 * @author Costin Leau
 * @author Christoph Strobl
 */
public interface MessageListener {

	/**
	 * Callback for processing received objects through Redis.
	 * 通过 Redis 处理接收到的对象的回调。
	 * @param message message must not be {@literal null}.
	 *                消息对象,不能为 null
	 * @param pattern pattern matching the channel (if specified) - can be {@literal null}.
	 *                与通道匹配的模式(如果指定)-可以为空。
	 */
	void onMessage(Message message, @Nullable byte[] pattern);
}

消息侦听器容器

由于其阻塞特性,低级订阅不具吸引力,因为它需要对每个侦听器进行连接和线程管理。为了缓解这个问题,Spring Data 提供了RedisMessageListenerContainer 类,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现熟悉的概念,因为它被设计为尽可能接近 Spring Framework 消息驱动 POJO(MDP,message-driven POJOs)中的支持。

RedisMessageListenerContainer 充当消息侦听器容器。它用于接收来自 Redis 频道(Channel)的消息,并驱动注入其中的MessageListener 实例。侦听器容器负责消息接收的所有线程,并将消息发送到侦听器进行处理。消息侦听器容器是 MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这让您作为应用程序开发人员可以编写与接收消息相关的业务逻辑,并将样板 Redis 基础设施关注点委托给框架。

MessageListener 还可以实现 SubscriptionListener,以便在确认订阅/取消订阅时接收通知。同步调用时,侦听订阅通知非常有用。

此外,为了最小化应用程序占用空间,RedisMessageListenerContainer 允许多个侦听器共享一个连接和一个线程,即使它们不共享频道(Channel)。因此,无论应用程序跟踪多少侦听器或频道(Channel),运行时成本在其整个生命周期中都保持不变。此外,容器允许运行时更改配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有侦听器都被取消订阅,则会自动执行清理,并释放线程。

为了帮助处理消息的异步特性,容器需要一个java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来分派消息。根据负载、侦听器数量或运行时环境的不同,您应该更改或调整执行器以更好地满足您的需求。特别是,在托管环境(如应用程序服务器)中,强烈建议选择合适的 TaskExecutor 来利用其运行。

MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息传递支持中的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管存在一些约束)。

示例

使用 Spring Data Redis 订阅“hxstrive”频道(称为“客户端”),然后利用 redis-cli 工具的 publish 命令向“hxstrive”频道推送一条消息,Redis 将频道收到的消息转发给客户端,最后客户端将消息内容打印到控制台。

配置 RedisTemplate

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfigPubSubStr {

    @Bean
    public RedisTemplate<String,String> pubSubStrRedisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
		// 设置简单 key 和 value 的序列化器
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
		// 设置 hash 的 key 和 value 的序列化器
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

开发订阅服务

自定义 MessageListener 接口实现,来开发一个订阅服务。代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
 * 订阅服务
 * @author hxstrive.com 2022/10/13
 */
@Component
public class Subscriber implements MessageListener {

    @Autowired
    @Qualifier(value = "pubSubStrRedisTemplate")
    private RedisTemplate<String,String> redisTemplate;

    /**
     * 每次新消息到达时,都会调用回调
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();
        RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
        String channel = (String)keySerializer.deserialize(message.getChannel());
        String body = (String)valueSerializer.deserialize(message.getBody());
        System.out.println("频道: " + channel);
        System.out.println("消息内容: " + String.valueOf(body));
    }

}

消息配置

配置 RedisMessageListenerContainer 监听器容器、消息侦听器适配器和频道。代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.ArrayList;
import java.util.List;

/**
 * 消息配置
 * @author hxstrive.com 2022/10/13
 */
@Configuration
public class MessageConfig {

    @Autowired
    private Subscriber subscriber;

    /**
     * 配置 RedisMessageListenerContainer 监听器容器
     * @param connectionFactory 连接工厂
     * @param listenerAdapter 消息侦听器适配器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
		// 消息主题列表
        List<Topic> topicList = new ArrayList<>();
        // 订阅 “hxstrive” 频道
        topicList.add(new ChannelTopic("hxstrive"));
        // 订阅 “*.hxstrive.com” 模式
        topicList.add(new PatternTopic("*.hxstrive.com"));
        container.addMessageListener(listenerAdapter, topicList);
        return container;
		// 将消息侦听器添加到容器中
        container.addMessageListener(listenerAdapter, topicList);
        return container;
    }

    /**
     * 消息侦听器适配器,能将消息委托给目标侦听器
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(subscriber);
    }

}

启动类

启动类代码如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RedisLearnSpringDataApplication {
    public static void main(String[] args) {
        SpringApplication.run(RedisLearnSpringDataApplication.class, args);
    }
}

启动 Spring Boot 服务,如下:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.7.RELEASE)
...
2022-10-17 12:38:34.292  INFO 2932 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-10-17 12:38:34.292  INFO 2932 --- [  restartedMain] c.h.r.RedisLearnSpringDataApplication    : Started RedisLearnSpringDataApplication in 6.267 seconds (JVM running for 10.329)

发送消息

使用 redis-cli 命令连接到 Redis,然后使用 publish 命令向“hxstrive”频道发送一条消息。如下:

D:\server\redis-x64-5.0.14.1>redis-cli
127.0.0.1:6379> publish hxstrive "hi! spring data redis"
(integer) 1
127.0.0.1:6379> publish doc.hxstrive.com "hi! spring data redis"
(integer) 1
127.0.0.1:6379> publish www.hxstrive.com "hi! spring data redis"
(integer) 1
127.0.0.1:6379>

此时,客户端将收到如下消息:

频道: hxstrive
消息内容: hi! spring data redis
频道: doc.hxstrive.com
消息内容: hi! spring data redis
频道: www.hxstrive.com
消息内容: hi! spring data redis
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号