在谈优先级队列之前,我们先谈谈什么是优先级?优先级(priority)是一种约定,通常约定优先级高的先做/先处理,优先级低的后做/后处理。
在生活中,优先级也无处不在,例如去银行办理业务的VIP通道、开车中的转弯让直行等等均是优先级体现。但是我们这里谈的是计算机中的优先级,优先级是计算机分时操作系统在处理多个作业程序时,决定各个作业程序接受系统资源的优先等级的参数,优先级高的作业能够分配更多的资源,如:CPU、内存等等。
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
在 RabbitMQ 中,可以通过设置队列的 x-max-priority 参数来实现。
该示例将创建一个生产者和消费者,先由生产者发送5个不同优先级的消息到交换器,然后启动消费者,消费者将根据消息的优先级先后进行消费。
(1)生产者关键代码如下:
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明队列
Map<String,Object> argss =new HashMap<String,Object>() ;
// 设置队列的优先级为10
argss.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ;
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");
// 发送消息
System.out.println("[Sender] Send Message...");
for(int i = 0; i < 5; i++) {
// 为每个消息设置随机优先级,优先级位于 0~10
int priority = (int)(Math.random()*11);
String message = "Priority Message priority=" + priority;
// 设置消息的优先级
AMQP.BasicProperties msgProps = new AMQP.BasicProperties.Builder()
.priority(priority).build();
channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
msgProps, message.getBytes());
System.out.println("[Sender] message = " + message);
}
// 启动消费者
consumer();上面的代码演示的是如何配置一个队列的最大优先级。可以通过 RabbitMQ 管理客户端查看队列的情况:

通过 Web 管理页面可以看到上面创建的队列拥有“Pri”标识。
(2)消费者关键代码如下:
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
final Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明队列
Map<String,Object> argss =new HashMap<String,Object>() ;
// 设置队列的优先级为10
argss.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ;
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");
// 消费消息
System.out.println("[Consumer] Waiting Message...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
// 模拟费时超过
System.out.println("[Consumer] message = " + new String(body));
Thread.sleep(1000);
// 手动发送ACK确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
});上面的代码中为每个消息设置了随机优先级,默认最低优先级为 0,最高优先级为队列设置的最大优先级(上面设置的为10)。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 服务中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 服务中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
运行程序输出如下:
[Sender] Send Message... [Sender] message = Priority Message priority=10 [Sender] message = Priority Message priority=2 [Sender] message = Priority Message priority=7 [Sender] message = Priority Message priority=8 [Sender] message = Priority Message priority=8 [Consumer] Waiting Message... [Consumer] message = Priority Message priority=10 [Consumer] message = Priority Message priority=8 [Consumer] message = Priority Message priority=8 [Consumer] message = Priority Message priority=7 [Consumer] message = Priority Message priority=2
点击查看完整的示例代码(PriorityQueueDemo.java)。