import com.rabbitmq.client.*; import java.io.IOException; /** * 通过信道 Channel 的 basicPublish() 方法发送消息, * 发送消息时指定 userId * @author Administrator * @date 2022年2月17日13:59:29 */ public class PushMessage3 { private static final String EXCHANGE_NAME = "exchange_" + PushMessage3.class.getSimpleName(); /** * 发送消息 */ private void sender() { Connection connection = null; try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 发送消息 // 发送消息时指定 userId,只有当前连接的用户名等于 userId 才可以发送出去消息 System.out.println("[Send] Sending Message..."); byte[] msg = "hello wrold".getBytes(); channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com", new AMQP.BasicProperties.Builder() .contentType("text/plain") .deliveryMode(2) .priority(1) // 将 userId 设置为 hxstrive,则不能发送消息,因为与创建连接指定的用户 guest 不一致 // 将 userId 设置为 guest,能正常发送消息,因为与创建连接指定的用户一致 .userId("guest") .build(), msg); } catch(Exception e) { e.printStackTrace(); } finally { if ( connection != null ) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 消费消息 */ private void consumer() { try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive1] Waiting Message..."); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[Receive1] Receive Message :: " + new String(body)); } }); } catch(Exception e) { e.printStackTrace(); } } /** * 消费消息 */ private void consumer2() { try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("hxstrive"); factory.setPassword("hxstrive"); Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 绑定exchange与queue String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.hxstrive.com"); System.out.println("[Receive2] Waiting Message..."); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[Receive2] Receive Message :: " + new String(body)); } }); } catch(Exception e) { e.printStackTrace(); } } public static void main(String[] args) { PushMessage3 demo = new PushMessage3(); demo.consumer(); demo.consumer2(); demo.sender(); } }