前面章节介绍了怎样进行消息确认(ack),如果我们在收到某条消息后,并不想消费这条消息,怎么办呢?
如果不做任何业务逻辑处理,直接进行消息确认,这会导致该条消息丢失,因为消息没有被真正的消费。
如果我们不进行消息确认,这会导致该条消息一直位于 RabbitMQ 服务的“已投递,待确认”区域,其他消费者也不能消费(直到当前消费者断开连接,消息可能会被重新投递给其他消费者消费),明显这也不合理。
难道 RabbitMQ 没有提供更好的处理方式吗?当然不是,RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 命令,消费者可以调用与其对应的 channel.basicReject() 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的 basicReject() 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数说明:
deliveryTag:可以把他当做消息编号,它是一个64位的长整型值。
requeue:是否将消息重新存入队列。如果设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅者。如果设置为 false,则 RabbitMQ 立即会把消息从队列中删除,而不是把消息重新存入队列。
生产者发送一个消息,消费者消费该消息。消费者并没有进行真实消费,收到消息后对消息进行拒绝,且重新将消息放入队列。代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue final String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive] Waiting Message..."); // 消费消息 channel.basicConsume(queueName, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 打印消息 System.out.println("[Receive] message = " + new String(body)); // 拒绝消息 try { Thread.sleep(1000); } catch (Exception e) {} channel.basicReject(envelope.getDeliveryTag(), true); } });
运行上面代码,控制台会每隔一秒打印“[Receive] message = ...”消息。
Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令,该命令对应 channel.basicNack() 方法,定义如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
参数说明:
deliveryTag:可以把他当做消息编号,它是一个64位的长整型值。
multiple:如果设置为 false,则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack() 和 basicReject() 方法一样;如果设置为 true,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
requeue:是否将消息重新存入队列。如果设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅者。如果设置为 false,则 RabbitMQ 立即会把消息从队列中删除,而不是把消息重新存入队列。
生产者一次发送5条消息,消息格式为“hello word 序号”的消息。消费者消费到消息中包含序号 5 时,利用 channel.basicReject() 方法进行批量拒绝消息。代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue final String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive] Waiting Message..."); // 消费消息 channel.basicConsume(queueName, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 打印消息 String msg = new String(body); System.out.println("[Receive] message = " + msg); // 拒绝消息,不重新放入队列 if(msg.contains("5")) { try { Thread.sleep(10000); // 批量拒绝前4个和当前消息 // 通过 RabbitMQ 管理界面观察 // Unacked 值为 5,等待 10 秒后 Unacked 为 0 System.out.println("[Receive] call channel.basicNack()"); channel.basicNack(envelope.getDeliveryTag(), true, false); } catch (Exception e) { e.printStackTrace(); } } } });
注意:
将 channel.basicReject() 或者 channel.basicNack() 中的 requeue 参数设置为 false,可以启用 “死信队列” 功能。死信队列,又可以称之为“延迟队列”、“延时队列”,也是 RabbitMQ 队列中的一种;顾名思义,指的是进入该队列中的消息会被延迟消费的队列,这种队列跟普通的队列相比,最大的差异在于消息一旦进入普通队列将会立即被消费处理,而延迟队列则是会过一定的时间再进行消费。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
对于 requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。它对应 channel.basicRecover() 方法,该方法用来请求 RabbitMQ 重新发送还未被确认的消息。方法定义如下:
Basic.RecoverOk basicRecover() throws IOException; Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
参数说明:
requeue:如果将 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,且对于同一条消息可能被分配给其他消费者。如果将 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。默认情况下 requeue 参数为 true。
点击查看完整示例:
RejectMessage1.java 消费消息,拒绝消息(拒绝后自动重新放入到队列
RejectMessage2.java 消费消息,批量拒绝消息