RabbitMQ 为了保证消息从队列可靠地到达消费者,它提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 设置为 false,RabbitMQ 会等待消费者手动地回复 ACK 确认信号,当收到 ACK 确认信号后才将消息从消息队列移除(实际上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息设置为确认,然后从队列中删除,而不管消费者是否真正地消费到这个消息。
channel.basicConsume() 方法定义如下:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
参数说明:
queue:指待订阅队列的名称
autoAck:指是否自动确认消息,true-自动确认、false-手动确认。
callback:回调类,该类必须实现 Consumer 接口,推荐直接继承 DefaultConsumer 类。
RabbitMQ 采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间去处理消息,而不用担心处理消息过程中消费者进程宕掉后消息丢失问题。因为 RabbitMQ 会一直等待消息确认消息(Basic.Ack)。如果 RabbitMQ 一直没有收到消息确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会将该消息重新放入到队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。
对 RabbitMQ 服务器而言,队列中的消息分成了两部分:
第一部分:等待投递给消费者的消息,待投递消息个数可以在 RabbitMQ 管理界面查看消息中的 “Ready” 状态;
第二部分:已经投递给消费者,但是还没有收到消息确认信号的消息;已投递待确认的消息个数可以在 RabbitMQ 管理界面查看消息中的 “Unacked” 状态。如下图:
也可以通过下面命令查看上图信息:
D:\server\rabbitmq_server-3.9.11\sbin>rabbitmqctl list_queues name messages_ready Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready amq.gen-2Msc0BixbKt1A5pAwlYmng 0
点击查看完整示例代码:
AckMessage1.java 消费消息,手动从消息队列拉取,手动确认
AckMessage1.java 消费消息,手动从消息队列拉取,自动确认