延迟队列顾名思义就是存放延迟消息的队列,所谓“延迟消息”是指消息被发送后,并不想让消费者立刻进行消费,而是需要等待指定的时间后,消费者才能够进行消费。
延迟队列的使用场景有很多,例如:
订单支付 —— 相信读者有过网上购物的习惯,当我们下了订单后,需要支付。而支付一般需要在30分钟(也可能是20分钟等)内完成支付。如果你没有在30分钟内完成支付,那么订单将变成无效的订单。此时,这个订单支付的时效功能就可以采用延迟队列来处理。
定时控制 —— 不知道读者是否玩过类似远程控制的设备,我们只需要在设备对应的APP/WEB应用中定义一个定时控制的指令,指定该指令在多少分钟后进行远程操作。例如:远程定时控制扫地机器人进行扫地、远程定时控制空调等等。这些远程控制的功能,也可以使用延迟队列来实现。
注意:在 AMQP 协议或者 RabbitMQ 中,并没有提供延迟队列的直接支持。但是,我们可以结合前面章节介绍的死信交换器(DLX)和过期时间(TTL)来模拟延迟队列的功能。
消息生产者发送一个带有 TTL(延迟时间)的消息到正常的交换器,且正常交换器绑定的队列没有被任何消费者订阅(也就不会被消费)。当时间慢慢过去,一旦时间超过我们为消息设置的 TTL 时间后,RabbitMQ 将会自动将过期的消息发送到队列指定的死信交换器,死信交换器将消息路由到死信队列。此时,如果有消费者订阅了死信队列,则这些消息将被消费者消费,就这样成功模拟了一个延迟队列。消息流转过程如下图:
示例的关键代码如下:
(1)消费者代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明死信交换器和死信队列,且将两者绑定到一起 channel.exchangeDeclare(exchangeDlxName, "fanout"); channel.queueDeclare(queueDlxName, true, false, true, null); channel.queueBind(queueDlxName, exchangeDlxName, ""); // 声明普通交换器和队列,且将两者绑定到一起 channel.exchangeDeclare(exchangeName, "topic"); Map<String,Object> queueArgs = new HashMap<String, Object>(); // 为队列设置死信交换器 queueArgs.put("x-dead-letter-exchange", exchangeDlxName); channel.queueDeclare(queueName, true, false, false, queueArgs); channel.queueBind(queueName, exchangeName, "*.hxstrive.com"); // 发送带有TTL过期时间的消息(过期消息设置为10秒,即消息将延迟10秒后被执行) System.out.println("[Sender] Send Message..."); String message = "exchange DLX message"; // 通过消息属性设置设置消息TTL时间 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化消息 .expiration("10000") // 设置 TTL=10秒 .build(); channel.basicPublish(exchangeName, "www.hxstrive.com", properties, message.getBytes()); System.out.println("[Sender] message = '" + message + "'");
(2)消费者代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 final Channel channel = connection.createChannel(); // 声明队列 // 声明死信交换器和死信队列,且将两者绑定 channel.exchangeDeclare(exchangeDlxName, "fanout"); channel.queueDeclare(queueDlxName, true, false, true, null); channel.queueBind(queueDlxName, exchangeDlxName, ""); // 消费消息 System.out.println("[Consumer] Waiting Message..."); channel.basicConsume(queueDlxName, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[Consumer] body = " + new String(body)); } });
点击查看完整代码(ExchangeDlx2.java)。