import com.rabbitmq.client.*; /** * 消费消息,手动从消息队列拉取,手动确认 * @author hxstrive.com * @date 2022年2月17日13:59:29 */ public class AckMessage1 { private static final String EXCHANGE_NAME = "exchange_" + AckMessage1.class.getSimpleName(); /** * 发送消息 */ private void sender() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 发送消息 System.out.println("[Send] Sending Message..."); while(true) { byte[] msg = ("hello wrold " + System.currentTimeMillis()).getBytes(); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", null, msg); System.out.println("[Send] msg = " + new String(msg)); Thread.sleep(3000); } } /** * 消费消息 */ private void consumer() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue final String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive] Waiting Message..."); // 消费消息 while (true) { Thread.sleep(1000); System.out.println("[Receive] get message..."); GetResponse response = channel.basicGet(queueName, false); if (null != response) { String body = new String(response.getBody()); System.out.print("[Receive] Receive Message :: " + new String(body)); Envelope envelope = response.getEnvelope(); System.out.print(" routingKey = " + envelope.getRoutingKey()); BasicProperties properties = response.getProps(); System.out.print(" contentType = " + properties.getContentType()); System.out.println(); // 手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } } } public static void main(String[] args) { final AckMessage1 demo = new AckMessage1(); new Thread(new Runnable() { public void run() { try { demo.consumer(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { public void run() { try { demo.sender(); } catch (Exception e){ e.printStackTrace(); } } }).start(); } }