在前面的 AMQP 协议介绍 中,生产者生产消息的代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送消息 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 关闭连接,释放资源 channel.close(); connection.close();
生产者发送消息的主要步骤如下:
创建连接
创建信道
声明队列
发送消息
关闭连接,释放资源
本章节将详细介绍怎样通过 RabbitMQ Java 客户端代码创建连接。创建连接最简单的方式就是直接 new 一个 ConnectionFactory 对象,然后设置 RabbitMQ Broker 的主机地址和端口,然后调用 ConnectionFactory 的 newConnection() 方法。代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); // 创建连接 Connection connection = factory.newConnection();
创建连接时,除了指定主机地址和端口外,我们还可以指定用户名(Username)、密码(Password)、虚拟主机(Virtual Host)等。代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("hxstrive.virtual"); // 创建连接 Connection connection = factory.newConnection();
注意:虚拟主机可以通过 RabbitMQ 的管理界面的“Admin”>“Virtual hosts”进行添加。如下图:
除了上面的创建方式,你还可以选择使用 URL 的方式来创建连接,代码如下:
// 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://guest:guest@127.0.0.1:5672/hxstrive.virtual"); // 创建连接 Connection connection = factory.newConnection(); System.out.println(connection); // 关闭连接 connection.close();
在连接 Connection 创建成功后,就可以通过连接创建信道 Channel。AMQP URI格式如下:
amqp://username:password@hostIp:port/virtualHost
注意:
Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程重新创建一个 Channel。在某些情况下 Channel 的操作可以并发运行,但是其它情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间共享 Channel 实例是非线程安全的。
Channel 或者 Connection 中都有一个 isOpen() 方法,该方法可以用来检测 Channel/Connection 是否已经处于开启状态。但是,并不推荐在生产环境下使用 isOpen() 方法,这个方法的返回值依赖于 shutdownCause 的存在,有可能会产生竞争。
isOpen() 源码如下:
// com.rabbitmq.client.impl.ShutdownNotifierComponent public boolean isOpen() { synchronized(this.monitor) { return this.shutdownCause == null; } }
通常情况下,在调用 createChannel() 或者 newConnection() 方法之后,可以简单认为 Connection 或者 Channel 已经成功处于开启状态,而并不需要我们在代码中使用 isOpen() 方法去检查它们。即使在使用 Channel 的时候它处于关闭状态,程序会抛出 com.rabbitmq.client.ShutdownSignalException,捕获且处理该异常即可。