工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务。相反,我们将这些任务安排在以后执行。我们将任务封装为消息并将其发送到工作队列。在后台运行的工作进程将任务从队列弹出并最终执行任务。当您运行许多工作人员时,任务将在他们之间共享。
在本教程的前一部分中,我们发送了一条包含 “Hello World!” 的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整图片的大小或要渲染 pdf 文件,所以我们模拟很忙任务 ———— 通过使用 Thread.sleep() 函数来模拟。我们将把字符串中的点数作为它的复杂度;每个点将占一秒钟的 “工作”。例如,一个由 Hello... 描述的虚假任务将需要三秒钟。
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
我们旧的 Recv.java 程序还需要一些更改:它需要为消息正文中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务,所以我们称之为 Worker.java:
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
我们模拟执行时间的假任务:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
按照教程一中的方法编译它们(使用工作目录中的 jar 文件和环境变量 CP):
javac -cp $CP NewTask.java Worker.java
使用任务队列的优点之一是能够轻松并行工作。如果我们正在积压工作,我们可以添加更多的工作人员,这样就可以轻松扩展。
首先,让我们尝试同时运行两个工作实例。他们都会从队列中获取消息,但究竟如何呢?让我们来看看。
您需要打开三个控制台。两个将运行工人程序。这些控制台将是我们的两个消费者 ———— C1 和 C2。
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
在第三个中,我们将发布新任务。启动消费者后,您可以发布一些消息:
# shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message..' java -cp $CP NewTask Third message... # => [x] Sent 'Third message...' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message....' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message.....'
让我们看看交付给我们的工人的是什么:
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环,与三个或更多工人一起尝试一下。
完成一项任务可能需要几秒钟。您可能想知道如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失所有发送给该特定工作人员但尚未处理的消息。
但是我们不想丢失任何任务。如果一个工人死亡,我们希望将任务交付给另一个工人。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者发回一个确认,告诉 RabbitMQ 一个特定的消息已经被接收、处理并且 RabbitMQ 可以自由地删除它。
如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。如果同时有其他消费者在线,它会迅速将其重新发送给另一个消费者。这样,即使工人偶尔死亡,您也可以确保不会丢失任何消息。
对消费者交付确认强制执行超时(默认为 30 分钟)。这有助于检测从不确认交付的错误(卡住)消费者。您可以按照传递确认超时中的说明增加此超时。
默认情况下,手动消息确认是打开的。在前面的示例中,我们通过 autoAck=true 标志明确地关闭了它们。 一旦我们完成了一项任务,是时候将此标志设置为 false 并从工作人员那里发送适当的确认。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
使用此代码,我们可以确定即使您在处理消息时使用 CTRL+C 杀死了一个工人,也不会丢失任何内容。工人死亡后不久,所有未确认的消息将被重新传递。
确认必须在接收交付的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。
错过 basicAck 是一个常见的错误。这是一个简单的错误,但后果很严重。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上,删除 sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然这个命令本身是正确的,但它在我们目前的设置中不起作用。那是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 ———— 让我们声明一个具有不同名称的队列,例如 task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
此 queueDeclare 更改需要同时应用于生产者和消费者代码。
至此,我们确定即使 RabbitMQ 重启,task_queue 队列也不会丢失。 现在我们需要将我们的消息标记为持久 ———— 通过将 MessageProperties(它实现 BasicProperties)设置为值 PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
关于消息持久性的注意事项
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接受消息并且还没有保存它时,仍然有很短的时间窗口。此外,RabbitMQ 不会对每条消息都执行 fsync(2) ———— 它可能只是保存到缓存中而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经绰绰有余了。 如果您需要更强的保证,则可以使用发布者确认。
您可能已经注意到调度仍然不能完全按照我们的意愿工作。例如,在有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不查看消费者未确认消息的数量。它只是盲目地将第 n 个消息发送给第 n 个消费者。
为了解决这个问题,我们可以使用带有 prefetchCount = 1 设置的 basicQos 方法。这告诉 RabbitMQ 一次不要给一个 worker 多条消息。或者,换句话说,在工作人员处理并确认之前的消息之前,不要向工作人员发送新消息。相反,它将把它分派给下一个不忙的工人。
int prefetchCount = 1; channel.basicQos(prefetchCount);
关于队列大小的注意事项
如果所有工作人员都很忙,您的队列可能会被填满。你会想要关注这一点,可能会增加更多的工人,或者有一些其他的策略。
我们的 NewTask.java 类的最终代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
还有我们的 Worker.java:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用消息确认和 prefetchCount,您可以设置工作队列。即使 RabbitMQ 重新启动,持久性选项也能让任务继续存在。