跳到主要内容

@KafkaListener 生命周期管理

ChatGPT-4o-mini 中英对照 @KafkaListener Lifecycle Management @KafkaListener Lifecycle Management

@KafkaListener 注解创建的监听器容器不是应用程序上下文中的 bean。相反,它们是注册在类型为 KafkaListenerEndpointRegistry 的基础设施 bean 中。该 bean 是由框架自动声明的,并管理容器的生命周期;它会自动启动任何 autoStartup 设置为 true 的容器。所有由所有容器工厂创建的容器必须处于相同的 phase。有关更多信息,请参见 Listener Container Auto Startup。您可以通过使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有注册的容器。或者,您可以通过使用其 id 属性获取对单个容器的引用。您可以在注解上设置 autoStartup,这将覆盖配置到容器工厂中的默认设置。您可以从应用程序上下文中获取对 bean 的引用,例如自动装配,以管理其注册的容器。以下示例展示了如何做到这一点:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
java
@Autowired
private KafkaListenerEndpointRegistry registry;

...

this.registry.getListenerContainer("myContainer").start();

...
java

注册表仅维护它管理的容器的生命周期;声明为 beans 的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的 getListenerContainers() 方法获取一组受管理的容器。版本 2.2.5 添加了一个便利方法 getAllListenerContainers(),该方法返回所有容器的集合,包括注册表管理的容器和声明为 beans 的容器。返回的集合将包括任何已初始化的原型 beans,但不会初始化任何延迟 bean 声明。

important

在应用程序上下文刷新后注册的端点将立即启动,无论它们的 autoStartup 属性如何,以遵守 SmartLifecycle 合同,其中 autoStartup 仅在应用程序上下文初始化期间被考虑。迟注册的一个例子是一个具有 @KafkaListener 的原型作用域的 bean,其中在上下文初始化后创建了一个实例。从版本 2.8.7 开始,您可以将注册表的 alwaysStartAfterRefresh 属性设置为 false,然后容器的 autoStartup 属性将定义容器是否启动。

从 KafkaListenerEndpointRegistry 检索 MessageListenerContainers

KafkaListenerEndpointRegistry 提供了用于检索 MessageListenerContainer 实例的方法,以适应各种管理场景:

所有容器:对于覆盖所有监听器容器的操作,使用 getListenerContainers() 来检索一个全面的集合。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
java

通过 ID 获取特定容器:要管理单个容器,getListenerContainer(String id) 允许通过其 id 进行检索。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
java

动态容器过滤:在版本 3.2 中引入的两个重载的 getListenerContainersMatching 方法使得容器的选择更加精细。其中一个方法接受一个 Predicate<String> 作为参数用于基于 ID 的过滤,而另一个方法则接受一个 BiPredicate<String, MessageListenerContainer> 作为参数,用于更高级的标准,这些标准可能包括容器属性或状态。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
java

利用这些方法在您的应用程序中高效管理和查询 MessageListenerContainer 实例。