在 RabbitMQ 之路由(Routing)教程中,我们改进了我们的日志系统。我们没有使用只能进行虚拟广播的扇出交换(Fanout exchange),而是使用了直接交换(Direct exchange),并获得了选择性接收日志的可能性。
尽管使用直接交换(Direct exchange)改进了我们的系统,但它仍然有局限性 ———— 它不能基于多个标准进行路由。
在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅。您可能从 syslog unix 工具中知道这个概念,该工具根据严重性(info/warn/crit...)和设施(auth/cron/kern...)路由日志。
这会给我们很大的灵活性 ———— 我们可能只想听来自 “cron” 的严重错误,也想听来自 “kern” 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的主题交换器。
发送到主题交换的消息不能有任意的 routing_key ———— 它必须是单词列表,由点分隔。 这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。路由键中可以有任意多的单词,最多为 255 个字节。
绑定键也必须采用相同的格式。主题交换器背后的逻辑类似于直接交换器 ———— 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况:
*(星号)可以只替换一个单词。
# (哈希) 可以代替零个或多个单词。
在一个例子中最容易解释这一点:
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个词将描述速度,第二个是颜色,第三个是物种:“<speed>.<colour>.<species>”。
我们创建了三个绑定:Q1 与绑定键 “*.orange.*” 绑定,Q2 与 “*.*.rabbit” 和 “lazy.#” 绑定。
这些绑定可以概括为:
Q1 对所有橙色动物都感兴趣。
Q2 想听听关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 “quick.orange.rabbit” 的消息将被传递到两个队列。消息 “lazy.orange.elephant” 也将发送给他们两个。另一方面,“quick.orange.fox” 只会进入第一个队列,而 “lazy.brown.fox” 只会进入第二个队列。 “lazy.pink.rabbit” 只会被传递到第二个队列一次,即使它匹配两个绑定。“quick.brown.fox” 与任何绑定都不匹配,因此将被丢弃。
如果我们违反合约并发送带有一四个词的消息,例如 “orange” 或 “quick.orange.male.rabbit”,会发生什么? 好吧,这些消息不会匹配任何绑定并且会丢失。
另一方面,“lazy.orange.male.rabbit”,即使它有四个单词,也会匹配最后一个绑定并被传递到第二个队列。
主题交换器功能强大,可以像其他交换器一样运行。
当队列与 “#”(散列)绑定键绑定时 ———— 无论路由键如何,它都会接收所有消息 ———— 就像在扇出交换器(Fanout exchange)中一样。
当绑定中不使用特殊字符 “*”(星号)和 “#”(哈希)时,主题交换器的行为就像直接交换器一样。
我们将在日志系统中使用主题交换器。我们将从一个可行的假设开始,即日志的路由键将有两个词:“<facility>.<severity>”。
代码与上一个教程中的代码几乎相同。
EmitLogTopic.java 的代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } //.. }
ReceiveLogsTopic.java 的代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
编译并运行示例,包括教程1中的类路径 ———— 在 Windows 上,使用 %CP%。编译:
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志:
java -cp $CP ReceiveLogsTopic "#"
要从设施 “kern” 接收所有日志:
java -cp $CP ReceiveLogsTopic "kern.*"
或者,如果您只想了解 “critical” 日志:
java -cp $CP ReceiveLogsTopic "*.critical"
您可以创建多个绑定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
并发出带有路由键 “kern.critical” 类型的日志:
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"