RabbitMQ 教程

AMQP 协议介绍

从 RabbitMQ 官网可以知道,它遵循了 AMQP 协议。换句话说,RabbitMQ 就是 AMQP 协议的 Erlang 版本实现。注意,RabbitMQ 不仅仅支持 AMQP 协议,还支持 STOMP(Simple(or Streaming)Text Oriented Messaging Protocol,简单(流)文本消息协议)、MQTT(Message Queuing Telemetry Transport,消息队列遥测传输) 等协议。

AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列进行绑定。当生产者将消息发送到交换器时,交换器根据所携带的路由键去匹配绑定时的绑定键。如果匹配成功,则消息将被存储到绑定键绑定的队列中。消费者可以订阅相应的队列来获取并消费消息。

RabbitMQ中的交换器、交换器类型、队列、绑定、路由键、绑定键等都是遵循的AMQP协议中相应的概念。目前,RabbitMQ 最新版本默认支持 AMQP 0-9-1。

AMQP协议本身包括三层,如下图:

图1:AMQP 0-9-1 层次结构

其中:

  • Module Layer:协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。

  • Session Layer:中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端和服务端之间的通信提供可靠的同步机制和错误处理。

  • Transport Layer:最底层,主要传输二进制流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP生产者流转过程

为了形象的说明AMQP协议命令的流转过程,下面是使用 RabbitMQ 生产消息的简单代码。代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 关闭连接,释放资源
channel.close();
connection.close();

生产者流转过程如下:

  • 创建连接:当客户端与 Broker 建立连接的时候,会调用 factory.newConnection() 方法,这个方法会进一步封装成 AMQP 0-9-1 协议报文头发送给 Broker,告知 Broker 本次采用 AMQP 0-9-1 协议进行交互,紧接着返回 Connection.Start 来建立连接,在连接过程中涉及如下6个命令的交互:

    • Connection.Start/Connection.Start-OK

    • Connection.Tune/Connection.Tune-OK 

    • Connection.Open/Connection.Open-OK

  • 创建信道(Channel):成功创建连接后,将在 Connection 之上调用 connection.createChannel() 方法创建信道 Channel。该过程涉及 Channel.Open/Channel.Open-OK 命令。

  • 发送消息:通过调用信道的 channel.queueDeclare() 方法来发送消息,它对应的 AMQP 协议的命令为 Basic.Publish。注意,Basic.Publish 命令还包含了 Content Header 和 Content Body。Content Header 里面包含了消息属性,例如优先级等。Content Body 包含消息主体。

  • 关闭资源:当生产者发送完消息后,需要调用信道和连接的 channel.close()、connection.close() 方法。这里涉及了如下命令:

    • Channel.Close/Channel.Close-OK

    • Connection.Close/Connection.Close-OK

详细流转过程如下图:

AMQP消费者流转过程

为了形象的说明AMQP协议命令的流转过程,下面是使用 RabbitMQ 消费者的简单代码。代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消费消息
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("[x] Received '" + message + "'");
    }
});

消费者流转过程如下:

  • 创建连接:当客户端与 Broker 建立连接的时候,会调用 factory.newConnection() 方法,这个方法会进一步封装成 AMQP 0-9-1 协议报文头发送给 Broker,告知 Broker 本次采用 AMQP 0-9-1 协议进行交互,紧接着返回 Connection.Start 来建立连接,在连接过程中涉及如下6个命令的交互:

    • Connection.Start/Connection.Start-OK

    • Connection.Tune/Connection.Tune-OK 

    • Connection.Open/Connection.Open-OK

  • 创建信道(Channel):成功创建连接后,将在 Connection 之上调用 connection.createChannel() 方法创建信道 Channel。该过程涉及 Channel.Open/Channel.Open-OK 命令。

  • 消费消息:通过调用信道的 channel.basicConsume() 方法订阅指定队列,它对于AMQP的 Basic.Consum、Basic.Consum-OK 命令。如果 Broker 向消费者推送消息,涉及到 AMQP 的 Basic.Deliver 命令。

详细流转过程如下图:

点击查看 AMQP 0-9-1 模型解释(查看更多AMQP命令)。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号