RabbitMQ 教程

交换器(Exchange)

在 RabbitMQ 中,交换器(Exchange)是一个非常重要、也是非常常用的组件。RabbitMQ 中,生产者总是将消息发送给交换器,由交换器(Exchange)根据一定的规则将消息路由到指定的队列(Queue)。如下图:

上图中,交换器位于生产者和队列之间。生产者将消息发送给交换器(发送消息时需要指定一个路由键 RoutingKey),交换器根据路由键与绑定键进行匹配(当路由键和绑定键匹配时,消息将被路由到对应的队列中),将消息路由到匹配的队列。

交换器类型

交换器支持四种交换类型,不同的类型有着不同的路由策略。下面将分别介绍这四种类型:

Direct

direct 类型的交换器路由规则很简单,它会把消息路由到那些绑定键(BindingKey)和路由键(RoutingKey)完全匹配的队列中。如下图:

上图中,如果 routing_key 等于“images.archive”,则将消息路由到 archiver1 和 archiver2 队列。如果 routing_key 等于“images.crop”,则将消息路由到 cropper 队列。如果 routing_key 等于“images.resize”,则将消息路由到 resize 队列。

Fanout

它会把所有发送到该交换器的消息路由到所有绑定到该交换器上的队列中。如下图:

上图中,如果生产者向 Exchange 发送一个消息,则该消息将分别发送给绑定到 Exchange 的 Queue。

Topic

direct 类型的交换器路由规则是队列和消息的绑定键和路由键完全匹配,则将消息路由到该队列。但是这种严格的匹配方式在很多情况下并不能满足实际业务需要。

而 topic 类型的交换器对 direct 类型进行了扩展,路由规则为:判断队列和消息的绑定键和路由键是否匹配(这里做模糊匹配),这里为匹配定义了如下规则:

  • 路由键(RoutingKey)和绑定键(BindingKey)是一个由点号(.)分割的字符串。例如:hxstrive.com、www.hxstrive.com、demo.hxstrive.com 等

  • 绑定键(BindingKey)中可以存在两种特殊字符串,用于做模糊匹配:

    • *(星号):用于匹配一个单词,例如:www.hxstrive.com 将匹配 *.hxstrive.com;www.hxstrive.com 也将匹配 *.hxstrive.*

    • #(井号):用于匹配零个或多个单词,例如:#.hxstrive.com 可以匹配 www.hxstrive.com、h1.demo.hxstrive.com

Headers

headers 类型的交换器不依赖路由键的匹配规则来路有消息,而是根据发送消息内容中的消息头属性进行匹配。在绑定队列和交换器指定一组键值对。当发送消息到交换器时,RabbitMQ 会获取该消息的消息头,将消息头与绑定队列和交换器指定键值对进行匹配。如果匹配,则消息会路由到该队列。否则,不会路由到该队列。

在绑定队列和交换器时,可以在绑定键中指定一个特殊的键“x-match”来设置 RabbitMQ 的 headers 类型交换器匹配规则。x-match 可取值如下:

  • all:完全匹配,如果消息头与绑定队列和交换器指定键值对完全匹配,则将消息路由到该队列。否则,不路由到该队列。例如:绑定队列和交换器时,指定了 key1=val1,key2=val2,key3=val3 三个键值对,而发送的消息指定了 key1=val1,key2=val2 两个键值对,则不会将消息路由到该队列。如果消息指定了  key1=val1,key2=val2,key3=val3 三个键值对,则将消息路由到该队列。

  • any:部分匹配,如果消息头与绑定队列和交换器指定键值对任意一个匹配,则将消息路由到该队列。如果全部都不匹配,不路由到该队列。例如:绑定队列和交换器时,指定了 key1=val1,key2=val2,key3=val3 三个键值对,而发送的消息指定了 key2=val2 两个键值对,则将消息路由到该队列。

在绑定队列和交换器时,怎样通过 java 代码指定 x-match 呢?代码如下:

String queueName = channel.queueDeclare().getQueue();
// 绑定的header关键字
Map<String,Object> headers = new HashMap<String,Object>();
headers.put("x-match", "any"); // 设置匹配方式
headers.put("level", "error");
headers.put("package", "com.hxstrive");
channel.queueBind(queueName, EXCHANGE_NAME, "", headers);

声明交换器

不管是生产者还是消费者在发送/消费消息前,均需要声明一个交换器(如果已经存在,则不需要声明)。在 RabbitMQ 的 java 客户端中,可以通过 channel.exchangeDeclare() 方法去声明交换器。exchangeDeclare() 方法被重载:

Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;

参数说明:

  • exchange:交换器的名称

  • type:交换的类型,取值 fanout、direct、topic、heades

  • durable:设置是否持久化。如果设置为true,开启持久化功能。持久化可以将交换器信息存储到磁盘,重启 RabbitMQ 服务该交换器信息不会丢失。

  • autoDelete:设置是否自动删除。如果设置为 true,表示启用自动删除功能。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,则触发自动删除。

  • internal:设置是否是内置的。如果设置为true,表示是内置的交换器,客户端程序无法直接发送消息到内置交换器中,只能通过交换器路由到内置交换器。

  • arguments:其他一些结构化参数,比如:alternate-exchange等

除了上面四个 exchangeDeclare() 方法外,RabbitMQ Java 客户端还定义了一个 exchangeDeclareNoWait() 方法。它的定义如下:

void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;

这个 exchangeDeclareNoWait() 比 exchangeDeclare() 多设置了一个 nowait 参数(参数设置在 arguments 中),这个 nowait 参数指的是 AMQP 中 Exchange.Declare 命令的参数,意思是不需要服务器返回。注意这个方法的返回值是 void ,而普通的 exchangeDeclare() 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange.Declare-Ok 这个 AMQP 命令)。

删除交换器

删除交换器的方法定义如下:

Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange , boolean ifUnused) throws  IOException; 
void exchangeDeleteNoWait(String exchange , boolean ifUnused) throws  IOException;

参数说明:

  • exchange:表示交换器的名称

  • ifUnused:用来设置是否在交换器没有被使用的情况下删除交换器。如果 isUnused 设置为 true,则只有在此交换器没有被使用的情况下才会被删除:如果设置 false ,则无论如何这个交换器都要被删除。

交换器与交换器绑定

交换器与交换器绑定的方法定义如下:

Exchange.BindOk exchangeBind(String destination , String source, String  routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination , String source, String  routingKey, Map<String , Object> arguments) throws IOException ; 
void exchangeBindNoWait(String destination, String source , String routingKey,  Map<String, Object> arguments) throws IOException;

参数说明:

  • destination:交换器的名称

  • source:交换器的名称

  • routingKey:路由键

  • arguments:其他一些结构化参数,比如:alternate-exchange等

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号