本章节将演示怎样使用 Java 程序发送和接收消息。
在 pom.xml 中,添加如下依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
该示例是一个简单的 RabbitMQ 实例,接收端创建一个名为 “hello” 的队列,然后发送消息到该队列,消费者从该队列中接收消息。如下图:
上图中:
P:表示生成者
C:表示消费者
发送消息,即消息生产者。代码如下:
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 直接发送消息到Queue(消息生产者) * @author Administrator */ public class Send { /** * 队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { new Send().run(); } public void run() throws Exception { /** * 建立与RabbitMQ消息服务器的连接 */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 连接到本地服务器 factory.setPort(AMQP.PROTOCOL.PORT); // 5672 Connection connection = factory.newConnection(); /** * 创建一个通道是大多数API完成获取数据的所在地 */ Channel channel = connection.createChannel(); /** * 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中; * * 声明一个队列 * channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); * queue - 队列名称 * durable - 如果为true则声明一个持久化队列 * exclusive - 如果为true则声明一个独占队列 * autoDelete - 如果为true则声明一个自动删除队列 * arguments - 队列其他属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 发布一个消息 * basicPublish(exchange, routingKey, props, body) * exchange - * routingKey - 路由的关键字 * props - 其他消息属性,路由报头等等 * body - 主体消息 */ String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[x] Sent '" + message + "'"); /** * 关闭连接,释放资源 */ channel.close(); connection.close(); } }
接收消息,即消息消费者。代码如下:
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 从Queue中接收消息(消费消息) * @author Administrator */ public class Receive { /** * 队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { new Receive().run(); } public void run() throws Exception { /** * 建立与RabbitMQ消息服务器的连接 */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 连接到本地服务器 factory.setPort(AMQP.PROTOCOL.PORT); // 5672 Connection connection = factory.newConnection(); /** * 创建一个通道是大多数API完成获取数据的所在地 */ Channel channel = connection.createChannel(); /** * 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中; * * 声明一个队列 * channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); * queue - 队列名称 * durable - 如果为true则声明一个持久化队列 * exclusive - 如果为true则声明一个独占队列 * autoDelete - 如果为true则声明一个自动删除队列 * arguments - 队列其他属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 接收消息 */ System.out.println("[*] Waiting for messages. To exit press CTRL+C"); /** * 消费一个消息 * basicConsume(queue, autoAck, callback) * queue - 队列名称 * autoAck - * callback - 消费对象的接口 */ channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[x] Received '" + message + "'"); } }); } }