RabbitMQ 教程

Spring Boot 集成 RabbitMQ

本章节将介绍怎样使用 Spring Boot 来快速集成 RabbitMQ,集成的详细步骤如下。

注意:在进行下面的步骤前,你可以通过 https://start.spring.io/ 网站,或者通过 IDEA 创建一个 Spring Boot 项目。

添加 Maven 依赖

将下面 Maven 依赖添加到 pom.xml 文件的 <dependencies> 标签,如下:

<!-- MQ 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编辑 application.yml 文件

在 application.yml 文件中指定 RabbitMQ 的主机IP地址、端口、用户名和密码信息,如下:

server:
  port: 8080
# rabiitmq config
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

编写 RabbitMQ 配置类

创建一个名为 RabbitMqConfig 的配置类,该类使用 @Configuration 注解进行修饰。代码如下:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 * @author hxstrive.com 2022/1/11
 */
@Configuration
public class RabbitMqConfig {
    public final static String EXCHANGE_NAME ="demo-exchange";
    /** 队列名称 */
    public final static String QUEUE_NAME = "demo-queue";

    /**
     * 提供一个消息队列Queue
     * @return
     */
    @Bean
    Queue queue(){
        return new Queue(QUEUE_NAME);
    }

    /**
     * 创建TopicExchange对象,主题类型交换器
     * @return
     */
    @Bean
    TopicExchange topicExchange(){
        // 参数1是名字,参数2是重启后是否以然有效,参数3是长期未使用时是否删除
        return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 创建一个Binding对象将Exchange和Queue绑定再一起
     * @return
     */
    @Bean
    Binding binding(){
        // 使用 “#.hxstrive.com” 路由键绑定Exchange和Queue
        return BindingBuilder.bind(queue())
                .to(topicExchange())
                .with("#.hxstrive.com");
    }

}

编写消费类

通过 @RabbitListener 注解快速实现一个消费者(这里只是简单的将收到的消息打印出来),代码如下:

import com.hxstrive.rabbitmq.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费 HelloTest 发送的消息
 * @author hxstrive.com 2022/1/11
 */
@Component
public class Hello {

    /**
     * 监听指定的队列,从该队列中消费消息
     * @param message 消息正文
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)
    public void handler(String message) {
        System.out.println("消费:" + message);
    }

}

Spring Boot 启动类

该类用来启动 Spring Boot 项目,代码如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMqSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqSpringBootApplication.class, args);
    }

}

编写消息生产者

通过 @SpringBootTest 注解编写一个 Spring Boot 程序的单元测试,通过单元测试生产一个消息,然后由上面编写的消费者进行消费。代码如下:

import com.hxstrive.rabbitmq.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * RabbitMQ 入门示例
 * @author hxstrive.com 2022/1/11
 */
@SpringBootTest
class HelloTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads() {
        // 向 RabbitMqConfig.EXCHANGE_NAME 队列发送10个消息
        for(int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,
                    "demo.hxstrive.com",
                    "Hello RabbitMQ......." + i);
        }
    }

}

运行上面测试用例,输出如下:

13:29:09.725 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating CacheAwareContextLoaderDelegate from class [org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate]
...
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.3)
2022-02-07 13:29:14.873  INFO 11812 --- [           main] com.hxstrive.rabbitmq.demo.HelloTest     : Starting HelloTest using Java 1.8.0_45 on hxstrive with PID 11812 (started by Administrator in D:\learn\消息队列\RabbitMQ\rabbitmq-workspaces\RabbitMQ-SpringBoot)
2022-02-07 13:29:14.875  INFO 11812 --- [           main] com.hxstrive.rabbitmq.demo.HelloTest     : No active profile set, falling back to default profiles: default
2022-02-07 13:29:21.406  INFO 11812 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2022-02-07 13:29:21.510  INFO 11812 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#62628e78:0/SimpleConnection@75fa1be3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 4390]
2022-02-07 13:29:21.635  INFO 11812 --- [           main] com.hxstrive.rabbitmq.demo.HelloTest     : Started HelloTest in 8.734 seconds (JVM running for 14.861)
消费:Hello RabbitMQ.......0
消费:Hello RabbitMQ.......1
消费:Hello RabbitMQ.......2
消费:Hello RabbitMQ.......3
消费:Hello RabbitMQ.......4
消费:Hello RabbitMQ.......5
消费:Hello RabbitMQ.......6
消费:Hello RabbitMQ.......7
消费:Hello RabbitMQ.......8
消费:Hello RabbitMQ.......9
2022-02-07 13:29:22.170  INFO 11812 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-02-07 13:29:23.160  INFO 11812 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.


说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号