RabbitMQ 教程

了解备份交换器(AE)

备份交换器(Alternate Exchange,简称:AE),也称之为“备胎交换器”。

生产者在发送消息(channel.basicPublish)的时候如果不设置 mandatory 参数,那么消息在未被路由(即没有匹配到任何队列)的情况下将会丢失。如果设置了 mandatory 参数,那么需要添加 ReturnListener 监听器的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,且又不想消息丢失,那么就可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,在需要的时候再去处理这些消息。

怎样实现备份交换器呢?可以通过在声明交换器(调用 channel.exchangeDeclare 方法)的时候添加 alternate-exchange 参数来实现,也可以通过策略( Policy)的方式实现。如果两者同时使用,则声明交换器时设置的优先级更高,会覆盖掉 Policy 的设置。

示例:演示在 channel.exchangeDeclare() 方法中通过参数设置备份交换器。代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();

// 声明交换器
// 下面交换器用作备份交换器,该交换器的所有消息均路由到 unroutedQueue 队列
channel.exchangeDeclare("myAe","fanout",
        true , false, null) ;
channel.queueDeclare("unroutedQueue", true,
        false , false, null);
channel.queueBind("unroutedQueue","myAe","");

// 下面交换器通过 alternate-exchange 参数设置一个备份交换器
// 将交换器 myAe 设置为 normalExchange 的交换器
Map<String, Object> exchangeArgs = new HashMap<String, Object>();
exchangeArgs.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange","direct",
        true, false, exchangeArgs);
channel.queueDeclare("normalQueue", true,
        false , false , null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");

// 发送消息
System.out.println("[Sender] 开始发送消息...");
// 该消息将路由给 normalQueue 队列
channel.basicPublish("normalExchange", "normalKey",
        null, "hello message1".getBytes());
System.out.println("[Sender] 发送消息 “hello message1”");

// 该消息不能匹配任何队列,将发往备份交换器
channel.basicPublish("normalExchange", "www.hxstrive.com",
        null, "hello message2".getBytes());
System.out.println("[Sender] 发送消息 “hello message2”");

// 关闭连接
channel.close();
connection.close();

上面的代码中声明了两个交换器 normalExchange 和 myAe,分别绑定了 normalQueue 和 unroutedQueue 这两个队列,同时将 myAe 设置为 normalExchange 的备份交换器。

注意:myAe 交换器将交换器类型设置为 fanout,这样可以将所有发往备份交换器的消息直接路由到绑定它上面的队列。

下面代码将发送一条消息到交换器,设置路由键为 normalKey。该路由键和 normalQueue 队列的绑定键匹配,因此将该消息路由给 normalQueue 队列。

channel.basicPublish("normalExchange", "normalKey", null, "hello message1".getBytes());
System.out.println("[Sender] 发送消息 “hello message1”");

下面代码将再次发送一条信息到交换器,设置路由键为 www.hxstrive.com。与 normalQueue 队列的绑定键不匹配,因此该消息将被路由给备份交换器。

channel.basicPublish("normalExchange", "www.hxstrive.com",  null, "hello message2".getBytes());
System.out.println("[Sender] 发送消息 “hello message2”");

同样,如果采用 Policy(策略)的方式来设置备份交换器,可以参考如下:

rabbitmqctl set_policy AE "^normalExchange$" '{"alternate-exchange":"myAe"}'

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型,如若设置为 direct 或者 topic 的类型也没有什么问题。但是需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

如果备份交换器的类型是 direct 并且有一个与其绑定的队列,假设绑定的路由键是 key1 当某条携带路由键为 key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为 key1,则可以路由到队列中。

rabbitmqctl set_policy 命令用法

可以使用 rabbitmqctl set_policy --help 命令查看 rabbitmqctl set_policy 命令的用法,如下:

Usage

rabbitmqctl [--node <node>] [--longnames] [--quiet] set_policy [--vhost <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>

Sets or updates a policy.

Arguments and Options

<name>
        policy name (identifier)

<pattern>
        regular expression pattern that will be used to match queues, exchanges, etc

<definition>
        policy definition (arguments). Must be a valid JSON document

--priority <priority>
        policy priority

--apply-to <queues | exchanges | all>
        policy should only apply to queues, exchanges, or all entities (both of the above)

Relevant Doc Guides

 * https://rabbitmq.com/parameters.html

General Options

The following options are accepted by most or all commands.

short            | long          | description
-----------------|---------------|--------------------------------
-?               | --help        | displays command help
-n <node>        | --node <node> | connect to node <node>
-l               | --longnames   | use long host names
-t               | --timeout <n> | for commands that support it, operation timeout in seconds
-q               | --quiet       | suppress informational messages
-s               | --silent      | suppress informational messages
                                 | and table header row
-p               | --vhost       | for commands that are scoped to a virtual host,
                 |               | virtual host to use
                 | --formatter   | alternative result formatter to use
                                 | if supported: json, pretty_table, table, csv, erlang
                                   not all commands support all (or any) alternative formatters.

总结

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。

点击查看完整示例:ExchangeAe1.java

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号