RabbitMQ 教程

拒绝消息

前面章节介绍了怎样进行消息确认(ack),如果我们在收到某条消息后,并不想消费这条消息,怎么办呢?

如果不做任何业务逻辑处理,直接进行消息确认,这会导致该条消息丢失,因为消息没有被真正的消费。

如果我们不进行消息确认,这会导致该条消息一直位于 RabbitMQ 服务的“已投递,待确认”区域,其他消费者也不能消费(直到当前消费者断开连接,消息可能会被重新投递给其他消费者消费),明显这也不合理。

难道 RabbitMQ 没有提供更好的处理方式吗?当然不是,RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 命令,消费者可以调用与其对应的 channel.basicReject() 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的 basicReject() 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

参数说明:

  • deliveryTag:可以把他当做消息编号,它是一个64位的长整型值。

  • requeue:是否将消息重新存入队列。如果设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅者。如果设置为 false,则 RabbitMQ 立即会把消息从队列中删除,而不是把消息重新存入队列。

示例代码

生产者发送一个消息,消费者消费该消息。消费者并没有进行真实消费,收到消息后对消息进行拒绝,且重新将消息放入队列。代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 绑定exchange与queue
final String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");

// 消费消息
channel.basicConsume(queueName, false, new DefaultConsumer(channel){
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
         AMQP.BasicProperties properties, byte[] body) throws IOException {
      // 打印消息
      System.out.println("[Receive] message = " + new String(body));

      // 拒绝消息
      try {
         Thread.sleep(1000);
      } catch (Exception e) {}
      channel.basicReject(envelope.getDeliveryTag(), true);
   }
});

运行上面代码,控制台会每隔一秒打印“[Receive] message = ...”消息。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令,该命令对应 channel.basicNack() 方法,定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

参数说明:

  • deliveryTag:可以把他当做消息编号,它是一个64位的长整型值。

  • multiple:如果设置为 false,则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack() 和 basicReject() 方法一样;如果设置为 true,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。

  • requeue:是否将消息重新存入队列。如果设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅者。如果设置为 false,则 RabbitMQ 立即会把消息从队列中删除,而不是把消息重新存入队列。

示例代码

生产者一次发送5条消息,消息格式为“hello word 序号”的消息。消费者消费到消息中包含序号 5 时,利用 channel.basicReject() 方法进行批量拒绝消息。代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 绑定exchange与queue
final String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");

// 消费消息
channel.basicConsume(queueName, false, new DefaultConsumer(channel){
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
         AMQP.BasicProperties properties, byte[] body) throws IOException {
      // 打印消息
      String msg = new String(body);
      System.out.println("[Receive] message = " + msg);
      // 拒绝消息,不重新放入队列
      if(msg.contains("5")) {
         try {
            Thread.sleep(10000);
            // 批量拒绝前4个和当前消息
            // 通过 RabbitMQ 管理界面观察
            // Unacked 值为 5,等待 10 秒后 Unacked 为 0
            System.out.println("[Receive] call channel.basicNack()");
            channel.basicNack(envelope.getDeliveryTag(),
                  true, false);
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
});

注意:

将 channel.basicReject() 或者 channel.basicNack() 中的 requeue  参数设置为 false,可以启用 “死信队列” 功能。死信队列,又可以称之为“延迟队列”、“延时队列”,也是 RabbitMQ 队列中的一种;顾名思义,指的是进入该队列中的消息会被延迟消费的队列,这种队列跟普通的队列相比,最大的差异在于消息一旦进入普通队列将会立即被消费处理,而延迟队列则是会过一定的时间再进行消费。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

对于 requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。它对应 channel.basicRecover() 方法,该方法用来请求 RabbitMQ 重新发送还未被确认的消息。方法定义如下:

Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

参数说明:

  • requeue:如果将 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,且对于同一条消息可能被分配给其他消费者。如果将 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。默认情况下 requeue 参数为 true。

点击查看完整示例:

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号