前面章节介绍了 RabbitMQ 的基础用法,如声明交换器、声明队列、发送消息、消费消息等。
不知道读者是否会有这么一个疑问?发送给 RabbitMQ 的消息到底到哪里去了。如果发送给 RabbitMQ 服务器的消息的路由键和某个绑定到交换器上的队列的绑定键匹配,则消息将会路由到该队列。如果发送给 RabbitMQ 服务器的消息的路由键和任何绑定键都不匹配,此时消息时如何处理的呢?
消息生产者在发送消息时使用 channel.basicPublish() 方法,方法定义如下:
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
参数说明:
mandatory:如果将 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,则消息直接被丢弃。
immediate:当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic .Return 返回至生产者。
其他参数说明见 “发送消息” 章节。其中,mandatory 和 immediate 参数它们都有当消息传递过程中不可到达目的地时将消息返回给消息生产者的功能。RabbitMQ 提供的备份交换器可以将未能被交换器路由的消息存储起来,而不用返回给客户端。
当 mandatory 参数设置为 true 时,交换器无法根据自身的交换器类型和路由键找到一个符合条件的队列时,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,如果消息无法匹配到任何队列,则消息直接被丢弃。
那么生产者如何获取到没有被正确路由到合适队列的消息呢?可以通过调用 channel.addReturnListener() 方法来添加 ReturnListener 监听器实现。
示例代码:
该示例中的生产者发送一个消息,但是该生产者的交换器上面并没有绑定任何队列。通过 channel.addReturnListener() 添加监听器 ReturnListener 监听没有匹配队列的消息,代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("<< " + new String(body)); } }); // 发送消息 System.out.println("[Send] Sending Message..."); byte[] msg = ("hello wrold " + System.currentTimeMillis()).getBytes(); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg); System.out.println("[Send] msg = " + new String(msg)); Thread.sleep(Long.MAX_VALUE); // 释放资源 channel.close(); connection.close();
上面代码中生产者没有成功地将消息路由到队列,此时 RabbitMQ 会通过 Basic.Return 命令返回这条消息,之后生产者客户端可通过 ReturnListener 监听到了这个事件,运行上面代码的最后输出应该是“<< hello wrold 1645507799205”。
当 immediate 参数设置为 true 时,如果交换器在将消息路由到队列时发现队列并没有被任何消费者订阅,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有绑定消费者时,该消息会通过 Basic.Return 返回给生产者。
从 RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方的解释为:immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替代。
mandatory 参数告诉服务器至少将该消息路由到一个队里中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递。如果所有匹配的队列上都没有消费者,则直接将消息返回给生产者,不用将消息存入队列等待消费者。
点击查看完整示例:ReturnListener1.java