线程与异步消费者
许多不同的线程与异步消费者相关联。
在 SimpleMessageListenerContainer
中配置的 TaskExecutor
线程用于在 RabbitMQ Client
传递新消息时调用 MessageListener
。如果没有配置,将使用 SimpleAsyncTaskExecutor
。如果使用池化的执行器,需要确保池的大小足以处理配置的并发量。对于 DirectMessageListenerContainer
,MessageListener
是直接在 RabbitMQ Client
线程上调用的。在这种情况下,taskExecutor
用于监控消费者的任务。
当使用默认的 SimpleAsyncTaskExecutor
时,监听器被调用的线程会在 threadNamePrefix
中使用监听器容器的 beanName
。这对于日志分析非常有用。我们通常建议始终在日志记录器配置中包含线程名称。当通过容器上的 taskExecutor
属性显式提供一个 TaskExecutor
时,它会按原样使用,不做任何修改。建议您使用类似的技术来命名由自定义 TaskExecutor
bean 定义创建的线程,以帮助在日志消息中识别线程。
在 CachingConnectionFactory
中配置的 Executor
会在创建连接时传递给 RabbitMQ Client
,其线程用于将新消息传递给监听器容器。如果未配置此选项,客户端将使用内部线程池执行器,其池大小(截至撰写时)为每个连接的 Runtime.getRuntime().availableProcessors() * 2
。
如果你有大量的工厂或正在使用 CacheMode.CONNECTION
,你可能希望考虑使用一个共享的 ThreadPoolTaskExecutor
,并确保它有足够的线程来满足你的工作负载。
使用 DirectMessageListenerContainer
时,您需要确保连接工厂配置了一个具有足够线程的任务执行器,以支持所有使用该工厂的监听器容器所需的并发性。默认的线程池大小(截至撰写时)是 Runtime.getRuntime().availableProcessors() * 2
。
RabbitMQ 客户端
使用 ThreadFactory
来为底层 I/O(套接字)操作创建线程。要修改这个工厂,你需要配置底层的 RabbitMQ ConnectionFactory
,如 配置底层客户端连接工厂 中所述。