上一章节介绍了怎样通过实现 MessageListener 接口来实现消息订阅。下面将介绍通过不扩展 MessageListener 接口,而是使用MessageListenerAdapter 类,将它用作 MDP(消息驱动 POJO,message-driven POJOs)。
注意各种消息处理方法是如何根据它们可以接收和处理的各种消息类型的内容进行强类型化的。此外,消息发送到的通道或模式可以作为 String 类型的第二个参数传递给方法。
import java.io.Serializable;
import java.util.Map;
/**
* 消息代理
* @author hxstrive.com 2022/10/18
*/
public interface MessageDelegate {
/** 接收字符串消息 */
void handleMessage(String message);
/** 接收Map消息 */
void handleMessage(Map message);
/** 接收字节数组消息 */
void handleMessage(byte[] message);
/** 接收序列化消息 */
void handleMessage(Serializable message);
/** 同时传递消息和频道/模式 */
void handleMessage(Serializable message, String channel);
}import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.Map;
/**
* 默认消息代理实现
* @author hxstrive.com 2022/10/18
*/
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
System.out.println("handleMessage(String message) message=" + message);
}
@Override
public void handleMessage(Map message) {
System.out.println("handleMessage(Map message) message=" + JSONObject.toJSONString(message));
}
@Override
public void handleMessage(byte[] message) {
System.out.println("handleMessage(byte[] message) message=" + new String(message));
}
@Override
public void handleMessage(Serializable message) {
System.out.println("handleMessage(Serializable message) message=" + message);
}
@Override
public void handleMessage(Serializable message, String channel) {
System.out.println("handleMessage(Serializable message, String channel) message=" + message
+ ", channel=" + channel);
}
}import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.ArrayList;
import java.util.List;
/**
* 消息配置
* @author hxstrive.com 2022/10/13
*/
@Configuration
public class MessageConfig {
/**
* 配置 RedisMessageListenerContainer 监听器容器
* @param connectionFactory 连接工厂
* @param listenerAdapter 消息侦听器适配器
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
List<Topic> topicList = new ArrayList<>();
// 订阅 “hxstrive” 频道
topicList.add(new ChannelTopic("hxstrive"));
// 订阅 “*.hxstrive.com” 模式
topicList.add(new PatternTopic("*.hxstrive.com"));
container.addMessageListener(listenerAdapter, topicList);
return container;
}
/**
* 消息侦听器适配器,能将消息委托给目标侦听器
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(DefaultMessageDelegate defaultMessageDelegate) {
// defaultMessageDelegate 这就是消息代理的默认实现,见 defaultMessageDelegate()
return new MessageListenerAdapter(defaultMessageDelegate);
}
/**
* 定义默认消息代理
* @return
*/
@Bean
public DefaultMessageDelegate defaultMessageDelegate() {
return new DefaultMessageDelegate();
}
}import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RedisLearnSpringDataApplication {
public static void main(String[] args) {
SpringApplication.run(RedisLearnSpringDataApplication.class, args);
}
}运行上面示例,输出日志如下:
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.1.7.RELEASE) ... 2022-10-18 13:17:33.835 INFO 13696 --- [ restartedMain] c.h.r.RedisLearnSpringDataApplication : Started RedisLearnSpringDataApplication in 5.865 seconds (JVM running for 8.019)
打开 DOS/CMD 窗口,使用 redis-cli 命令连接到 Redis 服务,通过 publish 命令发送一条命令给客户端。命令如下:
D:\server\redis-x64-5.0.14.1> redis-cli.exe 127.0.0.1:6379> publish hxstrive "hello" (integer) 1 127.0.0.1:6379>
上面命令,将向 hxstrive 频道发送 “hello” 消息。此时,客户端将输出如下信息:
handleMessage(Serializable message) message=hello
上面演示了怎样在 Spring Boot 中,通过 @Configuration 配置类配置 MDP、MessageListenerAdapter。如果在非 Spring Boot 的 Spring 项目中,可以通过 XML 配置文件进行配置。如下:
<!-- 配置文件名:application.xml --> <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:redis="http://www.springframework.org/schema/redis" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd"> <!-- 配置 Redis 连接工厂 --> <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:use-pool="true"/> <!-- the default ConnectionFactory --> <redis:listener-container> <!-- the method attribute can be skipped as the default method name is "handleMessage" --> <!-- 侦听器主题可以是频道(例如,topic="hxstrive")或模式(例如,topic="*.hxstrive.com")--> <redis:listener ref="listener" method="handleMessage" topic="hxstrive" /> </redis:listener-container> <bean id="listener" class="com.hxstrive.redis.subscribing.DefaultMessageDelegate"/> </beans>
编写一个简单的客户端,手动加载配置文件。代码如下:
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* 消息订阅验证
* @author hxstrive.com 2022/10/18
*/
public class SubscribingApp {
public static void main(String[] args) throws Exception {
SubscribingApp demo = new SubscribingApp();
new Thread(demo.new SubscribingTask()).start();
Thread.sleep(Long.MAX_VALUE);
}
class SubscribingTask implements Runnable {
@Override
public void run() {
try {
ApplicationContext context = new ClassPathXmlApplicationContext(
"application-content3.xml");
System.out.println("context = " + context);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}启动客户端,输出日志如下:
log4j:WARN No appenders could be found for logger (org.springframework.context.support.ClassPathXmlApplicationContext). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. context = org.springframework.context.support.ClassPathXmlApplicationContext@793ac4b7, started on Tue Oct 18 13:41:33 CST 2022
然后利用 redis-cli 连接到 redis 服务,使用 publish 命令发送一条消息。如下:
D:\server\redis-x64-5.0.14.1> redis-cli.exe 127.0.0.1:6379> publish hxstrive "hello" (integer) 1 127.0.0.1:6379>
客户端将收到如下消息:
handleMessage(Serializable message) message=hello
上面示例使用 Redis 名称空间(xmlns:redis="http://www.springframework.org/schema/redis")声明消息侦听器容器,并自动将 POJO 注册为侦听器。
请注意,上面的 MessageDelegate 接口实现(上面的 DefaultMessageDelegation 类)完全没有 Redis 依赖项。它确实是一个 POJO,我们通过一些配置将它制成了 MDP。
每次收到消息时,适配器都会自动、透明地执行低级格式和所需对象类型之间的转换(使用配置的 RedisSerializer)。由方法调用引起的任何异常都由容器捕获和处理(默认情况下,会记录异常)。