跳到主要内容

监听器并发

DeepSeek V3 中英对照 Listener Concurrency

SimpleMessageListenerContainer

默认情况下,监听器容器会启动一个消费者,该消费者从队列中接收消息。

在查看上一节的表格时,可以看到许多控制并发的属性和参数。最简单的是 concurrentConsumers,它创建了(固定的)消费者数量,这些消费者并发处理消息。

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器才能更改此设置。

自 1.3.0 版本起,你现在可以动态调整 concurrentConsumers 属性。如果该属性在容器运行时被更改,消费者将根据需要添加或删除,以适应新的设置。

此外,新增了一个名为 maxConcurrentConsumers 的属性,容器会根据工作负载动态调整并发度。该属性与另外四个属性协同工作:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加消费者的算法如下:

如果 maxConcurrentConsumers 尚未达到,并且现有的消费者在连续十个周期内处于活动状态,且自上一个消费者启动以来至少已经过去了 10 秒,则会启动一个新的消费者。如果消费者在 batchSize * receiveTimeout 毫秒内至少接收到一条消息,则被视为处于活动状态。

在默认设置下,减少消费者的算法工作方式如下:

如果有超过 concurrentConsumers 个消费者在运行,并且某个消费者检测到连续十次超时(空闲) 最后一个消费者至少在 60 秒前被停止,那么一个消费者将被停止。超时时间取决于 receiveTimeoutbatchSize 属性。如果消费者在 batchSize * receiveTimeout 毫秒内没有接收到任何消息,则被视为空闲。因此,在默认超时时间(1 秒)和 batchSize 为 4 的情况下,消费者在空闲 40 秒后(四次超时对应一次空闲检测)会被考虑停止。

备注

实际上,只有在整个容器空闲一段时间后,消费者才能被停止。这是因为 broker 在所有活跃的消费者之间共享其工作。

每个消费者使用单个通道,无论配置了多少个队列。

从 2.0 版本开始,concurrentConsumersmaxConcurrentConsumers 属性可以通过 concurrency 属性来设置——例如,2-4

使用 DirectMessageListenerContainer

使用这个容器时,并发性基于配置的队列和 consumersPerQueue。每个队列的每个消费者使用一个独立的通道,并发性由 RabbitMQ 客户端库控制。默认情况下,在撰写本文时,它使用一个线程池,大小为 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2

你可以配置一个 taskExecutor 来提供所需的最大并发数。