发布订阅(pub/sub)是一种消息范式,消息的发送者(称为发布者 publish)不会将消息直接发送给特定的接收者(称为订阅者 subscribe)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。本文将介绍怎样通过 jedis 实现消息发布订阅。
redis.clients.jedis.JedisPubSub 是一个抽象类,该类用来作为订阅的参数进行传递,当某类事件触发时,将触发该类的某个方法。方法定义如下:
public void onMessage(String channel, String message) 基于频道,当收到消息时,触发该方法
public void onSubscribe(String channel, int subscribedChannels) 基于频道,初始化订阅时触发
public void onUnsubscribe(String channel, int subscribedChannels) 基于频道,取消订阅时触发
public void onPMessage(String pattern, String channel, String message) 基于模式,当收到消息时,触发该方法
public void onPSubscribe(String pattern, int subscribedChannels) 基于模式,初始化订阅时触发
public void onPUnsubscribe(String pattern, int subscribedChannels) 基于模式,取消订阅时触发
public void onPong(String pattern) 收到 ping 消息时触发
“发布/订阅”包含两种角色:发布者和订阅者。发布者可以向指定的频道(channel)发送消息。订阅者可以订阅一个或者多个频道(channel),所有订阅此频道的订阅者都会收到此消息。
下面是订阅和发布消息方法定义:
void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) 订阅一个或多个频道,回调需要实现 BinaryJedisPubSub 抽象类
void subscribe(JedisPubSub jedisPubSub, String... channels) 订阅一个或多个频道,回调需实现 JedisPubSub 抽象类
long publish(byte[] channel, byte[] message) 发布字节数组消息到给定的频道
long publish(String channel, String message) 发布字符串消息到给定频道
示例:
package com.hxstrive.redis.demo.pubsub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; /** * Redis 发布/订阅测试 * @author hxstrive.com */ public class PubSubTest { public static void main(String[] args) { server(); // 启动服务端,订阅频道 client(); // 启动客户端,发布消息 } private static void server() { new Thread(new Runnable() { @Override public void run() { JedisPool jedisPool = new JedisPool("127.0.01", 6379); Jedis jedis = jedisPool.getResource(); // 订阅名为 myChannel 频道 jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { // 输出频道和消息 System.out.println("channel = " + channel + ", message = " + message); } }, "myChannel"); } }).start(); } private static void client() { try { JedisPool jedisPool = new JedisPool("127.0.01", 6379); Jedis jedis = jedisPool.getResource(); for(int i = 0; i < 5; i++) { Thread.sleep(500); // 发布一个消息 jedis.publish("myChannel", "hello! jedis"); } } catch (Exception e) { e.printStackTrace(); } } }
运行程序,输出如下:
channel = myChannel, message = hello! jedis channel = myChannel, message = hello! jedis channel = myChannel, message = hello! jedis channel = myChannel, message = hello! jedis channel = myChannel, message = hello! jedis
如果有某个/某些模式和该频道匹配,所有订阅这个/这些频道的客户端也同样会收到信息。方法定义如下:
void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) 订阅一个或多个模式,接收所有和这些模式匹配频道中的消息,回调需要实现 BinaryJedisPubSub 抽象类
void psubscribe(JedisPubSub jedisPubSub, String... patterns) 订阅一个或多个模式,接收所有和这些模式匹配频道中的消息,回调需实现 JedisPubSub 抽象类
示例:
package com.hxstrive.redis.demo.pubsub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; /** * Redis 发布/订阅测试,基于模式订阅 * @author hxstrive.com 2023/3/27 */ public class PubSubTest2 { public static void main(String[] args) { server(); // 启动服务端,订阅频道 client(); // 启动客户端,发布消息 } private static void server() { new Thread(new Runnable() { @Override public void run() { JedisPool jedisPool = new JedisPool("127.0.01", 6379); Jedis jedis = jedisPool.getResource(); // 订阅频道名称匹配 *.hxstrive.com 模式的频道 jedis.psubscribe(new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String message) { // 输出频道和消息 System.out.println("pattern = " + pattern + ", channel = " + channel + ", message = " + message); } }, "*.hxstrive.com"); } }).start(); } private static void client() { JedisPool jedisPool = new JedisPool("127.0.01", 6379); Jedis jedis = jedisPool.getResource(); // 发布多个消息 try { Thread.sleep(100); jedis.publish("www.hxstrive.com", "hi, www.hxstrive.com"); Thread.sleep(100); jedis.publish("doc.hxstrive.com", "hi, doc.hxstrive.com"); Thread.sleep(100); jedis.publish("sms.hxstrive.com", "hi, sms.hxstrive.com"); Thread.sleep(100); jedis.publish("www.inlive365.com", "hi, www.inlive365.com"); } catch (Exception e) { e.printStackTrace(); } } }
运行程序,输出如下:
pattern = *.hxstrive.com, channel = www.hxstrive.com, message = hi, www.hxstrive.com pattern = *.hxstrive.com, channel = doc.hxstrive.com, message = hi, doc.hxstrive.com pattern = *.hxstrive.com, channel = sms.hxstrive.com, message = hi, sms.hxstrive.com