RabbitMQ 教程

消费消息

上面章节介绍了怎样利用 RabbitMQ Java 客户端快速发送一条消息,本章节将介绍怎样消费一条消息。

RabbitMQ 的消费分两种模式:

  • 推模式(Push):该模式采用 Basic.Consume 命令进行消费;

  • 拉模式(Pull):该模式采用 Basic.Get 命令进行消费;

推模式

推模式指有 RabbitMQ Broker 服务主动将消息推送给客户端,触发客户端实现的回调方法(Consumer 接口实现类)。

在推模式中,可以通过持续订阅某个队列的方式来从队列中消费消息。

接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。Consumer 接口定义如下:

public interface Consumer {
    // 当消费者通过调用任何 Channel.basicConsume 方法注册时调用。
    void handleConsumeOk(String consumerTag);
    
    // 当调用 Channel.basicCancel 取消消费者时调用
    void handleCancelOk(String consumerTag);

    // 当消费者因调用 Channel.basicCancel 以外的原因而被取消时调用。
    // 例如,队列已被删除。 有关 Channel.basicCancel 导致的消费者取消通知,请参见 handleCancelOk。
    void handleCancel(String consumerTag) throws IOException;

    // 当信道或基础连接已关闭时调用
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    // 当收到作为对 basic.recover 的回复的 basic.recover-ok 时调用。
    // 在调用此之前收到的所有尚未确认的消息将被重新传递。之后收到的所有消息都不会。
    void handleRecoverOk(String consumerTag);

    // 在收到此消费者的 basic.deliver 时调用。
    void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
}

当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个信道 Channel 中的消费者也需要通过唯一的消费者标签进行区分。

一个简单的消费者代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 绑定exchange与queue
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");

// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("[Receive] Receive Message :: " + new String(body));
    }
});

上面代码中,basiceConsume() 方法将 autoAck 设置为 true,实现消息自动确认。当然,我们也可以将 autoAck 设置为 false,通过手动确认消息,代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 绑定exchange与queue
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");

// 消费消息
channel.basicConsume(queueName, false, new DefaultConsumer(channel){
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
         AMQP.BasicProperties properties, byte[] body) throws IOException {
      System.out.println("[Receive] Receive Message :: " + new String(body));
      System.out.println("routingKey = " + envelope.getRoutingKey());
      System.out.println("contentType = " + properties.getContentType());

      // 手动确认消息
      long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
   }
});

上面代码中显示的设置 autoAck 为 false,然后在 handleDelivery() 方法结尾处显示调用 channel.basicAck() 方法手动进行 ack 操作。对于每个消费者来说这个设置非常重要,可以防止消息不必要地丢失。

Channel 类中的 basicConsume() 方法有如下几种重载:

String basicConsume(String queue, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

参数说明:

  • queue:队列的名称

  • autoAck:设置是否自动确认。建议设成 false ,即不自动确认,手动确认

  • consumerTag:消费者标签,用来多个消费者之间进行区分

  • noLocal:设置为 true 则表示不能将同一个 Connection(连接)中生产者发送的消息传送给这个 Connection 中的消费者

  • exclusive:设置是否排它,true 表示排它

  • arguments:设置消费者的其他参数

  • callback:设置消费者的回调函数(必须实现 Consumer 接口)。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer 使用时需要客户端重写其中的 handleDelivery() 方法。

如果你要实现自己消费者回调类,建议直接集成 DefaultConsumer,而不是实现 Consumer 接口。继承 DefaultConsumer 类,我们一般情况下只需要重写 handleDelivery() 方法即可。如果实现 Consumer 接口,你将实现所有的方法,很是不方便。

当然,除了重写 handleDelivery() 接收消息外。你还可以重写 handleCancelOk() 和 handleCancel() 方法,这样消费端可以在取消订阅的时候触发(取消订阅使用 channel.basicCancel() 方法)。

线程安全问题

消费者客户端同样也需要考虑线程安全的问题。消费者客户端的这些回调方法被分配到与 Channel 不同的线程池上,意味着消费者客户端可以安全地调用这些阻塞方法,比如:channel.queueDeclare()、channel.basicCancel() 等。

每个 Channel 都拥有自己独立的线程,最常用的做法是一个 Channel 对应一个消费者,消费者彼此之间没有任何关联,也就不存在线程安全问题。

拉模式

拉模式指消费者主动从消息队列中获取消息,然后进行消费。拉取这个动作可以是一个定时任务,定时从消息队列中获取消息。

拉模式通过信道 Channel 的 basicGet() 方法实现,该方法可以单条地获取消息,其返回值是 GetResponse。

在信道 Channel 中仅仅定义了一个 basicGet() 方法,定义如下:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

参数说明:

  • queue:表示队列名称

  • autoAck:表示是否自动确认。如果设置 autoAck 为 false,那么需要手动调用 channel.basicAck 来确认消息已被成功消费。

下面是拉模式的示例代码:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 绑定exchange与queue
final String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");

// 消费消息
while (true) {
    Thread.sleep(1000);
    System.out.println("[Receive] get message...");

    GetResponse response = channel.basicGet(queueName, true);
    if (null != response) {
        String body = new String(response.getBody());
        System.out.print("[Receive] Receive Message :: " + new String(body));

        Envelope envelope = response.getEnvelope();
        System.out.print(" routingKey = " + envelope.getRoutingKey());

        BasicProperties properties = response.getProps();
        System.out.print(" contentType = " + properties.getContentType());
        System.out.println();
    }
}

注意:

Basic.Consume 将信道(Channel)设置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

点击查看完整源码:

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号