从 RabbitMQ 官网可以知道,它遵循了 AMQP 协议。换句话说,RabbitMQ 就是 AMQP 协议的 Erlang 版本实现。注意,RabbitMQ 不仅仅支持 AMQP 协议,还支持 STOMP(Simple(or Streaming)Text Oriented Messaging Protocol,简单(流)文本消息协议)、MQTT(Message Queuing Telemetry Transport,消息队列遥测传输) 等协议。
AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列进行绑定。当生产者将消息发送到交换器时,交换器根据所携带的路由键去匹配绑定时的绑定键。如果匹配成功,则消息将被存储到绑定键绑定的队列中。消费者可以订阅相应的队列来获取并消费消息。
RabbitMQ中的交换器、交换器类型、队列、绑定、路由键、绑定键等都是遵循的AMQP协议中相应的概念。目前,RabbitMQ 最新版本默认支持 AMQP 0-9-1。
AMQP协议本身包括三层,如下图:
图1:AMQP 0-9-1 层次结构
其中:
Module Layer:协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。
Session Layer:中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端和服务端之间的通信提供可靠的同步机制和错误处理。
Transport Layer:最底层,主要传输二进制流,提供帧的处理、信道复用、错误检测和数据表示等。
为了形象的说明AMQP协议命令的流转过程,下面是使用 RabbitMQ 生产消息的简单代码。代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送消息 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 关闭连接,释放资源 channel.close(); connection.close();
生产者流转过程如下:
创建连接:当客户端与 Broker 建立连接的时候,会调用 factory.newConnection() 方法,这个方法会进一步封装成 AMQP 0-9-1 协议报文头发送给 Broker,告知 Broker 本次采用 AMQP 0-9-1 协议进行交互,紧接着返回 Connection.Start 来建立连接,在连接过程中涉及如下6个命令的交互:
Connection.Start/Connection.Start-OK
Connection.Tune/Connection.Tune-OK
Connection.Open/Connection.Open-OK
创建信道(Channel):成功创建连接后,将在 Connection 之上调用 connection.createChannel() 方法创建信道 Channel。该过程涉及 Channel.Open/Channel.Open-OK 命令。
发送消息:通过调用信道的 channel.queueDeclare() 方法来发送消息,它对应的 AMQP 协议的命令为 Basic.Publish。注意,Basic.Publish 命令还包含了 Content Header 和 Content Body。Content Header 里面包含了消息属性,例如优先级等。Content Body 包含消息主体。
关闭资源:当生产者发送完消息后,需要调用信道和连接的 channel.close()、connection.close() 方法。这里涉及了如下命令:
Channel.Close/Channel.Close-OK
Connection.Close/Connection.Close-OK
详细流转过程如下图:
为了形象的说明AMQP协议命令的流转过程,下面是使用 RabbitMQ 消费者的简单代码。代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消费消息 channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[x] Received '" + message + "'"); } });
消费者流转过程如下:
创建连接:当客户端与 Broker 建立连接的时候,会调用 factory.newConnection() 方法,这个方法会进一步封装成 AMQP 0-9-1 协议报文头发送给 Broker,告知 Broker 本次采用 AMQP 0-9-1 协议进行交互,紧接着返回 Connection.Start 来建立连接,在连接过程中涉及如下6个命令的交互:
Connection.Start/Connection.Start-OK
Connection.Tune/Connection.Tune-OK
Connection.Open/Connection.Open-OK
创建信道(Channel):成功创建连接后,将在 Connection 之上调用 connection.createChannel() 方法创建信道 Channel。该过程涉及 Channel.Open/Channel.Open-OK 命令。
消费消息:通过调用信道的 channel.basicConsume() 方法订阅指定队列,它对于AMQP的 Basic.Consum、Basic.Consum-OK 命令。如果 Broker 向消费者推送消息,涉及到 AMQP 的 Basic.Deliver 命令。
详细流转过程如下图:
点击查看 AMQP 0-9-1 模型解释(查看更多AMQP命令)。