目录
本指南概述了 AMQP 0-9-1 协议,它是 RabbitMQ 支持的协议之一。
AMQP 0-9-1(高级消息队列协议)是一种消息协议,它使符合标准的客户端应用程序能够与符合标准的消息中间件代理进行通信。
消息代理从发布者(发布消息的应用程序,也称为生产者)接收消息,并将其路由到消费者(处理消息的应用程序)。
由于它是一种网络协议,发布者、消费者和代理都可以驻留在不同的机器上。
AMQP0-9-1模型的世界观如下:消息发布到交易所,通常被比作邮局或邮箱。然后,Exchanges 使用称为绑定的规则将消息副本分发到队列。然后代理要么将消息传递给订阅队列的消费者,要么消费者按需从队列中获取/拉取消息。
发布消息时,发布者可以指定各种消息属性(消息元数据)。这些元数据中的一些可能被代理使用,但是,其余的对代理完全不透明,并且仅由接收消息的应用程序使用。
网络不可靠,应用程序可能无法处理消息,因此 AMQP 0-9-1 模型具有消息确认的概念:当消息传递给消费者时,消费者会自动或在应用程序开发人员选择这样做时立即通知代理。当使用消息确认时,代理只有在收到该消息(或消息组)的通知时才会从队列中完全删除该消息。
在某些情况下,例如,当消息无法路由时,消息可能会返回给发布者、丢弃,或者,如果代理实现扩展,则将消息放入所谓的“死信队列”。发布者通过使用某些参数发布消息来选择如何处理此类情况。
Queue(队列)、Exchange(交换)和 Binding(绑定)统称为 AMQP 实体。
AMQP 0-9-1 是一种可编程协议,因为 AMQP 0-9-1 实体和路由方案主要由应用程序本身定义,而不是代理管理员。因此,为声明 Queue 和 Exchange、定义它们之间的 Binding、订阅队列等的协议操作做了规定。
这为应用程序开发人员提供了很大的自由,但也要求他们意识到潜在的定义冲突。在实践中,定义冲突很少见,通常表示配置错误。
应用程序声明他们需要的 AMQP 0-9-1 实体,定义必要的路由方案,并且可以选择在不再使用 AMQP 0-9-1 实体时删除它们。
Exchange 是消息发送到的 AMQP 0-9-1 实体。Exchange 接收消息并将其路由到零个或多个队列中。使用的路由算法取决于 Exchange 类型和称为绑定(Binding)的规则。AMQP 0-9-1 代理提供四种交换类型:
除了 Exchange 类型之外,Exchange 还声明了许多属性,其中最重要的是:
Name
Durability (exchanges 在 broker 重启后幸存下来)
Auto-delete (当最后一个队列与它解除绑定时,exchange 被删除)
Arguments (可选,由插件和特定于代理的功能使用)
Exchange 可以是持久的或短暂的。持久 exchange 在代理重启后仍然存在,而临时 exchange 则不会(当代理重新上线时必须重新声明它们)。并非所有场景和用例都要求 exchange 是持久的。
默认 exchange 是代理预先声明的没有名称(空字符串)的直接交换(direct exchange)。 它有一个特殊的属性,使它对简单的应用程序非常有用:创建的每个队列都会自动绑定到它,并使用与队列名称相同的路由键。
例如,当您声明一个名为 “search-indexing-online” 的队列时,AMQP 0-9-1 代理将使用“search-indexing-online” 作为路由键将其绑定到默认交换(在此 context 有时称为绑定键)。因此,使用路由键“search-indexing-online” 发布到默认交换的消息将被路由到队列 “search-indexing-online”。换句话说,默认的交换使得看起来可以将消息直接传递到队列,即使从技术上讲这不是正在发生的事情。
直接交换(Direct Exchange)根据消息路由键将消息传递到队列。直接交换是消息的单播路由的理想选择(尽管它们也可用于多播路由)。下面是它的工作原理:
队列使用路由键 K 绑定到交换器
当具有路由键 R 的新消息到达直接交换时,如果 K = R,则交换将其路由到队列
直接交换通常用于以循环方式在多个工作人员(同一应用程序的实例)之间分配任务。这样做时,重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间而不是队列之间进行负载均衡。
直接交换可以用图形表示如下:
扇出交换(Fanout Exchange)将消息路由到绑定到它的所有队列,并且忽略路由键。如果 N 个队列绑定到一个扇出交换器,则当一条新消息发布到该交换器时,该消息的副本将传递到所有 N 个队列。扇出交换是消息广播路由的理想选择。
因为扇出交换向绑定到它的每个队列传递消息的副本,所以它的用例非常相似:
大型多人在线 (MMO) 游戏可以将其用于排行榜更新或其他全球活动
体育新闻网站可以使用扇出交换近乎实时地向移动客户端分发分数更新
分布式系统可以广播各种状态和配置更新
群聊可以使用扇出交换在参与者之间分发消息(虽然 AMQP 没有内置的出席概念,所以 XMPP 可能是更好的选择)
扇出交换可以用图形表示如下:
主题交换(Topic Exchange)基于消息路由键与用于将队列绑定到交换的模式之间的匹配将消息路由到一个或多个队列。主题交换类型通常用于实现各种发布/订阅模式变体。主题交换通常用于消息的多播路由。
主题交换有非常广泛的用例。每当一个问题涉及多个消费者/应用程序选择性地选择他们想要接收的消息类型时,就应该考虑使用主题交换。
示例用途:
分发与特定地理位置相关的数据,例如销售点
由多个工作人员完成的后台任务处理,每个工作人员都能够处理特定的任务集
股票价格更新(以及其他类型财务数据的更新)
涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)
云中各种服务的编排
分布式架构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一个架构或操作系统
标头交换(Headers Exchange)设计用于在多个属性上进行路由,这些属性比路由键更容易表示为消息标头。标头交换忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果标头的值等于绑定时指定的值,则认为消息匹配。
可以使用多个用于匹配的标头将队列绑定到标头交换。在这种情况下,代理需要应用程序开发人员提供的更多信息,即,它应该考虑与任何标头匹配的消息,还是所有这些消息? 这就是 “x-match” 绑定参数的用途。当 “x-match” 参数设置为 “any” 时,只需一个匹配的标头值就足够了。或者,将 “x-match” 设置为 “all” 要求所有值必须匹配。
标头交换可以被视为 “类固醇的直接交换”。因为它们基于标头值进行路由,所以它们可以用作路由键不必是字符串的直接交换;例如,它可以是整数或哈希(字典)。
请注意,以字符串 x- 开头的标头不会用于评估匹配项。
AMQP 0-9-1 模型中的队列与其他消息和任务队列系统中的队列非常相似:它们存储应用程序使用的消息。队列与 exchange 共享一些属性,但也有一些额外的属性:
Name
Durable(队列将在代理重新启动后继续存在)
Exclusive(仅由一个连接使用,当该连接关闭时队列将被删除)
Auto-delete(当最后一个消费者取消订阅时,至少有一个消费者的队列被删除)
Arguments(可选;由插件和特定于代理的功能使用,例如消息 TTL、队列长度限制等)
在使用队列之前,必须先声明它。如果队列不存在,则声明队列将导致它被创建。如果队列已经存在并且其属性与声明中的相同,则声明将无效。当现有队列属性与声明中的不同时,将引发代码为 406 (PRECONDITION_FAILED) 的通道级异常。
应用程序可以选择队列名称或要求代理为它们生成名称。队列名称最多可以是 255 个字节的 UTF-8 字符。AMQP 0-9-1 代理可以代表应用程序生成唯一的队列名称。要使用此功能,请传递一个空字符串作为队列名称参数。生成的名称将与队列声明响应一起返回给客户端。
以 “amq” 开头的队列名称。保留供 broker 内部使用。尝试使用违反此规则的名称声明队列将导致通道级异常,回复代码为 403 (ACCESS_REFUSED)。
在 AMQP 0-9-1 中,队列可以声明为持久的或瞬态的。持久队列的元数据存储在磁盘上,而临时队列的元数据尽可能存储在内存中。
发布时的消息也有同样的区别。
在持久性很重要的环境和用例中,应用程序必须使用持久队列并确保发布将已发布消息标记为持久。
绑定是交换(exchange)使用(除其他外)将消息路由到队列的规则。要指示交换 E 将消息路由到队列 Q,Q 必须绑定到 E。绑定可能具有某些交换类型使用的可选路由键属性。路由键的目的是选择发布到交换的某些消息被路由到绑定队列。换句话说,路由键就像一个过滤器。
打个比方:
队列(Queue)就像您在纽约市的目的地
Exchange 就像肯尼迪机场
绑定(Bindings)是从肯尼迪国际机场到您的目的地的路线。可以有零种或多种方式来达到它
有了这个间接层,可以使用直接发布到队列来实现不可能或很难实现的路由场景,并且还消除了应用程序开发人员必须做的一定数量的重复工作。
如果一条消息无法路由到任何队列(例如,因为它发布到的交换没有绑定),它要么被丢弃,要么返回给发布者,这取决于发布者设置的消息属性。
除非应用程序可以使用它们,否则将消息存储在队列中是没有用的。在 AMQP 0-9-1 模型中,应用程序有两种方法可以做到这一点:
订阅以将消息传递给他们(“push API”):这是推荐的选项
轮询(“pull API”):这种方式效率非常低,在大多数情况下应该避免
使用 “push API”,应用程序必须表明有兴趣使用来自特定队列的消息。当他们这样做时,我们说他们注册了一个消费者,或者简单地说,订阅了一个队列。每个队列可以有多个消费者或注册一个独占消费者(在消费时从队列中排除所有其他消费者)。
每个消费者(订阅)都有一个称为消费者标签的标识符。它可用于取消订阅消息。消费者标签只是字符串。
消费者应用程序(即接收和处理消息的应用程序)有时可能无法处理单个消息,或者有时会崩溃。网络问题也有可能导致问题。这就提出了一个问题:代理应该什么时候从队列中删除消息? AMQP 0-9-1 规范让消费者对此进行控制。有两种确认方式:
在代理向应用程序发送消息后(使用 basic.deliver 或 basic.get-ok 方法)。
在应用程序发回确认后(使用 basic.ack 方法)。
前一种选择称为自动确认模型,而后者称为显式确认模型。使用显式模型,应用程序选择何时发送确认。它可以是在收到消息之后,或者在处理之前将其持久化到数据存储之后,或者在完全处理消息之后(例如,成功获取网页,处理并将其存储到某个持久性数据存储中)。
如果一个消费者在没有发送确认的情况下死亡,代理将把它重新传递给另一个消费者,或者,如果当时没有可用的消费者,代理将等到至少一个消费者注册到同一个队列,然后再尝试重新传递。
当消费者应用程序接收到消息时,对该消息的处理可能会成功,也可能不会成功。应用程序可以通过拒绝消息向代理指示消息处理已失败(或当时无法完成)。拒绝消息时,应用程序可以要求代理丢弃或重新排队。当队列中只有一个消费者时,请确保不要通过一遍又一遍地拒绝和重新排队来自同一消费者的消息来创建无限的消息传递循环。
使用 basic.reject 方法拒绝消息。basic.reject 有一个限制:无法像使用确认一样拒绝多条消息。但是,如果您使用的是 RabbitMQ,那么有一个解决方案。RabbitMQ 提供了一个 AMQP 0-9-1 扩展,称为否定确认或 nacks。有关更多信息,请参阅确认和 basic.nack 扩展指南。
对于多个消费者共享一个队列的情况,能够指定每个消费者可以在发送下一个确认之前一次发送多少条消息是很有用的。如果消息倾向于批量发布,这可以用作简单的负载平衡技术或提高吞吐量。例如,如果生产应用程序由于其工作的性质而每分钟发送消息。
请注意,RabbitMQ 仅支持通道级预取计数,不支持基于连接或大小的预取。
AMQP 0-9-1 模型中的消息具有属性。某些属性非常常见,以至于 AMQP 0-9-1 规范定义了它们,应用程序开发人员不必考虑确切的属性名称。一些例子是:
Content type
Content encoding
Routing key
Delivery mode (persistent or not)
Message priority
Message publishing timestamp
Expiration period
Publisher application id
AMQP 代理使用了一些属性,但大多数属性都可以由接收它们的应用程序解释。 一些属性是可选的,称为标题。 它们类似于 HTTP 中的 X-Headers。 消息属性在消息发布时设置。
消息还有一个有效负载(它们携带的数据),AMQP 代理将其视为一个不透明的字节数组。 代理不会检查或修改有效负载。 消息可能只包含属性而不包含有效负载。 通常使用 JSON、Thrift、Protocol Buffers 和 MessagePack 等序列化格式来序列化结构化数据,以便将其作为消息有效负载发布。协议对等点通常使用 “content-type” 和 “content-encoding” 字段来传达此信息,但这只是约定俗成的。
消息可以作为持久性发布,这使得代理将它们持久化到磁盘。 如果服务器重新启动,系统会确保接收到的持久消息不会丢失。 简单地将消息发布到持久交换或它路由到的队列是持久的这一事实并不能使消息持久化:这完全取决于消息本身的持久性模式。 将消息作为持久性发布会影响性能(就像数据存储一样,持久性是以一定的性能成本为代价的)。
由于网络不可靠且应用程序失败,因此通常需要某种处理确认。 有时只需要确认已收到消息这一事实。 有时,确认意味着消息已由消费者验证和处理,例如,验证为具有强制性数据并保存到数据存储或索引。
这种情况很常见,因此 AMQP 0-9-1 有一个内置功能,称为消息确认(有时称为 acks),消费者使用它来确认消息传递和/或处理。 如果应用程序崩溃(连接关闭时 AMQP 代理会注意到这一点),如果预期消息的确认但 AMQP 代理未收到,则消息将重新排队(并可能立即传递给另一个消费者,如果有的话存在)。
在协议中内置确认有助于开发人员构建更强大的软件。
AMQP 0-9-1 由多种方法构成。方法是操作(如 HTTP 方法),与面向对象编程语言中的方法没有任何共同之处。 AMQP 0-9-1 中的协议方法被分组到类中。 类只是 AMQP 方法的逻辑分组。 AMQP 0-9-1 参考包含所有 AMQP 方法的完整细节。
让我们看一下 exchange 类,一组与 exchange 操作相关的方法。它包括以下操作:
exchange.declare
exchange.declare-ok
exchange.delete
exchange.delete-ok
(请注意,RabbitMQ 站点参考还包括特定于 RabbitMQ 的交换类扩展,我们不会在本指南中讨论)。
上述操作形成逻辑对:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok。这些操作是 “请求”(由客户发送)和 “响应”(由 broker 发送以响应上述 “请求”)。
例如,客户端要求代理使用 exchange.declare 方法声明一个新的交换:
如上图所示,exchange.declare 带有几个参数。它们使客户端能够指定交换名称、类型、持久性标志等。
如果操作成功,代理会使用 exchange.declare-ok 方法进行响应:
exchange.declare-ok 不携带除频道号以外的任何参数(本指南稍后将介绍频道)。
AMQP 0-9-1 队列方法类上的另一个方法对的事件顺序非常相似:queue.declare 和 queue.declare-ok:
并非所有 AMQP 0-9-1 方法都有对应的方法。一些(basic.publish 是使用最广泛的一种)没有相应的“响应”方法,而另一些(例如 basic.get)有多个可能的“响应”。
AMQP 0-9-1 连接通常是长期存在的。 AMQP 0-9-1 是一个应用层协议,它使用 TCP 进行可靠传输。 连接使用身份验证,并且可以使用 TLS 进行保护。 当应用程序不再需要连接到服务器时,它应该优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭底层 TCP 连接。
一些应用程序需要到代理的多个连接。然而,同时保持许多 TCP 连接打开是不可取的,因为这样做会消耗系统资源并使得配置防火墙更加困难。 AMQP 0-9-1 连接与通道复用,可以被认为是 “共享单个 TCP 连接的轻量级连接”。
客户端执行的每个协议操作都发生在通道上。特定通道上的通信与另一个通道上的通信完全分开,因此每个协议方法还带有一个通道 ID(也称为通道号),代理和客户端都使用一个整数来确定该方法适用于哪个通道。
通道仅存在于连接的上下文中,从不单独存在。当连接关闭时,其上的所有通道也会关闭。
对于使用多个线程/进程进行处理的应用程序,很常见的是为每个线程/进程打开一个新通道,并且它们之间不共享通道。
为了使单个代理可以托管多个隔离的 “环境”(用户组、交换器、队列等),AMQP 0-9-1 包含了虚拟主机(vhost)的概念。 它们类似于许多流行的 Web 服务器使用的虚拟主机,并提供完全隔离的 AMQP 实体所在的环境。 协议客户端指定他们想要在连接协商期间使用的虚拟主机。
AMQP 0-9-1 有几个扩展点:
自定义交换类型让开发人员可以实现开箱即用的交换类型不能很好地涵盖的路由方案,例如基于地理数据的路由。
交换和队列的声明可以包括代理可以使用的附加属性。例如,RabbitMQ 中的 per-queue 消息 TTL 就是这样实现的。
协议的特定于代理的扩展。例如,请参阅 RabbitMQ 实现的扩展。
可以引入新的 AMQP 0-9-1 方法类。
代理可以通过额外的插件进行扩展,例如,RabbitMQ 管理前端和 HTTP API 作为插件实现。
这些特性使 AMQP 0-9-1 模型更加灵活,适用于非常广泛的问题。
许多流行的编程语言和平台都有许多 AMQP 0-9-1 客户端。 其中一些严格遵循 AMQP 术语,仅提供 AMQP 方法的实现。 其他一些具有附加功能、便利方法和抽象。 一些客户端是异步的(非阻塞的),一些是同步的(阻塞的),一些支持这两种模型。 一些客户端支持特定于供应商的扩展(例如,特定于 RabbitMQ 的扩展)。
因为 AMQP 的主要目标之一是互操作性,所以开发人员最好了解协议操作,而不是局限于特定客户端库的术语。 这种方式与使用不同库的开发人员进行交流将变得更加容易。
如果您对本指南的内容或与 RabbitMQ 相关的任何其他主题有任何疑问,请随时在 RabbitMQ 邮件列表中提问。
如果您想对该站点做出改进,可以在 GitHub 上找到其源代码。只需 fork 存储库并提交拉取请求。谢谢!