import com.rabbitmq.client.*; import java.io.IOException; /** * 消费消息,批量拒绝消息 * @author hxstrive.com * @date 2022年2月17日13:59:29 */ public class RejectMessage2 { private static final String EXCHANGE_NAME = "exchange_" + RejectMessage2.class.getSimpleName(); /** * 发送消息 */ private void sender() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 发送消息 System.out.println("[Send] Sending Message..."); for(int i = 1; i <= 5; i++) { byte[] msg = ("hello wrold " + i).getBytes(); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", null, msg); System.out.println("[Send] msg = " + new String(msg)); } // 释放资源 channel.close(); connection.close(); } /** * 消费消息 */ private void consumer() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue final String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive] Waiting Message..."); // 消费消息 channel.basicConsume(queueName, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 打印消息 String msg = new String(body); System.out.println("[Receive] message = " + msg); // 拒绝消息,不重新放入队列 if(msg.contains("5")) { try { Thread.sleep(10000); // 批量拒绝前4个和当前消息 // 通过 RabbitMQ 管理界面观察 // Unacked 值为 5,等待 10 秒后 Unacked 为 0 System.out.println("[Receive] call channel.basicNack()"); channel.basicNack(envelope.getDeliveryTag(), true, false); } catch (Exception e) { e.printStackTrace(); } } } }); } public static void main(String[] args) throws Exception { RejectMessage2 demo = new RejectMessage2(); demo.consumer(); demo.sender(); } }