前面章节介绍了怎样连接到 RabbitMQ、怎样创建交换器和怎样创建队列。本章将介绍如何使用 RabbitMQ Java 客户端发送消息。
如果要发送一个消息,可以使用 Channel(信道)类的 basicPublish() 方法,例如:发送一条内容为“Hello World!”的消息,代码如下:
// 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(Receive.EXCHANGE_NAME, "topic"); // 发送消息 byte[] msg = "hello wrold".getBytes(); channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com", null, msg);
为了更好地控制发送,可以使用 mandatory 这个参数,或者可以发送一些特定属性的消息:
// 发送消息 boolean mandatory = true; byte[] msg = "hello wrold".getBytes(); channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
上面代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化到服务器磁盘中。同时,这条消息的优先级(priority)设置为0,content-type 为“text/plain”。MessageProperties.PERSISTENT_TEXT_PLAIN 定义如下:
当然,你也可以自己设定消息属性,代码如下:
byte[] msg = "hello wrold".getBytes(); channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com", new AMQP.BasicProperties.Builder() .contentType("text/plain") .deliveryMode(2) .priority(1) // 仅当创建连接的用户名和这里指定的 userId 一致时,才能将消息发送出去 .userId("guest") .build(), msg);
也可以发送一条带有 headers 的消息,代码如下:
byte[] msg = "hello wrold".getBytes(); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("location", "here"); headers.put("time", "today"); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", new AMQP.BasicProperties.Builder() .headers(headers) .build(), msg);
还可以发送一条带有过期时间(expiration)的消息:
channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .expiration("6000") .build(), messageBodyBytes) ;
信道的 basicPublish() 方法用来发送一个消息到 RabbitMQ 服务。它有三个重载方法,定义如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
参数说明:
exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中
props:消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、 replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。
byte[] body:消息体( payload ),真正需要发送的消息
mandatory:如果将 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,则消息直接被丢弃。
immediate:当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic .Return 返回至生产者。
点击查看发送消息的一些完整示例程序:
PushMessage1.java 验证利用信道 Channel 的 basicPublish() 简单发送消
PushMessage2.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,且指定 mandatory 为 true
PushMessage3.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,发送消息时指定 userI
PushMessage4.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,发送一条带有 headers 的消息
PushMessage5.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,为消息指定过期时
PushMessage6.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,将队列指定为超时队列