监听器并发
SimpleMessageListenerContainer
默认情况下,监听器容器会启动一个消费者,该消费者从队列中接收消息。
在查看上一节的表格时,可以看到许多控制并发的属性和参数。最简单的是 concurrentConsumers
,它创建了(固定的)消费者数量,这些消费者并发处理消息。
在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器才能更改此设置。
自 1.3.0 版本起,你现在可以动态调整 concurrentConsumers
属性。如果该属性在容器运行时被更改,消费者将根据需要添加或删除,以适应新的设置。
此外,新增了一个名为 maxConcurrentConsumers
的属性,容器会根据工作负载动态调整并发度。该属性与另外四个属性协同工作:consecutiveActiveTrigger
、startConsumerMinInterval
、consecutiveIdleTrigger
和 stopConsumerMinInterval
。在默认设置下,增加消费者的算法如下:
如果 maxConcurrentConsumers
尚未达到,并且现有的消费者在连续十个周期内处于活动状态,且自上一个消费者启动以来至少已经过去了 10 秒,则会启动一个新的消费者。如果消费者在 batchSize
* receiveTimeout
毫秒内至少接收到一条消息,则被视为处于活动状态。
在默认设置下,减少消费者的算法工作方式如下:
如果有超过 concurrentConsumers
个消费者在运行,并且某个消费者检测到连续十次超时(空闲)且 最后一个消费者至少在 60 秒前被停止,那么一个消费者将被停止。超时时间取决于 receiveTimeout
和 batchSize
属性。如果消费者在 batchSize
* receiveTimeout
毫秒内没有接收到任何消息,则被视为空闲。因此,在默认超时时间(1 秒)和 batchSize
为 4 的情况下,消费者在空闲 40 秒后(四次超时对应一次空闲检测)会被考虑停止。
实际上,只有在整个容器空闲一段时间后,消费者才能被停止。这是因为 broker 在所有活跃的消费者之间共享其工作。
每个消费者使用单个通道,无论配置了多少个队列。
从 2.0 版本开始,concurrentConsumers
和 maxConcurrentConsumers
属性可以通过 concurrency
属性来设置——例如,2-4
。
使用 DirectMessageListenerContainer
使用这个容器时,并发性基于配置的队列和 consumersPerQueue
。每个队列的每个消费者使用一个独立的通道,并发性由 RabbitMQ 客户端库控制。默认情况下,在撰写本文时,它使用一个线程池,大小为 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2
。
你可以配置一个 taskExecutor
来提供所需的最大并发数。