import com.rabbitmq.client.*;
/**
* 消费消息,手动从消息队列拉取,手动确认
* @author hxstrive.com
* @date 2022年2月17日13:59:29
*/
public class AckMessage1 {
private static final String EXCHANGE_NAME = "exchange_" +
AckMessage1.class.getSimpleName();
/**
* 发送消息
*/
private void sender() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送消息
System.out.println("[Send] Sending Message...");
while(true) {
byte[] msg = ("hello wrold " + System.currentTimeMillis()).getBytes();
channel.basicPublish(EXCHANGE_NAME,
"www.hxstrive.com", null, msg);
System.out.println("[Send] msg = " + new String(msg));
Thread.sleep(3000);
}
}
/**
* 消费消息
*/
private void consumer() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 绑定exchange与queue
final String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com");
System.out.println("[Receive] Waiting Message...");
// 消费消息
while (true) {
Thread.sleep(1000);
System.out.println("[Receive] get message...");
GetResponse response = channel.basicGet(queueName, false);
if (null != response) {
String body = new String(response.getBody());
System.out.print("[Receive] Receive Message :: " + new String(body));
Envelope envelope = response.getEnvelope();
System.out.print(" routingKey = " + envelope.getRoutingKey());
BasicProperties properties = response.getProps();
System.out.print(" contentType = " + properties.getContentType());
System.out.println();
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
public static void main(String[] args) {
final AckMessage1 demo = new AckMessage1();
new Thread(new Runnable() {
public void run() {
try {
demo.consumer();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
demo.sender();
} catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
}