当 RabbitMQ 服务器的队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的方式分发给消费者。每条消息只会发送给消费者订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在消费者负载非常高,消息根本处理不过来,只需要创建更多的消费者来消费处理消息即可。
在大部分的情况下,轮询分发消息能工作的很好,但是一些特定情况下还是存在着问题。默认情况下,如果有 n 个消费者,那么 RabbitMQ 服务器会将第 m 条消息分发给第 m%n(取余的方式)个消费者,RabbitMQ 服务器不管消费者是否消费并己经确认(Basic.Ack)了消息,到下一个轮训周期依然会发送消息给当前消费者。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降,即部分机器忙的不行,部分机器闲的不行,资源没有被最大程度使用。
那么该如何处理上面出现的问题呢?这就要用到方法,channel.basicQos() 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。
例如消费者在订阅消费队列之前,消费端程序调用了 channel.basicQos(5),之后订阅了某个队列进行消费。RabbitMQ 服务器会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限(指通过 channel.basicQos 方法设置的值),那么 RabbitMQ 服务器就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 服务器将相应的计数减一,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的 “滑动窗口”。
注意:Basic.Qos 的使用对于拉模式的消费方式无效。
channel.basicQos() 有三种类型的重载方法,如下:
void basicQos(int prefetchCount) throws IOException; void basicQos(int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
参数说明:
prefetchSize:指定消费者所能接收未确认消息的总体大小的上限(单位:B)。如果将 prefetchSize 设置为 0,则表示没有上限。
prefetchCount:指定该信道 Channel 上的消费者所能保持的最大未确认消息的数量。如果将 prefetchCount 设置为 0,则表示没有上限。
注意:对于一个信道来说,它可以同时消费多个队列,当设置了 prefetchCount 大于 1 时,这个信道需要和各个队列协调以确保发送的消息都没有超过所限定的 prefetchCount 的值,这样会降低 RabbitMQ 的性能,尤其是这些队列分散在集群中的多个 Broker 节点之中。
global:RabbitMQ 为了提升相关的性能(例如:同一个信道消费多个队列时,且 prefetchCount 参数大于 1),在 AMQP0-9-1 协议之上重新定义了 global 这个参数。取值如下:
true:在 AMQP0-9-1 协议中,当前通信链路(Connection)上所有的消费者都需要遵从 prefetchCount 参数的限定值;而 RabbitMQ 中,只需要信道(Channel)上所有的消费者都要遵从 prefetchCount 的限定值;
false:在 AMQP0-9-1 协议中,信道上所有的消费者都需要遵从 prefetchCount 参数的限定;而 RabbitMQ 中,信道上新的消费者才需要遵从 prefetchCount 参数的限定;
(1)声明一个生产者和两个消费者。其中,两个消费者均将 basicQos 设置为1,且均采用手动 Ack 确认。消费者内部采用 Thread.sleep 来模拟消息消费耗时,一个消费者休眠1秒,另一个消费者休眠2秒。生产者发送10个消息到交换器,观察消息被两个消费者消费的情况。关键代码如下:
// 发送消息 System.out.println("[Sender] Send Message..."); for(int i = 0; i < 10; i++) { String message = "qos message i=" + i; channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", null, message.getBytes()); System.out.println("[Sender] message=" + message); } // 消费者1 System.out.println("[Consumer1] Waiting for a message...."); channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("[Consumer1] body = " + new String(body)); Thread.sleep(2000); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }); // 消费者2 System.out.println("[Consumer2] Waiting for a message...."); channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("[Consumer2] body = " + new String(body)); Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } });
运行示例程序,输出信息如下:
[Consumer1] Waiting for a message.... [Consumer2] Waiting for a message.... [Sender] Send Message... [Sender] message=qos message i=0 [Sender] message=qos message i=1 [Sender] message=qos message i=2 [Sender] message=qos message i=3 [Sender] message=qos message i=4 [Sender] message=qos message i=5 [Sender] message=qos message i=6 [Sender] message=qos message i=7 [Sender] message=qos message i=8 [Sender] message=qos message i=9 [Consumer1] body = qos message i=0 [Consumer2] body = qos message i=1 [Consumer2] body = qos message i=2 [Consumer1] body = qos message i=3 [Consumer2] body = qos message i=4 [Consumer2] body = qos message i=5 [Consumer1] body = qos message i=6 [Consumer2] body = qos message i=7 [Consumer2] body = qos message i=8 [Consumer1] body = qos message i=9
仔细观察上面输出日志,可以得知,Consumer2消费消息的速度是Consumer1的两倍。这也说明了,每个消费者只能持有一个未确认的消息。读者也可以将 channel.basicQos() 设置为2,观察输出结果。点击查看示例完整代码(QosDemo1.java)。