本章节将介绍怎样为 MinIO 中某个存储桶添加 AMQP 事件通知(这里将介绍使用 RabbitMQ 作为 AMQP 服务提供者)。
注意:由于 RabbitMQ 是采用 Erlang 语言编写的,需要先安装 Erlang 环境,下载地址 https://www.erlang.org/downloads
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ 服务器是用 Erlang 语言编写的,而集群和故障转移是构建在开放电信平台框架上的。
访问 RabbitMQ 官网 https://www.rabbitmq.com/#getstarted 下载安装包,然后根据提示信息进行安装。
RabbitMQ 安装成功后接下来安装 RabbitMQ-Plugins。打开命令行 CMD,进入 %RABBITMQ_HOME%\rabbitmq_server-3.8.19\sbin 目录,执行如下脚本:
rabbitmq-plugins.bat enable rabbitmq_management
使用上面命令启动 rabbitmq_management 插件。如下图:
然后使用 rabbitmqctl status 命令查看 RabbitMQ 状态,如下图:
说明安装是成功的,且已经启动了,运行正常。
打开 MinIO 数据目录下面的 .minio.sys 目录,编辑 .minio.sys\config\config.json 文件。找到 “notify_amqp” 配置项,添加 RabbitMQ 配置信息。配置如下:
{ //... "notify_amqp": { "_": [{ "key": "enable", "value": "on" }, { "key": "url", "value": "amqp://localhost:5672" }, { "key": "exchange", "value": "bucketevents" }, { "key": "exchange_type", "value": "fanout" }, { "key": "routing_key", "value": "bucketlogs" }, { "key": "mandatory", "value": "off" }, { "key": "durable", "value": "off" }, { "key": "no_wait", "value": "off" }, { "key": "internal", "value": "off" }, { "key": "auto_deleted", "value": "off" }, { "key": "delivery_mode", "value": "0" }, { "key": "publisher_confirms", "value": "off" }, { "key": "queue_limit", "value": "0" }, { "key": "queue_dir", "value": "" } ] }, //... }
其中:
enable:是否启用该配置,on 表示启用;off 表示禁用
url:指定 AMQP 访问 URL,如:amqp://localhost:5672
exchange:指定 exchange 的名称
exchange_type:指定 exchange 的类型
routing_key:指定发布用的 Routing key
deliveryMode:指定发布方式。0或1 表示瞬态;2 表示持久太
完成上面配置后,启动 MinIO 服务;如果 MinIO 服务控制台输出“SQS ARNs: arn:minio:sqs::_:amqp”信息,则说明启动成功。
我们现在可以创建一个 images 存储桶,在该存储桶上面开启时间通知。一旦有文件被创建或者覆盖,就会项 AMQP 发送一个消息。
要配置这种存储桶通知,我们需要用到前面步骤 MinIO 输出的 ARN 信息“arn:minio:sqs::_:amqp”。
使用 mc 客户端执行如下脚本去开启通知:
# 使用客户端命令,在本地 MinIO 服务器中创建 images 存储桶 D:\server\minio>mc mb local/images Bucket created successfully `local/images`. # 为刚刚创建的 images 存储桶添加事件通知 # 使用 --suffix 选项指定仅仅上传/删除 jpg 格式图片才触发事件 D:\server\minio>mc event add local/images arn:minio:sqs::_:amqp --suffix=".jpg" Successfully added arn:minio:sqs::_:amqp # 查看 images 存储桶上面的事件 D:\server\minio>mc event list local/images arn:minio:sqs::_:amqp s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:* Filter: suffix=".jpg"
(1)使用 java 代码去订阅 bucketevents exchange。
a、pom.xml 依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
b、Java 代码如下:
package com.hxstrive.rabbitmq.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者 —— 消费 MinIO 的通知消息 * * @author Administrator * @date 2021年7月27日 12:59:57 */ public class Demo1 { // 定义exchange名称 private static final String EXCHANGE_NAME = "bucketevents"; // 主机地址 private static final String HOST_NAME = "localhost"; // 模式 private static final String BIND_KEY = "*.bucketlogs"; public static void main(String[] args) throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); Connection connection = factory.newConnection(); // 获取一个通道 Channel channel = connection.createChannel(); // 声明一个类型为fanout的exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 绑定Queue和Exchange String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY); System.out.println("Waiting Message..."); // 消费消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println("routingKey :: " + routingKey); System.out.println(" body :: " + new String(delivery.getBody()) ); } } }
运行上面 Java 代码,等待事件。
(2)使用浏览器访问 http://localhost:9000 ,使用 Access Key 和 Securet Key 登陆到后台。手动添加两张图片到 images 存储桶,如下图:
(3)查看 Java 代码收到的事件信息,如下:
routingKey :: bucketlogs body :: {"EventName":"s3:ObjectRemoved:Delete","Key":"images/309667.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2021-07-28T05:04:08.071Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"root"},"requestParameters":{"accessKey":"root","principalId":"root","region":"","sourceIPAddress":"::1"},"responseElements":{"x-amz-request-id":"","x-minio-deployment-id":"183f33e6-1876-40d9-9ccd-96c4404c6387","x-minio-origin-endpoint":"http://10.87.11.203:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"images","ownerIdentity":{"principalId":"root"},"arn":"arn:aws:s3:::images"},"object":{"key":"309667.jpg","sequencer":"1695DBEC3E78B5D8"}},"source":{"host":"::1","port":"","userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"}}]} routingKey :: bucketlogs body :: {"EventName":"s3:ObjectRemoved:Delete","Key":"images/221406.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2021-07-28T05:04:12.681Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"root"},"requestParameters":{"accessKey":"root","principalId":"root","region":"","sourceIPAddress":"::1"},"responseElements":{"x-amz-request-id":"","x-minio-deployment-id":"183f33e6-1876-40d9-9ccd-96c4404c6387","x-minio-origin-endpoint":"http://10.87.11.203:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"images","ownerIdentity":{"principalId":"root"},"arn":"arn:aws:s3:::images"},"object":{"key":"221406.jpg","sequencer":"1695DBED51492E44"}},"source":{"host":"::1","port":"","userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"}}]}