import com.rabbitmq.client.*; import java.io.IOException; import java.util.Set; import java.util.TreeSet; /** * 验证 RabbitMQ 生产者确认,批量确认 * @author hxstrive.com 2022/3/2 */ public class SenderAckDemo3 { /** 交换器名称 */ private final String EXCHANGE_NAME = "exchange-" + SenderAckDemo3.class.getSimpleName(); /** 队列名称 */ private final String QUEUE_NAME = "queue-" + SenderAckDemo3.class.getSimpleName(); /** 缓存已发送的消息 */ private static final Set<String> MESSAGE_LIST = new TreeSet<String>(); public static void main(String[] args) throws Exception { SenderAckDemo3 demo = new SenderAckDemo3(); demo.consumer(); demo.sender(); } /** * 生产者 * @throws Exception */ private void sender() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); channel.basicQos(1); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null) ; channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com"); // 将信道置为 publisher confirm 模式 channel.confirmSelect(); // 发送消息 for(int i = 0; i < 20; i++) { String message = "transaction message i=" + i; sendMessage(channel, message); } // 关闭连接 channel.close(); connection.close(); } private void sendMessage(Channel channel, String message) throws Exception { channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", false, null, message.getBytes()); System.out.println("[Send] Send message ‘" + message + "’"); // 将消息添加到缓存 MESSAGE_LIST.add(message); // 每20条消息确认一次 if(MESSAGE_LIST.size() >= 5) { try { // 故意抛出异常 if(Math.random() < 0.5f) { System.out.println(1 / 0); } // 等待消息被确认 if (channel.waitForConfirms(2000)) { // 将缓存中的消息清空 System.out.println("[Send] clear message"); MESSAGE_LIST.clear(); } else { // 重新发送缓存中的消息 String[] msgs = MESSAGE_LIST.toArray(new String[]{}); MESSAGE_LIST.clear(); for(String msg : msgs) { System.out.println("[Send] resend msg=" + msg); sendMessage(channel, msg); } } } catch (Exception e) { e.printStackTrace(); // 重新发送缓存中的消息 String[] msgs = MESSAGE_LIST.toArray(new String[]{}); MESSAGE_LIST.clear(); for(String msg : msgs) { System.out.println("[Send] resend msg=" + msg + ", error=" + e.getMessage()); sendMessage(channel, msg); } } } } /** * 消费者 * @throws Exception */ private void consumer() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 final Channel channel = connection.createChannel(); channel.basicQos(1); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null) ; channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com"); // 消费消息 System.out.println("[Consumer] Waiting for a message...."); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("[Consumer] body = " + new String(body)); Thread.sleep(100); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }); } }