本章节将演示怎样使用 Java 程序发送和接收消息。
在 pom.xml 中,添加如下依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
该示例是一个简单的 RabbitMQ 实例,接收端创建一个名为 “hello” 的队列,然后发送消息到该队列,消费者从该队列中接收消息。如下图:

上图中:
P:表示生成者
C:表示消费者
发送消息,即消息生产者。代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 直接发送消息到Queue(消息生产者)
* @author Administrator
*/
public class Send {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
new Send().run();
}
public void run() throws Exception {
/**
* 建立与RabbitMQ消息服务器的连接
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); // 连接到本地服务器
factory.setPort(AMQP.PROTOCOL.PORT); // 5672
Connection connection = factory.newConnection();
/**
* 创建一个通道是大多数API完成获取数据的所在地
*/
Channel channel = connection.createChannel();
/**
* 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中;
*
* 声明一个队列
* channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
* queue - 队列名称
* durable - 如果为true则声明一个持久化队列
* exclusive - 如果为true则声明一个独占队列
* autoDelete - 如果为true则声明一个自动删除队列
* arguments - 队列其他属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 发布一个消息
* basicPublish(exchange, routingKey, props, body)
* exchange -
* routingKey - 路由的关键字
* props - 其他消息属性,路由报头等等
* body - 主体消息
*/
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
/**
* 关闭连接,释放资源
*/
channel.close();
connection.close();
}
}接收消息,即消息消费者。代码如下:
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 从Queue中接收消息(消费消息)
* @author Administrator
*/
public class Receive {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
new Receive().run();
}
public void run() throws Exception {
/**
* 建立与RabbitMQ消息服务器的连接
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); // 连接到本地服务器
factory.setPort(AMQP.PROTOCOL.PORT); // 5672
Connection connection = factory.newConnection();
/**
* 创建一个通道是大多数API完成获取数据的所在地
*/
Channel channel = connection.createChannel();
/**
* 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中;
*
* 声明一个队列
* channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
* queue - 队列名称
* durable - 如果为true则声明一个持久化队列
* exclusive - 如果为true则声明一个独占队列
* autoDelete - 如果为true则声明一个自动删除队列
* arguments - 队列其他属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 接收消息
*/
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
/**
* 消费一个消息
* basicConsume(queue, autoAck, callback)
* queue - 队列名称
* autoAck -
* callback - 消费对象的接口
*/
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
}
});
}
}