RabbitMQ 教程

rabbitmq_management 管理 Policy 策略

在前面章节我们介绍了怎样使用 RabbitMQ Java 客户端去发送/消费消息,在发送/消费消息之前,我们会通过 Java 代码动态声明一个交换器和队列。在创建交换器或队列时可以配置一些可选的属性来获得不同的功能,比如:x-message-ttl、x-expires、x-max-length 等等。

但是,通过 Java 客户端为交换器或队列设定的属性参数一旦设置成功就不能在改变,除非将原来的交换器或队列删除,重新创建新的交换器或队列。难道 RabbitMQ 没有提供可以动态修改这些属性的方法吗?当然提供了的,Policy(策略)就能很好的解决上面的问题。

Policy 是一种特殊的运行时参数的用法。Policy 是 vhost 级别的。一个 Policy 可以匹配一个或多个交换器或队列,这样便于批量管理。

Policy 支持动态地修改一些属性参数,这就解决了 RabbitMQ 客户端创建的交换器和队列不能修改的问题,也大大提高了应用的灵活性。

rabbitmq_management 管理 Policy

我们可以通过 RabbitMQ 的 rabbitmq_management 插件使用可视化的方式对 Policy 进行管理。登录进入 RabbitMQ 页面,打开 “Admin”->“Policies”,如下图: 

点击“Add / update a policy”链接,添加一个 Policy,如下图:

上面需要用户输入很多信息,其中 Name 和 Pattern 是必输的。每个输入项的含义如下:

  • Virtual host :表示当前 Policy 所在的 vhost 虚拟机是哪个

  • Name :表示当前 Policy 的名称

  • Pattern:一个正则表达式,用来匹配相关的队列或者交换器。例如:^test.+ 将匹配所有以 test 开头的交换器或队列

  • Apply to:用来指定当前 Policy 作用范围,取值如下:

    • Exchanges and  queues:表示作用与 Pattern 所匹配的所有队列和交换器

    • Exchanges:表示作用于 Pattern 所匹配的所有交换器

    • Queues:表示作用于与 Pattern 所匹配的所有队列

  • Priority:定义 Policy 的优先级。如果有多个 Policy 作用于同一个交换器或者队列,那么 Priority 最大的那个 Policy 才会有用

  • Definition:定义一组或者多组键值对,为匹配的交换器或者队列附加相应的功能

示例

通过上面介绍的页面创建一个名为 policy-test 的 Policy 策略。该策略将应用到所有以 policy 开头的队列中,设置队列中消息的 message-ttl 为 3 秒中。如下图:

然后,编写如下测试代码,一次性发送 10 个消息到 policy-PolicyDemo1 队列。然后观察队列消息的变化,所有消息在过了3秒后将自动被删除。代码如下:

import com.rabbitmq.client.*;

public class PolicyDemo1 {
    /** 交换器名称 */
    private final String EXCHANGE_NAME = "exchange-" + PolicyDemo1.class.getSimpleName();
    /** 队列名称 */
    private final String QUEUE_NAME = "policy-" + PolicyDemo1.class.getSimpleName();

    public static void main(String[] args) throws Exception {
        PolicyDemo1 demo = new PolicyDemo1();
        demo.sender();
    }

    /**
     * 生产者
     * @throws Exception
     */
    private void sender() throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection connection = factory.newConnection();

        // 创建信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare(QUEUE_NAME, true, false, false, null) ;
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");

        // 发送消息
        System.out.println("[Sender] Send Message...");
        for(int i = 0; i < 10; i++) {
            String message = "policy message i = " + i;
            channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
                    null, message.getBytes());
            System.out.println("[Sender] message = " + message);
        }
    }

}
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号