RabbitMQ 教程

死信队列(DLX)

RabbitMQ 中由于某些原因(如路由键和绑定键不匹配,或者交换器上面没有绑定队列等等)导致消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊的队列中,这个队列一般称之为死信队列。

如果消息在一个队列中变成死信(Dead Message)之后,它将被重新发送到另一个交换器中,该交换器称为死信交换器,绑定在死信交换器上的队列就称之为死信队列。

死信交换器的简称为 DLX(Dead-Letter-Exchange),也有人称它为死信邮箱。

消息变成死信有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;

  • 消息过期;

  • 队列达到最大长度;

死信交换器也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时 RabbitMQ 就会自动地将这个消息重新发布到设置的死信交换器,进而死信交换器将消息路由到绑定到它的队列(即死信队列)。我们可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 immediate 参数的功能。

队列添加死信交换器

通过在队列声明方法 channel.queueDeclare() 中设置 x-dead-letter-exchange 参数来为当前队列添加死信交换器(DLX)。示例代码如下:

// 创建信道
Channel channel = connection.createChannel();
 
// 声明死信交换器和死信队列
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
// 将死信队列绑定到死信交换器上
channel.queueBind(queueDlxName, exchangeDlxName, "");
 
// 声明普通交换器和队列
channel.exchangeDeclare(exchangeName, "topic");
Map<String,Object> queueArgs = new HashMap<String, Object>();
// 为队列设置死信交换器
queueArgs.put("x-dead-letter-exchange", exchangeDlxName);
channel.queueDeclare(queueName, true, false, false, queueArgs);
// 将普通队列绑定到普通交换器
channel.queueBind(queueName, exchangeName, "*.hxstrive.com");

// 发送消息
System.out.println("[Sender] Send Message...");
for(int i = 0; i < Integer.MAX_VALUE; i++) {
    String message = "exchange DLX message " + i;
    // 消息将被消费
    channel.basicPublish(exchangeName, "www.hxstrive.com", null, message.getBytes());
    System.out.println("[Sender] message = '" + message + "'");
    Thread.sleep(3000);
}

点击查看完整源码(ExchangeAe1.java)。

上面创建了两个交换器 exchangeDlxName 和 exchangeName,分别绑定了两个队列 queueDlxName 和 queueName。通过浏览器访问 http://localhost:15672 地址,打开 RabbitMQ 管理页面。上面创建的 queueDlxName 和 queueName 队列上面均被标记了“D”(即持久化 durable)。其中,queueName 队列还标记了“DLX”,DLX 指的是 x-dead-letter-exchange 属性设置的死信交换器。如下图:

上面的生产者,将每隔3秒发送一条包含“www.hxstrive.com”路由键的消息到 exchangeName 交换器。在该交换器上面有一个消费者,该消费者收到交换器的路由的信息后,直接将消息拒绝 channel.basicReject()。RabbitMQ 会将拒绝的消息发给死信交换器 exchangeDlxName,而死信交换器上面也绑定了一个队列 queueDlxName(即死信队列)。另外,还有一个消费者用来消费死信队列中的消息,收到消息后直接输出。流程如下图:

注意:上面指定的 exchangeDlxName、exchangeName、queueDlxName 和 queueName 均为变量名。

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中的情况。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号