RabbitMQ 教程

Federation Web管理界面

前面章节简单介绍了 Feduration 插件,以及怎样安装 Feduration 插件。本章节将介绍怎样使用 Web 管理界面的方式来设置 Feduration 相关的运行参数和策略(Federation upstream)。

下面将详细讲解如何正确地使用 Federation 插件,以 broker1(IP地址:192.168.116.51)和 broker2(IP地址:192.168.116.52)为例,在两者之间建立 Feduration exchange。其中:

  • broker1(192.168.116.51)为下游Broker

  • broker2(192.168.116.52)为上游Broker

我们将在 broker1 中创建 Federation Upstreams,从 broker2 中同步消息。最终效果,我们在 broker2 中的队列中发送一个消息,Feduration 插件将自动把消息同步到 broker1 对应的队列,客户端从 broker1 的队列中消费该消息。

第一步:开启插件

在 broker1 和 broker2 中开启 rabbitmq federation 插件和 rabbitmq federation management 插件(如果不知道怎么开启,可以参考 Federation 插件安装)。

第二步:创建 Exchange 交换器

访问 http://192.168.116.51:15672/#/exchanges  地址,点击“Add a new exchange”按钮添加交换器,如下图:

其中,交换器名称为 federated_exchange,类型为 fanout。

第三步:创建 Queue 队列

访问 http://192.168.116.51:15672/#/queues  地址,点击“Add a new queue”按钮添加队列,如下图:

其中,队列名称为 federated_queue。

第四步:绑定 Exchange 和 Queue

继续访问 http://192.168.116.51:15672/#/exchanges 地址,然后选择上面创建的 federated_exchange 交换器,点击“Bindings” 按钮添加绑定关系,如下图:

上图中,将 federated_exchange 交换器和 federated_queue 队列进行绑定,不使用任何绑定 KEY,这是因为交换器类型为 fanout。

第五步:添加 Federation Upstream

访问 http://192.168.116.51:15672/#/federation-upstreams 地址,在 broker1 中定义一个 Federation upstream。打开 Web 管理界面 “Admin” → “Federation Upstream”,点击 “Add a new upstream” 按钮进行创建,如下图:

上图中,Upstream 名称为 upstream-test,URI 为 amqp://root:aaaaaa@192.168.116.52:5672(如果你没有 root 用户,则需要先在 broker2 上创建 root 用户),Ack Mode 为 on-confirm。

表单的各个参数的含义如下(注意:括号中对应的是采用 rabbitmqctl 工具或者调用 HTTP API 接口的方式所对应的相关参数名称):

通用参数

  • Name:必填项,定义这个 upstream 的名称。

  • URI (uri):必填项,定义 upstream AMQP 连接。例如:amqp://root:123456@192.168.116.52:5672

  • Prefetch count (prefetch count):定义 Federation 内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。

  • Reconnect delay (reconnect-delay):Federation link 由于某种原因断开之后,需要等待多少秒开始重新建立连接。

  • Acknowledgement Mode (ack-mode):定义 Federation link 的消息确认方式。共有 3 种:

    • on-confirm 默认方式,表示在接收到下游的确认消息(等待下游的 Basic.Ack)之后再向上游发送消息确认,这个选项可以确保网络失败或者 Broker 宕机时不会丢失消息,但也是处理速度最慢的选项。

    • on-publish 表示消息发送到下游后(并需要等待下游的 Basic.Ack)再向上游发送消息确认,这个选项可以确保在网络失败的情况下不会丢失消息,但不能确保 Broker 岩机时不会丢失消息。

    • no-ack 表示无须进行消息确认,这个选项处理速度最快,但也最容易丢失消息。

  • Trust User-ID (trust-user-id):设定 Federation 是否使用 “Validated User-ID” 这个功能。如果设置为 false 或者没有设置,那么 Federation 会忽略消息的 user_id 这个属性;如果设置为 true,则 Federation 只会转发 user_id 为上游任意有效的用户的消息。

        所谓的 “Validated User-ID” 功能是指发送消息时验证消息的 user_id 的属性,在 channel.basicPublish 方法中有个参数是 BasicProperties,这个 BasicProperties 类中有个属性为 userId。可以通过如下的方法设置消息的 user_id 属性为 “root”:

