RabbitMQ 教程

队列(Queue)

什么是队列?

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。

队列的数据元素又称为队列元素。在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

RabbitMQ 中的队列

在 RabbitMQ 中,队列(Queue)是它的内部对象,用于存储消息。生产者将消息发送给交换器(Exchange)后,交换器根据路由规则将消息路由到合适的队列中。如下图:

上图中,如果生产者生产的消息指定了一个路由键,并且该路由键等于“绑定键1”,则交换器会将消息路由到队列1。

注意,RabbitMQ 中消息都只能存储在队列中。RabbitMQ 的生产者生产消息最终将投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行消费,而不是每个消费者都收到所有的消息并消费。如下图:

上图中,两个消费者均订阅了队列1,所有发送给队列1的消息将平均发送给两个消费者。

声明队列

在 RabbitMQ Java 客户端中,可以通过信道 Channel 的 channel.queueDeclare() 方法声明一个队列。方法定义如下:

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable , boolean exclusive,  boolean autoDelete , Map<String, Object> arguments) throws IOException;

不带任何参数的 queueDeclare() 方法默认创建一个由 RabbitMQ 命名的(类似这种 amq.gen-***** 名称,这种队列称为匿名队列)、排他的、自动删除的、非持久化的队列。

方法参数说明如下:

  • queue:队列的名称

  • durable:设置队列是否持久化。如果设置为true,则队列为持久化队列。队列信息将持久化到磁盘,重启 RabbitMQ 服务可以保证不丢失队列信息。

  • exclusive:设置队列是否排他。如果设置为true,则队列为排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接(Connection)可见,并在连接断开是自动删除。注意,排他队列是基于连接( Connection )可见的,同 一个连接的不同信道( Channel) 是可以同时访问同一连接创建的排他队列;“首次”是指如果一个连接己经声明了排他队列,其他连接是不允许建立同名的排他队列,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。

  • autoDelete:设置队列是否自动删除。如果设置为 true,则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除该队列。

  • arguments :设置队列的其他一些参数,如:x-message-ttl、x-expires、x-max-length 等。

注意:生产者和消费者都能够使用 queueDeclare() 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为“传输”模式,之后才能声明队列。

删除队列

删除队列使用信道的 queueDelete() 方法,定义如下:

Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue , boolean ifUnused, boolean ifEmpty )  throws IOException;
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty)  throws IOException ;

参数说明:

  • queue:表示队列名称

  • ifUnused:如果 isUnused 设置为 true,则只有在此交换器没有被使用的情况下才会被删除:

  • ifEmpty:设置为true表示在队列为空的情况下才能删除

队列和交换器绑定

将队列和交换器绑定的方法如下:

Queue.BindOk queueBind(String queue, String exchange , String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange , String routingKey,  Map<String , Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey ,  Map<String, Object> arguments) throws IOException ;

参数说明:

  • queue:表示队列名称

  • exchange:表示交换器名称

  • routingKey:用来绑定队列和交换器的绑定键

  • argument:定义绑定的一些参数

队列和交换器解绑

不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。方法定义如下:

Queue.UnbindOk queueUnbind (String queue, String exchange , String routingKey)  throws IOException; 
Queue.UnbindOk queueUnbind (String queue, String exchange , String routingKey ,  Map<String , Object> arguments) throws IOException;

参数说明:

  • queue:表示队列名称

  • exchange:表示交换器名称

  • routingKey:用来绑定队列和交换器的绑定键

  • argument:定义绑定的一些参数

何时创建队列?

RabbitMQ 的消息被存储在队列中,交换器的使用并不消耗太多服务器资源,主要资源消耗在队列。如果要衡量 RabbitMQ 当前的 QPS(Queries-per-second,即每秒查询率,是对一个特定的服务器在规定时间内所处理流量多少的衡量标准)只需要看队列即可。在实际业务中,需要对创建的队列的流量、内存占用有一个清晰的认知,预估其平均值和峰值,以便在固定的服务器资源下合理分配资源。

按照 RabbitMQ 官方建,生产者和消费者都应该尝试创建队列。但不是所有情况都如此,如果业务本身架构设计之初已经预估了队列的使用情况,则完全可以在业务上线之前手动创建队列(通过 RabbitMQ 管理页面、RabbitMQ 命令行)。

提前手动创建队列有一个好处,可以确保交换器和队列之间正确的绑定。避免由于认为因素、代码缺陷等,导致交换器和队列没有绑定好,最终消息被丢失。与此同时,预估好队列的使用情况非常重要,如果在程序运行后期中,服务器资源不可用。可以根据实际情况对当前服务器资源进行扩展,将相应的队列迁移到其他服务器。

至于是使用预先分配队列还是动态创建队列,需要从业务逻辑本身考虑。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号