线程安全
当使用并发消息监听器容器时,单个监听器实例会在所有消费者线程上被调用。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使监听器线程安全,或者添加同步会显著降低并发带来的好处,可以使用以下几种技术之一:
-
使用
n
个容器,concurrency=1
,并使用原型作用域的MessageListener
bean,以便每个容器都有自己的实例(在使用@KafkaListener
时这是不可能的)。 -
将状态保存在
ThreadLocal<?>
实例中。 -
让单例监听器委托给在
SimpleThreadScope
(或类似作用域)中声明的 bean。
为了方便清理线程状态(针对前面列表中的第二和第三项),从版本 2.2 开始,监听器容器在每个线程退出时发布 ConsumerStoppedEvent
。您可以使用 ApplicationListener
或 @EventListener
方法来消费这些事件,以移除 ThreadLocal<?>
实例或从作用域中 remove()
线程作用域的 bean。请注意,SimpleThreadScope
不会销毁具有销毁接口的 bean(例如 DisposableBean
),因此您应该自己 destroy()
实例。
默认情况下,应用程序上下文的事件多路广播器在调用线程上调用事件监听器。如果您将多路广播器更改为使用异步执行器,则线程清理将无效。
关于虚拟线程和并发消息监听器容器的特别说明
由于底层库类在进行线程协调时仍然使用 synchronized
块的某些限制,应用程序在使用虚拟线程与并发消息监听容器时需要谨慎。当启用虚拟线程时,如果并发超过可用的平台线程数量,虚拟线程很可能会被固定在平台线程上,从而可能导致竞争条件。因此,建议在 Spring for Apache Kafka 使用的第三方库完全支持虚拟线程之前,将消息监听容器的并发性保持在等于或少于平台线程数量。这样,应用程序可以避免线程之间以及虚拟线程被固定在平台线程上的任何竞争条件。