RabbitMQ 教程

生产者确认

在使用 RabbitMQ 的时候,可以通过队列和消息的持久化操作来解决因为服务器的崩溃而导致的消息丢失。除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地被发送到 RabbitMq 服务器呢?如果不进行任何特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确发送给 RabbitMQ 服务器。如果在消息到达 RabbitMQ 服务器之前就己经丢失,持久化操作也解决不了消息丢失问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ 针对上面问题,提供了两种解决方案:

(1)通过事务机制实现;

(2)通过发送方确认(Publisher Confirm)机制实现;

事务机制

RabbitMQ Java 客户端中与事务机制相关的方法有三个:

  • channel.txSelect() 用于将当前信道 Channel 设置为事务模式;

  • channel.txCommit() 用于提交事务;

  • channel.txRollback() 用于事务回滚;

我们在通过 channel.txSelect() 方法开启事务之后,我们就可以发布消息给 RabbitMQ 服务器了。如果事务提交成功,则消息一定会到达 RabbitMQ 服务器;如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback() 方法来实现事务回滚。

注意:这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。

示例:该示例定义了一个生产者和一个消费者,生产者使用事务机制的方式发送一条消息,然后由消费者进行消费。关键代码如下:

try {
    // 开启事务
    channel.txSelect();
    // 发送消息
    String message = "transaction message";
    channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
            false, null, message.getBytes());
    System.out.println("[Send] Send message ‘" + message + "’");
    // 提交消息
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    // 事务回滚
    channel.txRollback();
}

上面代码,先使用 channel.txSelect() 开启事务,然后发送消息,最后使用 channel.txCommit() 提交消息。如果发送消息没有抛出任何异常,则对应的 AMQP 协议流转过程如下图:

点击查看示例完整代码(TransactionDemo1.java)。

如果发送消息期间发送了任何异常,则对应的 AMQP 协议流转过程如下图:

点击查看示例完整代码(TransactionDemo2.java)。

如果要发送多条消息,直接将 channel.txSelect()、channel.txCommit() 和 channel.txRollback() 放入循环语句即可。关键代码如下:

for(int i = 0; i < 10; i++) {
    try {
        // 开启事务
        channel.txSelect();
        // 发送消息
        String message = "transaction message i=" + i;
        channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", false, null, message.getBytes());
        System.out.println("[Send] Send message ‘" + message + "’");
        // 提交消息
        channel.txCommit();
    } catch (Exception e) {
        e.printStackTrace();
        // 事务回滚
        channel.txRollback();
    }
}

点击查看示例完整代码(TransactionDemo3.java)。

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。

但是使用事务机制会 “吸干” RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了一个改进方案,即发送方确认机制。

发送方确认机制

