RabbitMQ 教程

优先级队列

在谈优先级队列之前,我们先谈谈什么是优先级?优先级(priority)是一种约定,通常约定优先级高的先做/先处理,优先级低的后做/后处理。

在生活中,优先级也无处不在,例如去银行办理业务的VIP通道、开车中的转弯让直行等等均是优先级体现。但是我们这里谈的是计算机中的优先级,优先级是计算机分时操作系统在处理多个作业程序时,决定各个作业程序接受系统资源的优先等级的参数,优先级高的作业能够分配更多的资源,如:CPU、内存等等。

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

在 RabbitMQ 中,可以通过设置队列的 x-max-priority 参数来实现。

优先级队列示例

该示例将创建一个生产者和消费者,先由生产者发送5个不同优先级的消息到交换器,然后启动消费者,消费者将根据消息的优先级先后进行消费。

(1)生产者关键代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 声明队列
Map<String,Object> argss =new HashMap<String,Object>() ;
// 设置队列的优先级为10
argss.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ;
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");

// 发送消息
System.out.println("[Sender] Send Message...");
for(int i = 0; i < 5; i++) {
    // 为每个消息设置随机优先级,优先级位于 0~10
    int priority = (int)(Math.random()*11);
    String message = "Priority Message priority=" + priority;
    // 设置消息的优先级
    AMQP.BasicProperties msgProps = new AMQP.BasicProperties.Builder()
            .priority(priority).build();
    channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
            msgProps, message.getBytes());
    System.out.println("[Sender] message = " + message);
}

// 启动消费者
consumer();

上面的代码演示的是如何配置一个队列的最大优先级。可以通过 RabbitMQ 管理客户端查看队列的情况:

通过 Web 管理页面可以看到上面创建的队列拥有“Pri”标识。

(2)消费者关键代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建信道
final Channel channel = connection.createChannel();

// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 声明队列
Map<String,Object> argss =new HashMap<String,Object>() ;
// 设置队列的优先级为10
argss.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, true, argss) ;
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");

// 消费消息
System.out.println("[Consumer] Waiting Message...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            // 模拟费时超过
            System.out.println("[Consumer] message = " + new String(body));
            Thread.sleep(1000);

            // 手动发送ACK确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});

上面的代码中为每个消息设置了随机优先级,默认最低优先级为 0,最高优先级为队列设置的最大优先级(上面设置的为10)。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 服务中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 服务中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

运行程序输出如下:

[Sender] Send Message...
[Sender] message = Priority Message priority=10
[Sender] message = Priority Message priority=2
[Sender] message = Priority Message priority=7
[Sender] message = Priority Message priority=8
[Sender] message = Priority Message priority=8
[Consumer] Waiting Message...
[Consumer] message = Priority Message priority=10
[Consumer] message = Priority Message priority=8
[Consumer] message = Priority Message priority=8
[Consumer] message = Priority Message priority=7
[Consumer] message = Priority Message priority=2

点击查看完整的示例代码(PriorityQueueDemo.java)。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号