Spring Data Redis 教程

Redis 消息订阅(配置 MDP)

上一章节介绍了怎样通过实现 MessageListener 接口来实现消息订阅。下面将介绍通过不扩展 MessageListener 接口,而是使用MessageListenerAdapter 类,将它用作 MDP(消息驱动 POJO,message-driven POJOs)。

注意各种消息处理方法是如何根据它们可以接收和处理的各种消息类型的内容进行强类型化的。此外,消息发送到的通道或模式可以作为 String 类型的第二个参数传递给方法。

示例

定义 MessageDelegate 接口

import java.io.Serializable;
import java.util.Map;

/**
 * 消息代理
 * @author hxstrive.com 2022/10/18
 */
public interface MessageDelegate {
    /** 接收字符串消息 */
    void handleMessage(String message);
    /** 接收Map消息 */
    void handleMessage(Map message);
    /** 接收字节数组消息 */
    void handleMessage(byte[] message);
    /** 接收序列化消息 */
    void handleMessage(Serializable message);
    /** 同时传递消息和频道/模式 */
    void handleMessage(Serializable message, String channel);
}

MessageDelegate 默认实现

import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.Map;

/**
 * 默认消息代理实现
 * @author hxstrive.com 2022/10/18
 */
public class DefaultMessageDelegate implements MessageDelegate {

    @Override
    public void handleMessage(String message) {
        System.out.println("handleMessage(String message) message=" + message);
    }

    @Override
    public void handleMessage(Map message) {
        System.out.println("handleMessage(Map message) message=" + JSONObject.toJSONString(message));
    }

    @Override
    public void handleMessage(byte[] message) {
        System.out.println("handleMessage(byte[] message) message=" + new String(message));
    }

    @Override
    public void handleMessage(Serializable message) {
        System.out.println("handleMessage(Serializable message) message=" + message);
    }

    @Override
    public void handleMessage(Serializable message, String channel) {
        System.out.println("handleMessage(Serializable message, String channel) message=" + message
                + ", channel=" + channel);
    }

}

消息配置

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.ArrayList;
import java.util.List;

/**
 * 消息配置
 * @author hxstrive.com 2022/10/13
 */
@Configuration
public class MessageConfig {

    /**
     * 配置 RedisMessageListenerContainer 监听器容器
     * @param connectionFactory 连接工厂
     * @param listenerAdapter 消息侦听器适配器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        List<Topic> topicList = new ArrayList<>();
        // 订阅 “hxstrive” 频道
        topicList.add(new ChannelTopic("hxstrive"));
        // 订阅 “*.hxstrive.com” 模式
        topicList.add(new PatternTopic("*.hxstrive.com"));
        container.addMessageListener(listenerAdapter, topicList);
        return container;
    }

    /**
     * 消息侦听器适配器,能将消息委托给目标侦听器
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(DefaultMessageDelegate defaultMessageDelegate) {
		// defaultMessageDelegate 这就是消息代理的默认实现,见 defaultMessageDelegate()
        return new MessageListenerAdapter(defaultMessageDelegate);
    }

    /**
     * 定义默认消息代理
     * @return
     */
    @Bean
    public DefaultMessageDelegate defaultMessageDelegate() {
        return new DefaultMessageDelegate();
    }

}

启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RedisLearnSpringDataApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisLearnSpringDataApplication.class, args);
    }

}

运行上面示例,输出日志如下:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.7.RELEASE)

...
2022-10-18 13:17:33.835  INFO 13696 --- [  restartedMain] c.h.r.RedisLearnSpringDataApplication    : Started RedisLearnSpringDataApplication in 5.865 seconds (JVM running for 8.019)

发送消息

打开 DOS/CMD 窗口,使用 redis-cli 命令连接到 Redis 服务,通过 publish 命令发送一条命令给客户端。命令如下:

D:\server\redis-x64-5.0.14.1> redis-cli.exe
127.0.0.1:6379> publish hxstrive "hello"
(integer) 1
127.0.0.1:6379>

上面命令,将向 hxstrive 频道发送 “hello” 消息。此时,客户端将输出如下信息:

handleMessage(Serializable message) message=hello

上面演示了怎样在 Spring Boot 中,通过 @Configuration 配置类配置 MDP、MessageListenerAdapter。如果在非 Spring Boot 的 Spring 项目中,可以通过 XML 配置文件进行配置。如下:

<!-- 配置文件名:application.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:redis="http://www.springframework.org/schema/redis"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/redis
            https://www.springframework.org/schema/redis/spring-redis.xsd">

    <!-- 配置 Redis 连接工厂 -->
    <bean id="redisConnectionFactory"
            class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
            p:use-pool="true"/>

    <!-- the default ConnectionFactory -->
    <redis:listener-container>
        <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
		<!-- 侦听器主题可以是频道(例如,topic="hxstrive")或模式(例如,topic="*.hxstrive.com")-->
        <redis:listener ref="listener" method="handleMessage" topic="hxstrive" />
    </redis:listener-container>

    <bean id="listener" class="com.hxstrive.redis.subscribing.DefaultMessageDelegate"/>
</beans>

编写一个简单的客户端,手动加载配置文件。代码如下:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 消息订阅验证
 * @author hxstrive.com 2022/10/18
 */
public class SubscribingApp {

    public static void main(String[] args) throws Exception {
        SubscribingApp demo = new SubscribingApp();
        new Thread(demo.new SubscribingTask()).start();
        Thread.sleep(Long.MAX_VALUE);
    }

    class SubscribingTask implements Runnable {

        @Override
        public void run() {
            try {
                ApplicationContext context = new ClassPathXmlApplicationContext(
                        "application-content3.xml");
                System.out.println("context = " + context);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

启动客户端,输出日志如下:

log4j:WARN No appenders could be found for logger (org.springframework.context.support.ClassPathXmlApplicationContext).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
context = org.springframework.context.support.ClassPathXmlApplicationContext@793ac4b7, started on Tue Oct 18 13:41:33 CST 2022

然后利用 redis-cli 连接到 redis 服务,使用 publish 命令发送一条消息。如下:

D:\server\redis-x64-5.0.14.1> redis-cli.exe
127.0.0.1:6379> publish hxstrive "hello"
(integer) 1
127.0.0.1:6379>

客户端将收到如下消息:

handleMessage(Serializable message) message=hello

上面示例使用 Redis 名称空间(xmlns:redis="http://www.springframework.org/schema/redis")声明消息侦听器容器,并自动将 POJO 注册为侦听器。

请注意,上面的 MessageDelegate 接口实现(上面的 DefaultMessageDelegation 类)完全没有 Redis 依赖项。它确实是一个 POJO,我们通过一些配置将它制成了 MDP。

每次收到消息时,适配器都会自动、透明地执行低级格式和所需对象类型之间的转换(使用配置的 RedisSerializer)。由方法调用引起的任何异常都由容器捕获和处理(默认情况下,会记录异常)。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号