本章节将介绍怎样使用 Spring Boot 来快速集成 RabbitMQ,集成的详细步骤如下。
注意:在进行下面的步骤前,你可以通过 https://start.spring.io/ 网站,或者通过 IDEA 创建一个 Spring Boot 项目。
将下面 Maven 依赖添加到 pom.xml 文件的 <dependencies> 标签,如下:
<!-- MQ 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在 application.yml 文件中指定 RabbitMQ 的主机IP地址、端口、用户名和密码信息,如下:
server: port: 8080 # rabiitmq config spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
创建一个名为 RabbitMqConfig 的配置类,该类使用 @Configuration 注解进行修饰。代码如下:
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * @author hxstrive.com 2022/1/11 */ @Configuration public class RabbitMqConfig { public final static String EXCHANGE_NAME ="demo-exchange"; /** 队列名称 */ public final static String QUEUE_NAME = "demo-queue"; /** * 提供一个消息队列Queue * @return */ @Bean Queue queue(){ return new Queue(QUEUE_NAME); } /** * 创建TopicExchange对象,主题类型交换器 * @return */ @Bean TopicExchange topicExchange(){ // 参数1是名字,参数2是重启后是否以然有效,参数3是长期未使用时是否删除 return new TopicExchange(EXCHANGE_NAME, true, false); } /** * 创建一个Binding对象将Exchange和Queue绑定再一起 * @return */ @Bean Binding binding(){ // 使用 “#.hxstrive.com” 路由键绑定Exchange和Queue return BindingBuilder.bind(queue()) .to(topicExchange()) .with("#.hxstrive.com"); } }
通过 @RabbitListener 注解快速实现一个消费者(这里只是简单的将收到的消息打印出来),代码如下:
import com.hxstrive.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费 HelloTest 发送的消息 * @author hxstrive.com 2022/1/11 */ @Component public class Hello { /** * 监听指定的队列,从该队列中消费消息 * @param message 消息正文 */ @RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void handler(String message) { System.out.println("消费:" + message); } }
该类用来启动 Spring Boot 项目,代码如下:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMqSpringBootApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqSpringBootApplication.class, args); } }
通过 @SpringBootTest 注解编写一个 Spring Boot 程序的单元测试,通过单元测试生产一个消息,然后由上面编写的消费者进行消费。代码如下:
import com.hxstrive.rabbitmq.config.RabbitMqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * RabbitMQ 入门示例 * @author hxstrive.com 2022/1/11 */ @SpringBootTest class HelloTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void contextLoads() { // 向 RabbitMqConfig.EXCHANGE_NAME 队列发送10个消息 for(int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "demo.hxstrive.com", "Hello RabbitMQ......." + i); } } }
运行上面测试用例,输出如下:
13:29:09.725 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating CacheAwareContextLoaderDelegate from class [org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate] ... . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.6.3) 2022-02-07 13:29:14.873 INFO 11812 --- [ main] com.hxstrive.rabbitmq.demo.HelloTest : Starting HelloTest using Java 1.8.0_45 on hxstrive with PID 11812 (started by Administrator in D:\learn\消息队列\RabbitMQ\rabbitmq-workspaces\RabbitMQ-SpringBoot) 2022-02-07 13:29:14.875 INFO 11812 --- [ main] com.hxstrive.rabbitmq.demo.HelloTest : No active profile set, falling back to default profiles: default 2022-02-07 13:29:21.406 INFO 11812 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672] 2022-02-07 13:29:21.510 INFO 11812 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#62628e78:0/SimpleConnection@75fa1be3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 4390] 2022-02-07 13:29:21.635 INFO 11812 --- [ main] com.hxstrive.rabbitmq.demo.HelloTest : Started HelloTest in 8.734 seconds (JVM running for 14.861) 消费:Hello RabbitMQ.......0 消费:Hello RabbitMQ.......1 消费:Hello RabbitMQ.......2 消费:Hello RabbitMQ.......3 消费:Hello RabbitMQ.......4 消费:Hello RabbitMQ.......5 消费:Hello RabbitMQ.......6 消费:Hello RabbitMQ.......7 消费:Hello RabbitMQ.......8 消费:Hello RabbitMQ.......9 2022-02-07 13:29:22.170 INFO 11812 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish. 2022-02-07 13:29:23.160 INFO 11812 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.