延迟队列顾名思义就是存放延迟消息的队列,所谓“延迟消息”是指消息被发送后,并不想让消费者立刻进行消费,而是需要等待指定的时间后,消费者才能够进行消费。
延迟队列的使用场景有很多,例如:
订单支付 —— 相信读者有过网上购物的习惯,当我们下了订单后,需要支付。而支付一般需要在30分钟(也可能是20分钟等)内完成支付。如果你没有在30分钟内完成支付,那么订单将变成无效的订单。此时,这个订单支付的时效功能就可以采用延迟队列来处理。
定时控制 —— 不知道读者是否玩过类似远程控制的设备,我们只需要在设备对应的APP/WEB应用中定义一个定时控制的指令,指定该指令在多少分钟后进行远程操作。例如:远程定时控制扫地机器人进行扫地、远程定时控制空调等等。这些远程控制的功能,也可以使用延迟队列来实现。
注意:在 AMQP 协议或者 RabbitMQ 中,并没有提供延迟队列的直接支持。但是,我们可以结合前面章节介绍的死信交换器(DLX)和过期时间(TTL)来模拟延迟队列的功能。
消息生产者发送一个带有 TTL(延迟时间)的消息到正常的交换器,且正常交换器绑定的队列没有被任何消费者订阅(也就不会被消费)。当时间慢慢过去,一旦时间超过我们为消息设置的 TTL 时间后,RabbitMQ 将会自动将过期的消息发送到队列指定的死信交换器,死信交换器将消息路由到死信队列。此时,如果有消费者订阅了死信队列,则这些消息将被消费者消费,就这样成功模拟了一个延迟队列。消息流转过程如下图:

示例的关键代码如下:
(1)消费者代码如下:
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明死信交换器和死信队列,且将两者绑定到一起
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");
// 声明普通交换器和队列,且将两者绑定到一起
channel.exchangeDeclare(exchangeName, "topic");
Map<String,Object> queueArgs = new HashMap<String, Object>();
// 为队列设置死信交换器
queueArgs.put("x-dead-letter-exchange", exchangeDlxName);
channel.queueDeclare(queueName, true, false, false, queueArgs);
channel.queueBind(queueName, exchangeName, "*.hxstrive.com");
// 发送带有TTL过期时间的消息(过期消息设置为10秒,即消息将延迟10秒后被执行)
System.out.println("[Sender] Send Message...");
String message = "exchange DLX message";
// 通过消息属性设置设置消息TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.expiration("10000") // 设置 TTL=10秒
.build();
channel.basicPublish(exchangeName, "www.hxstrive.com", properties, message.getBytes());
System.out.println("[Sender] message = '" + message + "'");(2)消费者代码如下:
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
final Channel channel = connection.createChannel();
// 声明队列
// 声明死信交换器和死信队列,且将两者绑定
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");
// 消费消息
System.out.println("[Consumer] Waiting Message...");
channel.basicConsume(queueDlxName, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[Consumer] body = " + new String(body));
}
});点击查看完整代码(ExchangeDlx2.java)。