在 RabbitMQ 中,回复队列主要用于接收 RPC 调用的响应消息。
RPC 是 Remote Procedure Call 的简称(即远程过程调用)。RPC 是一种通过网络请求远程计算机上的服务,而不需要了解网络的底层技术。
RPC 的主要作用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性(即调用远程方法或函数像调用本地方法或函数一样方便)。通俗点来说,假设有两台服务器 A 和 B,有一个应用部署在 A 服务器上,A 服务器上的应用想要调用 B 服务器上应用提供的服务(函数或方法),由于两台服务器上的应用在不同的服务器上,因此不能直接调用,需要通过网络来表达调用的语义和传输调用参数数据。
目前支持 RPC 的协议有很多,例如最早的 CORBA、Java RMI、Web Service RPC 风格、Hessian、Thrift 甚至还有 Restful API。
RabbitMQ 不仅可以当做消息(MQ)服务器,还能进行 RPC 调用,而且使用方法还很简单。客户端发送请求消息,服务端回复响应的消息。为了接收服务器响应的消息,我们需要在请求消息中指定一个回复队列。回复队列通过发送消息时的 replyTo 属性进行指定,关键代码如下:
Channel channel = connection.createChannel(); // 由 RabbitMQ 自动创建队列名 String replyQueueName = channel.queueDeclare().getQueue(); // 消息唯一标识,消费的时候需要进行对别 String corrId = UUID.randomUUID().toString(); // 通过属性设置回复队列 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // 发送一个消息 channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());
该示例分别创建了一个生产者(RPC客户端)和一个消费者(RPC服务端),生产者向一个普通的队列发送一条消息(该消息需要设置一个回复队列,通过 replyTo() 进行设置,以及一个消息唯一标识符 correlationId),消费者订阅该队列。当消费者收到消息时,进行业务处理,业务处理完成后,将处理结果和收到消息的唯一标识符一并打包发送到回复队列(回复队列从收到消息的 replyTo 中获取),这条消息可以称为响应消息。然后,生产者从回复队列中接收响应消息,并根据消息唯一标识符进行处理。整个过程如下图:
注意:不要为每个 RPC 请求创建一个回复队列,因为这样非常低效。只需要为每一个客户端创建一个回复队列即可,同时,这也到这了一个新问题?如果你发送了4次RPC调用,那么收到的RPC响应消息分别对应那一次RPC调用呢!要解决这个问题就需要用到上面 correlationId(消息唯一标识符),通过这个就可以区分开每一个响应属于哪一个请求。
(1)服务端关键代码,如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); connection = factory.newConnection(); // 创建信道 channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ; // 指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。 // 队列中没有被消费的消息不会被删除,还是存在于队列中 channel.basicQos(1) ; System.out.println("[RpcServer] Awaiting RPC requests"); channel.basicConsume(RPC_QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println("[RpcServer] fib(" + message + ")"); // 计算斐波那契数列 // 调用业务方法 response += fib(n); } catch (RuntimeException e) { System.out.println("[RpcServer] " + e.toString()); } finally { // 回复消息属性 AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() // 消息的唯一标识回传 .correlationId(properties.getCorrelationId()) .build(); // 将响应消息写入到回复队列,回复队列有客户端通过 replyTo 指定 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } } });
(2)客户端关键代码,如下:
// 发送消息给RPC服务端 String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes()); // 等待接收服务端响应 String response; while (true) { QueueingConsumer.Delivery delivery= consumer.nextDelivery(); if(delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } }
点击查看完整示例代码(ReplyToQueueDemo.java)。