AMQP.BasicProperties properties= new AMQP.BasicProperties(); 
properties.setUserid("root"); 
channel.basicPublish("amq.fanout", "", properties, "test user id" .getBytes());

        如果在连接 Broker 时所用的用户名为 “root”,当发送 “test user id” 这条消息时设置的 user_id 的属性为 “guest”。那么这条消息会发送失败,具体报错为“406 PRECONDITION_FAILED - user_id property set to 'guest' but authenticated user was 'root'”,只有当 user_id 设置为 “root” 时这条消息才会发送成功。

Federated exchanges 相关参数

  • Exchange (exchange):指定 upstream exchange 的名称,默认情况下和 federated exchange 同名。

  • Max hops (max-hops):指定消息被丢弃前在 Federation link 中最大的跳转次数。默认为 1。注意即使设置 max-hops 参数为大于 1 的值,同一条消息也不会在同一个 Broker 中出现 2 次,但是有可能会在多个节点中被复制。

  • Expires (expires):指定 Federation link 断开之后,federated queue 所对应的 upstream queue 的超时时间,默认为 “none”。none 表示不删除,单位为 ms。这个参数相当于设置普通队列的 x-expires 参数。设置这个值可以避免 Federation link 断开之后,生产者一直在向 brokerl 中的 exchangeA 发送消息,这些消息又不能被转发到 broker2 中而被消费掉,进而造成 brokerl 中有大量的消息堆积。

  • Message TTL (message-ttl):为 federated queue 所对应的 upstream queue 设置 TTL,相当于普通队列的x-message-ttl 参数。默认为 “none”,表示消息没有超时时间。

  • HA policy (ha-policy):为 federated queue 所对应的 upstream queue 设置 HA,相当于普通队列的 x-ha-policy 参数,默认为 “none”,表示队列没有任何 HA。

Federated queues 相关参数

  • Queue (queue):执行 upstream queue 的名称,默认情况下和 federated queue 同名。

  • Consumer tag:可选地,从 upstream 消费消息时使用的消费者标签。

第六步:定义 Policy

访问 http://192.168.116.51:15672/#/policies  地址,点击“Add / update a policy”按钮创建一个新的策略 Policy。如下图:

上图中,设置策略名称为 federated_policy,模式为 ^federated.+ 等等。

第七步:查看 Federation Status 状态

如果你成功完成了上面所有步骤,那么直接访问 http://192.168.116.51:15672/#/federation  地址,点击“Running Links”按钮查看正在运行的连接。如下图:

上图中的连接就是我们刚刚创建好的 Federation 连接信息。如果能正常看见连接信息,则 Federation Upstream 创建成功了,不容易啊!!!接下来开始测试是否可用

第八步:发送消息

到这里,前面已经成功创建了 Federation Upstream,接下来移步到 broker2,访问 http://192.168.116.52:15672/#/exchanges 地址,查看我们创建的 federated_exchange 交换器(注意:该交换器不需要手动创建,当我们在 broker1 中成功创建 Federation Upstream 后,将会自动为我们创建一个同名的交换器和队列到 broker2)。如下图:

选中 federated_exchange 队列,点击“Publish message”按钮去发送消息,如下图:

上图中,只在 Payload 中填写了消息内容“hello message”,并发送。

第九步:消费消息

上一步骤成功发送了消息,我们返回到 broker1 上,访问 http://192.168.116.51:15672/#/queues  地址,如下图:

选中 federated_queue 队列,点击“Get messages”按钮去消费一个消息。如下图:

上图中,成功地消费了从 broker2 发送过来的消息。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号