import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * 验证通过信道 Channel 的 basicPublish() 方法发送消息,将队列指定为超时队列 * @author Administrator * @date 2022年2月17日13:59:29 */ public class PushMessage6 { private static final String EXCHANGE_NAME = "exchange_" + PushMessage6.class.getSimpleName(); /** * 发送消息 */ private void sender() { Connection connection = null; try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 发送消息 System.out.println("[Send] Sending Message..."); for (int i = 0; i < 20; i++) { String msg = "hello wrold.........." + i; System.out.println("[Send] Message = " + msg); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", null, msg.getBytes()); Thread.sleep(1000); } } catch(Exception e) { e.printStackTrace(); } finally { if ( connection != null ) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 消费消息 */ private void consumer() { try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 声明队列 // 设置队列上所有的消息的有效期,单位为毫秒 Map<String, Object> arguments = new HashMap<String , Object>(); // 5秒钟 arguments.put("x-message-ttl", 5000); String queueName = "queue-" + getClass().getSimpleName(); channel.queueDeclare(queueName, false, false, false, arguments); // 绑定exchange与queue channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive] Waiting Message..."); // 不消费消息队列中的消息,消息将在到期后自动被删除 } catch(Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { PushMessage6 demo = new PushMessage6(); demo.consumer(); demo.sender(); } }