在谈优先级队列之前,我们先谈谈什么是优先级?优先级(priority)是一种约定,通常约定优先级高的先做/先处理,优先级低的后做/后处理。
在生活中,优先级也无处不在,例如去银行办理业务的VIP通道、开车中的转弯让直行等等均是优先级体现。但是我们这里谈的是计算机中的优先级,优先级是计算机分时操作系统在处理多个作业程序时,决定各个作业程序接受系统资源的优先等级的参数,优先级高的作业能够分配更多的资源,如:CPU、内存等等。
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
在 RabbitMQ 中,可以通过设置队列的 x-max-priority 参数来实现。
该示例将创建一个生产者和消费者,先由生产者发送5个不同优先级的消息到交换器,然后启动消费者,消费者将根据消息的优先级先后进行消费。
(1)生产者关键代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 声明队列 Map<String,Object> argss =new HashMap<String,Object>() ; // 设置队列的优先级为10 argss.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ; channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com"); // 发送消息 System.out.println("[Sender] Send Message..."); for(int i = 0; i < 5; i++) { // 为每个消息设置随机优先级,优先级位于 0~10 int priority = (int)(Math.random()*11); String message = "Priority Message priority=" + priority; // 设置消息的优先级 AMQP.BasicProperties msgProps = new AMQP.BasicProperties.Builder() .priority(priority).build(); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", msgProps, message.getBytes()); System.out.println("[Sender] message = " + message); } // 启动消费者 consumer();
上面的代码演示的是如何配置一个队列的最大优先级。可以通过 RabbitMQ 管理客户端查看队列的情况:
通过 Web 管理页面可以看到上面创建的队列拥有“Pri”标识。
(2)消费者关键代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 final Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 声明队列 Map<String,Object> argss =new HashMap<String,Object>() ; // 设置队列的优先级为10 argss.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ; channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com"); // 消费消息 System.out.println("[Consumer] Waiting Message..."); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 模拟费时超过 System.out.println("[Consumer] message = " + new String(body)); Thread.sleep(1000); // 手动发送ACK确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } });
上面的代码中为每个消息设置了随机优先级,默认最低优先级为 0,最高优先级为队列设置的最大优先级(上面设置的为10)。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 服务中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 服务中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
运行程序输出如下:
[Sender] Send Message... [Sender] message = Priority Message priority=10 [Sender] message = Priority Message priority=2 [Sender] message = Priority Message priority=7 [Sender] message = Priority Message priority=8 [Sender] message = Priority Message priority=8 [Consumer] Waiting Message... [Consumer] message = Priority Message priority=10 [Consumer] message = Priority Message priority=8 [Consumer] message = Priority Message priority=8 [Consumer] message = Priority Message priority=7 [Consumer] message = Priority Message priority=2
点击查看完整的示例代码(PriorityQueueDemo.java)。