跳到主要内容

按顺序启动 @KafkaListener

ChatGPT-4o-mini 中英对照 Starting @KafkaListeners in Sequence Starting @KafkaListeners in Sequence

一个常见的用例是在另一个监听器消费完主题中的所有记录后启动一个监听器。例如,您可能希望在处理来自其他主题的记录之前,将一个或多个压缩主题的内容加载到内存中。从版本 2.7.3 开始,引入了一个新组件 ContainerGroupSequencer。它使用 @KafkaListenercontainerGroup 属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。

最好用一个例子来说明。

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
java

这里,我们有 4 个监听器在两个组中,g1g2

在应用程序上下文初始化期间,序列器将提供的组中所有容器的 autoStartup 属性设置为 false。它还将任何未设置 idleEventInterval 的容器的 idleEventInterval 设置为提供的值(在本例中为 5000ms)。然后,当应用程序上下文启动序列器时,第一个组中的容器被启动。当接收到 ListenerContainerIdleEvent 时,每个容器中的每个子容器都会停止。当 ConcurrentMessageListenerContainer 中的所有子容器都停止时,父容器也会停止。当一个组中的所有容器都停止后,下一个组中的容器将被启动。组的数量或组中的容器数量没有限制。

默认情况下,最后一组(上面的 g2)中的容器在空闲时不会停止。要修改该行为,请在序列器上将 stopLastGroupWhenIdle 设置为 true

作为附注,以前每个组中的容器被添加到类型为 Collection<MessageListenerContainer> 的 bean 中,bean 名称为 containerGroup。这些集合现在已被弃用,取而代之的是类型为 ContainerGroup 的 beans,bean 名称为组名,后缀为 .group;在上面的例子中,将会有 2 个 beans g1.groupg2.groupCollection beans 将在未来的版本中被移除。