发布者确认是实现可靠发布的 RabbitMQ 扩展。 当通道上启用发布者确认时,客户端发布的消息由代理异步确认,这意味着它们已在服务器端得到处理。
在本教程中,我们将使用发布者确认来确保发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略并解释它们的优缺点。
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不启用它们。使用 confirmSelect 方法在通道级别启用发布者确认:
Channel channel = connection.createChannel(); channel.confirmSelect();
必须在您希望使用发布者确认的每个频道上调用此方法。确认应该只启用一次,而不是针对每条发布的消息。
让我们从最简单的使用确认发布的方法开始,即发布一条消息并同步等待它的确认:
while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); // uses a 5 second timeout channel.waitForConfirmsOrDie(5_000); }
在前面的示例中,我们像往常一样发布一条消息,并使用 Channel#waitForConfirmsOrDie(long) 方法等待其确认。该方法在消息被确认后立即返回。如果消息在超时时间内没有得到确认,或者它被 nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。
不同的客户端库有不同的方式来同步处理发布者确认,因此请务必仔细阅读您正在使用的客户端的文档。
这种技术非常简单,但也有一个主要缺点:它显着减慢了发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法不会提供超过每秒数百条已发布消息的吞吐量。尽管如此,这对于某些应用程序来说已经足够了。
发布者确认是异步的吗?
我们在开头提到代理异步确认发布的消息,但在第一个示例中,代码同步等待,直到消息被确认。客户端实际上异步接收确认并相应地解除对 waitForConfirmsOrDie 的调用。将 waitForConfirmsOrDie 视为一个同步助手,它在后台依赖异步通知。
为了改进我们之前的示例,我们可以发布一批消息并等待整个批次得到确认。以下示例使用 100 个批次:
int batchSize = 100; int outstandingMessageCount = 0; while (thereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.basicPublish(exchange, queue, properties, body); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { ch.waitForConfirmsOrDie(5_000); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { ch.waitForConfirmsOrDie(5_000); }
与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(使用远程 RabbitMQ 节点最多 20-30 次)。 一个缺点是我们不知道在失败的情况下到底出了什么问题,所以我们可能必须在内存中保留一整批来记录有意义的东西或重新发布消息。 而且这个方案还是同步的,所以会阻塞消息的发布。
代理以异步方式确认发布的消息,只需在客户端注册一个回调即可收到这些确认的通知:
Channel channel = connection.createChannel(); channel.confirmSelect(); channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed }, (sequenceNumber, multiple) -> { // code when message is nack-ed });
有 2 种回调:一种用于确认消息,另一种用于 nack-ed 消息(可被代理视为丢失的消息)。 每个回调有 2 个参数:
sequenceNumber:标识已确认或未确认消息的编号。 我们将很快看到如何将它与发布的消息关联起来。
multiple:这是一个布尔值。 如果为假,则仅确认/确认一条消息,如果为真,则确认/确认所有具有较低或相等序列号的消息。
发布前可以通过 Channel#getNextPublishSeqNo() 获取序列号:
int sequenceNumber = channel.getNextPublishSeqNo()); ch.basicPublish(exchange, queue, properties, body);
将消息与序列号相关联的一种简单方法是使用映射。 假设我们想要发布字符串,因为它们很容易变成一个字节数组来发布。 下面是一个代码示例,它使用映射将发布序列号与消息的字符串正文相关联:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); // ... code for confirm callbacks will come later String body = "..."; outstandingConfirms.put(channel.getNextPublishSeqNo(), body); channel.basicPublish(exchange, queue, properties, body.getBytes());
发布代码现在使用地图跟踪出站消息。我们需要在确认到达时清理此映射,并在消息被 nack 时执行诸如记录警告之类的操作:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap( sequenceNumber, true ); confirmed.clear(); } else { outstandingConfirms.remove(sequenceNumber); } }; channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { String body = outstandingConfirms.get(sequenceNumber); System.err.format( "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", body, sequenceNumber, multiple ); cleanOutstandingConfirms.handle(sequenceNumber, multiple); }); // ... publishing code
前面的示例包含一个回调,当确认到达时清理地图。 请注意,此回调处理单个和多个确认。 当确认到达时使用此回调(作为 Channel#addConfirmListener 的第一个参数)。 nack-ed 消息的回调检索消息正文并发出警告。 然后它重新使用之前的回调来清理未完成的确认映射(无论消息是确认的还是 nack-ed,都必须删除它们在映射中的相应条目。)
如何跟踪未完成的确认?
我们的示例使用 ConcurrentNavigableMap 来跟踪未完成的确认。 这种数据结构很方便有几个原因。 它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地将条目清理到给定的序列 id(以处理多个确认/nack)。 最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。
除了复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,例如使用简单的并发哈希映射和变量来跟踪发布序列的下限,但它们通常涉及更多并且不属于教程。
综上所述,异步处理发布者确认通常需要以下步骤:
提供一种将发布序列号与消息相关联的方法。
在通道上注册一个确认侦听器,以便在发布者确认 / 确认到达以执行适当的操作时收到通知,例如记录或重新发布已确认的消息。 在此步骤中,序列号与消息的关联机制也可能需要进行一些清理。
在发布消息之前跟踪发布序列号。
重新发布 nack-ed 消息?
从相应的回调中重新发布 nack-ed 消息可能很诱人,但应该避免这种情况,因为确认回调是在通道不应该执行操作的 I/O 线程中调度的。更好的解决方案是将消息排入内存队列,该队列由发布线程轮询。像 ConcurrentLinkedQueue 这样的类将非常适合在确认回调和发布线程之间传输消息。
在某些应用程序中,确保已发布的消息发送到代理可能是必不可少的。 发布者确认是有助于满足此要求的 RabbitMQ 功能。 发布者确认本质上是异步的,但也可以同步处理它们。 没有明确的方法来实现发布者确认,这通常归结为应用程序和整个系统中的约束。 典型的技术是:
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,批量同步等待确认:简单,合理的吞吐量,但当出现问题时很难推理。
异步处理:最好的性能和资源使用,在错误的情况下很好的控制,但可以参与正确实现。
PublisherConfirms.java 类包含我们介绍的技术的代码。我们可以编译它,按原样执行它,看看它们各自的表现如何:
javac -cp $CP PublisherConfirms.java java -cp $CP PublisherConfirms
输出将如下所示:
Published 50,000 messages individually in 5,549 ms Published 50,000 messages in batch in 2,331 ms Published 50,000 messages and handled confirms asynchronously in 4,054 ms
如果客户端和服务器位于同一台计算机上,您计算机上的输出应该看起来相似。 单独发布消息的性能不如预期,但与批量发布相比,异步处理的结果有点令人失望。
发布者确认非常依赖网络,所以我们最好尝试使用远程节点,因为客户端和服务器通常不在生产中的同一台机器上,这更现实。 PublisherConfirms.java 可以轻松更改为使用非本地节点:
static Connection createConnection() throws Exception { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("remote-host"); cf.setUsername("remote-user"); cf.setPassword("remote-password"); return cf.newConnection(); }
重新编译类,再次执行,等待结果:
Published 50,000 messages individually in 231,541 ms Published 50,000 messages in batch in 7,232 ms Published 50,000 messages and handled confirms asynchronously in 6,332 ms
我们看到现在单独出版的表现非常糟糕。 但是对于客户端和服务器之间的网络,批量发布和异步处理现在执行类似,对于发布者确认的异步处理有一点优势。
请记住,批量发布实现起来很简单,但在发布者拒绝确认的情况下,很难知道哪些消息无法发送到代理。 异步处理发布者确认更涉及实现,但提供更好的粒度和更好地控制在发布消息被 nack 时执行的操作。