MinIO 存储桶通知 AMQP

本章节将介绍怎样为 MinIO 中某个存储桶添加 AMQP 事件通知(这里将介绍使用 RabbitMQ 作为 AMQP 服务提供者)。

安装/运行 RabbitMQ

注意:由于 RabbitMQ 是采用 Erlang 语言编写的,需要先安装 Erlang 环境,下载地址 https://www.erlang.org/downloads 

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ 服务器是用 Erlang 语言编写的,而集群和故障转移是构建在开放电信平台框架上的。

访问 RabbitMQ 官网 https://www.rabbitmq.com/#getstarted 下载安装包,然后根据提示信息进行安装。

RabbitMQ 安装成功后接下来安装 RabbitMQ-Plugins。打开命令行 CMD,进入 %RABBITMQ_HOME%\rabbitmq_server-3.8.19\sbin 目录,执行如下脚本:

rabbitmq-plugins.bat enable rabbitmq_management

使用上面命令启动 rabbitmq_management 插件。如下图:

然后使用 rabbitmqctl status 命令查看 RabbitMQ 状态,如下图:

说明安装是成功的,且已经启动了,运行正常。

MinIO 配置 AMQP 监听

打开 MinIO 数据目录下面的 .minio.sys 目录,编辑 .minio.sys\config\config.json 文件。找到 “notify_amqp” 配置项,添加 RabbitMQ 配置信息。配置如下:

{
    //...
    "notify_amqp": {
        "_": [{
                "key": "enable",
                "value": "on"
            }, {
                "key": "url",
                "value": "amqp://localhost:5672"
            }, {
                "key": "exchange",
                "value": "bucketevents"
            }, {
                "key": "exchange_type",
                "value": "fanout"
            }, {
                "key": "routing_key",
                "value": "bucketlogs"
            }, {
                "key": "mandatory",
                "value": "off"
            }, {
                "key": "durable",
                "value": "off"
            }, {
                "key": "no_wait",
                "value": "off"
            }, {
                "key": "internal",
                "value": "off"
            }, {
                "key": "auto_deleted",
                "value": "off"
            }, {
                "key": "delivery_mode",
                "value": "0"
            }, {
                "key": "publisher_confirms",
                "value": "off"
            }, {
                "key": "queue_limit",
                "value": "0"
            }, {
                "key": "queue_dir",
                "value": ""
            }
        ]
    },
    //...
}

其中:

  • enable:是否启用该配置,on 表示启用;off 表示禁用

  • url:指定 AMQP 访问 URL,如:amqp://localhost:5672

  • exchange:指定 exchange 的名称

  • exchange_type:指定 exchange 的类型

  • routing_key:指定发布用的 Routing key

  • deliveryMode:指定发布方式。0或1 表示瞬态;2 表示持久太

完成上面配置后,启动 MinIO 服务;如果 MinIO 服务控制台输出“SQS ARNs: arn:minio:sqs::_:amqp”信息,则说明启动成功。

存储桶添加通知

我们现在可以创建一个 images 存储桶,在该存储桶上面开启时间通知。一旦有文件被创建或者覆盖,就会项 AMQP 发送一个消息。

要配置这种存储桶通知,我们需要用到前面步骤 MinIO 输出的 ARN 信息“arn:minio:sqs::_:amqp”。

使用 mc 客户端执行如下脚本去开启通知:

# 使用客户端命令,在本地 MinIO 服务器中创建 images 存储桶
D:\server\minio>mc mb local/images
Bucket created successfully `local/images`.

# 为刚刚创建的 images 存储桶添加事件通知
# 使用 --suffix 选项指定仅仅上传/删除 jpg 格式图片才触发事件
D:\server\minio>mc event add local/images arn:minio:sqs::_:amqp --suffix=".jpg"
Successfully added arn:minio:sqs::_:amqp

# 查看 images 存储桶上面的事件
D:\server\minio>mc event list local/images
arn:minio:sqs::_:amqp   s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:*   Filter: suffix=".jpg"

验证结果

(1)使用 java 代码去订阅 bucketevents exchange。

a、pom.xml 依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

b、Java 代码如下:

package com.hxstrive.rabbitmq.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消费者 —— 消费 MinIO 的通知消息
 * 
 * @author Administrator
 * @date 2021年7月27日 12:59:57
 */
public class Demo1 {
	// 定义exchange名称
	private static final String EXCHANGE_NAME = "bucketevents";
	// 主机地址
	private static final String HOST_NAME = "localhost";
	// 模式
	private static final String BIND_KEY = "*.bucketlogs";
	
	public static void main(String[] args) throws Exception {
		// 创建连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST_NAME);
		Connection connection = factory.newConnection();
		
		// 获取一个通道
		Channel channel = connection.createChannel();
		
		// 声明一个类型为fanout的exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		// 绑定Queue和Exchange
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY);
		
		System.out.println("Waiting Message...");
		
		// 消费消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String routingKey = delivery.getEnvelope().getRoutingKey();
			System.out.println("routingKey :: " + routingKey);
			System.out.println("      body :: " + new String(delivery.getBody()) );
		}
	}
	
}

运行上面 Java 代码,等待事件。

(2)使用浏览器访问 http://localhost:9000  ,使用 Access Key 和 Securet Key 登陆到后台。手动添加两张图片到 images 存储桶,如下图:

(3)查看 Java 代码收到的事件信息,如下:

routingKey :: bucketlogs
      body :: {"EventName":"s3:ObjectRemoved:Delete","Key":"images/309667.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2021-07-28T05:04:08.071Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"root"},"requestParameters":{"accessKey":"root","principalId":"root","region":"","sourceIPAddress":"::1"},"responseElements":{"x-amz-request-id":"","x-minio-deployment-id":"183f33e6-1876-40d9-9ccd-96c4404c6387","x-minio-origin-endpoint":"http://10.87.11.203:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"images","ownerIdentity":{"principalId":"root"},"arn":"arn:aws:s3:::images"},"object":{"key":"309667.jpg","sequencer":"1695DBEC3E78B5D8"}},"source":{"host":"::1","port":"","userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"}}]}
routingKey :: bucketlogs
      body :: {"EventName":"s3:ObjectRemoved:Delete","Key":"images/221406.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2021-07-28T05:04:12.681Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"root"},"requestParameters":{"accessKey":"root","principalId":"root","region":"","sourceIPAddress":"::1"},"responseElements":{"x-amz-request-id":"","x-minio-deployment-id":"183f33e6-1876-40d9-9ccd-96c4404c6387","x-minio-origin-endpoint":"http://10.87.11.203:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"images","ownerIdentity":{"principalId":"root"},"arn":"arn:aws:s3:::images"},"object":{"key":"221406.jpg","sequencer":"1695DBED51492E44"}},"source":{"host":"::1","port":"","userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"}}]}
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号