前面介绍了 RabbitMQ 可能会遇到的一个问题,即消息发送方(生产者〉并不知道消息是否真正地到达了 RabbitMQ 服务器。随后了解到在 AMQP 协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,这里就引入了一种轻量级的方式 ———— 发送方确认(Publisher Confirm)机制

生产者将信道(Channel)设置成 confirm(确认)模式。一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(ID 从 1 开始)。一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 可以设置 channel.basicAck() 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理。

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。

生产者通过调用 channel.confirmSelect() 方法(即 Confirm.Select 命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 次,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 并没有对消息被 confirm 的快慢做任何保证。

下面是确定机制的关键代码,如下:

// 将信道置为 publisher confirm 模式
channel.confirmSelect();
// 发送消息
String message = "transaction message";
channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
        false, null, message.getBytes());
System.out.println("[Send] Send message ‘" + message + "’");
// 等待消息被确认
if(!channel.waitForConfirms()) {
    System.out.println("[Send] Send message failed.");
}

点击查看完整示例代码(SenderAckDemo1.java),对应的 AMQP 协议流转过程如下图:

如果发送多条消息,只需要将 channel.basicPublish() 和 chnnel.waitForConfirms() 方法包裹在循环里面即可,不过不需要将channel.confirmSelect() 方法放在循环内部。示例代码如下:

// 将信道置为 publisher confirm 模式
channel.confirmSelect();
for(int i = 0; i < 10; i++) {
    // 发送消息
    String message = "transaction message i=" + i;
    channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", false, null, message.getBytes());
    System.out.println("[Send] Send message ‘" + message + "’");
    // 等待消息被确认
    if (!channel.waitForConfirms()) {
        System.out.println("[Send] Send message failed.");
    }
}

点击查看完整示例代码(SenderAckDemo2.java)。

对于 channel.waitForConfirms() 而言,在 RabbitMQ 客户端中它有4个同类方法:

boolean waitForConfirms() throws InterruptedException;
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

参数说明:

  • timeout 表示超时时间,一旦等待 RabbitMQ 回应超时就会抛出 java.util.concurrent.TimeoutException 的异常。

如果信道没有开启 publisher confirm 模式,则调用任何 waitForConfirms() 方法都会报出 java.lang.IllegalStateException 。对于没有参数的 waitForConfirms() 方法来说,其返回的条件是客户端收到了相应的 Basic.Ack/Basic.Nack 或者被中断。

void waitForConfirmsOrDie() throws IOException, InterruptedException;
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

参数说明:

  • timeout 表示超时时间,如果超时到期,则会引发 TimeoutException 异常。

两个 waitForConfirmsOrDie() 方法在接收到 RabbitMQ 服务器返回的 Basic.Nack 之后会抛出 java.io.IOException。

注意事项

上面介绍的事务机制和生产者确认机制两者是互斥的,不能共存。如果试图将已开启事务模式的信道(Channel)再设置为确认模式,RabbitMQ 会报错。如果试图将已开启发送者确认模式的信道(Channel)再次设直为事务模式,则 RabbitMQ 也会报错。

事务机制和发送方确认机制确保的是消息能够正确地发送至 RabbitMQ 服务器,这里的“发送至 RabbitMQ” 的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。

更进一步地讲,发送方要配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。

生产者确认机制的优势在于并不一定需要同步确认。可以对使用方式进行改进,通常有如下两种:

批量确认方法

批量确认指客户端程序需要定期或者定量(达到多少条),或者两者结合起来调用 channel.waitForConfirms() 方法来等待 RabbitMQ 的确认返回。相比于前面示例中的普通确认方式,批量确认极大地提升了确认效率。但是,如果当 RabbitMQ 服务器返回 Basic.Nack 或者超时时,客户端需要将这一批次的消息全部重发,这明显会带来大量的重复消息,并且当消息经常丢失时,批量确认的性能反而会降低。

示例关键代码如下:

private void sendMessage(Channel channel, String message) throws Exception {
    channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
            false, null, message.getBytes());
    System.out.println("[Send] Send message ‘" + message + "’");
    // 将消息添加到缓存
    MESSAGE_LIST.add(message);

    // 每20条消息确认一次
    if(MESSAGE_LIST.size() >= 5) {
        try {
            // 等待消息被确认
            if (channel.waitForConfirms(2000)) {
                // 将缓存中的消息清空
                System.out.println("[Send] clear message");
                MESSAGE_LIST.clear();
            } else {
                // 重新发送缓存中的消息
                String[] msgs = MESSAGE_LIST.toArray(new String[]{});
                MESSAGE_LIST.clear();
                for(String msg : msgs) {
                    System.out.println("[Send] resend msg=" + msg);
                    sendMessage(channel, msg);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 重新发送缓存中的消息
            String[] msgs = MESSAGE_LIST.toArray(new String[]{});
            MESSAGE_LIST.clear();
            for(String msg : msgs) {
                System.out.println("[Send] resend msg=" + msg + ", error=" + e.getMessage());
                sendMessage(channel, msg);
            }
        }
    }
}

上面代码中,将在每发送5条消息时进行一次消息确认。如果消息确认失败,或者抛出其他异常时,将会重新发送缓存中的消息。

点击查看完整示例(SenderAckDemo3.java)。

异步确认方法

异步确认方法的编程实现最为复杂。客户端 Channel 接口中提供的 addConfirmListener() 方法可以添加 ConfirmListener 回调接口。ConfirmListener 接口包含两个方法:

  • handleAck(long deliveryTag, boolean multiple):用来处理 RabbitMQ 回传的 Basic.ACK 命令

  • handleNack(long deliveryTag, boolean multiple):用来处理 RabbitMQ 回传的 Basic.Nack 命令

上面两个方法都含有一个参数 deliveryTag(用来标记消息的唯一有序序号)。我们需要为每一个信道维护一个“未确认”的消息序号集合,每发送一条消息,集合中的元素加1。每当调用 ConfirmListener 中的 handleAck 方法时,从“未确认”集合中删除相应的一条(即 multiple 设置为 false)或多条(即 multiple 设置为 true)记录。

示例关键代码如下:

// 添加监听器
channel.addConfirmListener(new ConfirmListener() {
    // 处理 Basic.Ack 命令
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("ConfirmListener.handleAck deliveryTag=" +
                deliveryTag + ", multiple=" + multiple);
        if (multiple) {
            // headSet() 方法将返回小于 deliveryTag - 1 的元素
            MESSAGE_LIST.headSet(deliveryTag - 1).clear();
        } else {
            // 移除等于 deliveryTag 的元素
            MESSAGE_LIST.remove(deliveryTag);
        }
    }

    // 处理 Basic.Nack 命令
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("ConfirmListener.handleNack deliveryTag=" +
                deliveryTag + ", multiple=" + multiple);
        if (multiple) {
            MESSAGE_LIST.headSet(deliveryTag - 1).clear();
        } else {
            MESSAGE_LIST.remove(deliveryTag);
        }
    }
});

// 发送消息
for(int i = 0; i < 20; i++) {
    long publishSeqNo = channel.getNextPublishSeqNo();
    MESSAGE_LIST.add(publishSeqNo);

    String message = "transaction message i=" + i;
    channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
            false, null, message.getBytes());
    System.out.println("[Send] Send message ‘" + message + "’");
}

点击查看完整示例(SenderAckDemo4.java)。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号