RabbitMQ 中由于某些原因(如路由键和绑定键不匹配,或者交换器上面没有绑定队列等等)导致消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊的队列中,这个队列一般称之为死信队列。
如果消息在一个队列中变成死信(Dead Message)之后,它将被重新发送到另一个交换器中,该交换器称为死信交换器,绑定在死信交换器上的队列就称之为死信队列。
死信交换器的简称为 DLX(Dead-Letter-Exchange),也有人称它为死信邮箱。
消息变成死信有如下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;
消息过期;
队列达到最大长度;
死信交换器也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时 RabbitMQ 就会自动地将这个消息重新发布到设置的死信交换器,进而死信交换器将消息路由到绑定到它的队列(即死信队列)。我们可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 immediate 参数的功能。
通过在队列声明方法 channel.queueDeclare() 中设置 x-dead-letter-exchange 参数来为当前队列添加死信交换器(DLX)。示例代码如下:
// 创建信道 Channel channel = connection.createChannel(); // 声明死信交换器和死信队列 channel.exchangeDeclare(exchangeDlxName, "fanout"); channel.queueDeclare(queueDlxName, true, false, true, null); // 将死信队列绑定到死信交换器上 channel.queueBind(queueDlxName, exchangeDlxName, ""); // 声明普通交换器和队列 channel.exchangeDeclare(exchangeName, "topic"); Map<String,Object> queueArgs = new HashMap<String, Object>(); // 为队列设置死信交换器 queueArgs.put("x-dead-letter-exchange", exchangeDlxName); channel.queueDeclare(queueName, true, false, false, queueArgs); // 将普通队列绑定到普通交换器 channel.queueBind(queueName, exchangeName, "*.hxstrive.com"); // 发送消息 System.out.println("[Sender] Send Message..."); for(int i = 0; i < Integer.MAX_VALUE; i++) { String message = "exchange DLX message " + i; // 消息将被消费 channel.basicPublish(exchangeName, "www.hxstrive.com", null, message.getBytes()); System.out.println("[Sender] message = '" + message + "'"); Thread.sleep(3000); }
点击查看完整源码(ExchangeAe1.java)。
上面创建了两个交换器 exchangeDlxName 和 exchangeName,分别绑定了两个队列 queueDlxName 和 queueName。通过浏览器访问 http://localhost:15672 地址,打开 RabbitMQ 管理页面。上面创建的 queueDlxName 和 queueName 队列上面均被标记了“D”(即持久化 durable)。其中,queueName 队列还标记了“DLX”,DLX 指的是 x-dead-letter-exchange 属性设置的死信交换器。如下图:
上面的生产者,将每隔3秒发送一条包含“www.hxstrive.com”路由键的消息到 exchangeName 交换器。在该交换器上面有一个消费者,该消费者收到交换器的路由的信息后,直接将消息拒绝 channel.basicReject()。RabbitMQ 会将拒绝的消息发给死信交换器 exchangeDlxName,而死信交换器上面也绑定了一个队列 queueDlxName(即死信队列)。另外,还有一个消费者用来消费死信队列中的消息,收到消息后直接输出。流程如下图:
注意:上面指定的 exchangeDlxName、exchangeName、queueDlxName 和 queueName 均为变量名。
对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中的情况。