在前面章节我们介绍了怎样使用 RabbitMQ Java 客户端去发送/消费消息,在发送/消费消息之前,我们会通过 Java 代码动态声明一个交换器和队列。在创建交换器或队列时可以配置一些可选的属性来获得不同的功能,比如:x-message-ttl、x-expires、x-max-length 等等。
但是,通过 Java 客户端为交换器或队列设定的属性参数一旦设置成功就不能在改变,除非将原来的交换器或队列删除,重新创建新的交换器或队列。难道 RabbitMQ 没有提供可以动态修改这些属性的方法吗?当然提供了的,Policy(策略)就能很好的解决上面的问题。
Policy 是一种特殊的运行时参数的用法。Policy 是 vhost 级别的。一个 Policy 可以匹配一个或多个交换器或队列,这样便于批量管理。
Policy 支持动态地修改一些属性参数,这就解决了 RabbitMQ 客户端创建的交换器和队列不能修改的问题,也大大提高了应用的灵活性。
我们可以通过 RabbitMQ 的 rabbitmq_management 插件使用可视化的方式对 Policy 进行管理。登录进入 RabbitMQ 页面,打开 “Admin”->“Policies”,如下图:
点击“Add / update a policy”链接,添加一个 Policy,如下图:
上面需要用户输入很多信息,其中 Name 和 Pattern 是必输的。每个输入项的含义如下:
Virtual host :表示当前 Policy 所在的 vhost 虚拟机是哪个
Name :表示当前 Policy 的名称
Pattern:一个正则表达式,用来匹配相关的队列或者交换器。例如:^test.+ 将匹配所有以 test 开头的交换器或队列
Apply to:用来指定当前 Policy 作用范围,取值如下:
Exchanges and queues:表示作用与 Pattern 所匹配的所有队列和交换器
Exchanges:表示作用于 Pattern 所匹配的所有交换器
Queues:表示作用于与 Pattern 所匹配的所有队列
Priority:定义 Policy 的优先级。如果有多个 Policy 作用于同一个交换器或者队列,那么 Priority 最大的那个 Policy 才会有用
Definition:定义一组或者多组键值对,为匹配的交换器或者队列附加相应的功能
通过上面介绍的页面创建一个名为 policy-test 的 Policy 策略。该策略将应用到所有以 policy 开头的队列中,设置队列中消息的 message-ttl 为 3 秒中。如下图:
然后,编写如下测试代码,一次性发送 10 个消息到 policy-PolicyDemo1 队列。然后观察队列消息的变化,所有消息在过了3秒后将自动被删除。代码如下:
import com.rabbitmq.client.*; public class PolicyDemo1 { /** 交换器名称 */ private final String EXCHANGE_NAME = "exchange-" + PolicyDemo1.class.getSimpleName(); /** 队列名称 */ private final String QUEUE_NAME = "policy-" + PolicyDemo1.class.getSimpleName(); public static void main(String[] args) throws Exception { PolicyDemo1 demo = new PolicyDemo1(); demo.sender(); } /** * 生产者 * @throws Exception */ private void sender() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueDeclare(QUEUE_NAME, true, false, false, null) ; channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com"); // 发送消息 System.out.println("[Sender] Send Message..."); for(int i = 0; i < 10; i++) { String message = "policy message i = " + i; channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", null, message.getBytes()); System.out.println("[Sender] message = " + message); } } }