跳到主要内容

应用程序事件

ChatGPT-4o-mini 中英对照 Application Events

以下 Spring 应用程序事件由监听器容器及其消费者发布:

  • ConsumerStartingEvent: 当消费者线程首次启动时发布,在它开始轮询之前。

  • ConsumerStartedEvent: 当消费者即将开始轮询时发布。

  • ConsumerFailedToStartEvent: 如果在 consumerStartTimeout 容器属性内没有发布 ConsumerStartingEvent,则发布此事件。此事件可能表示配置的任务执行器的线程不足以支持其使用的容器及其并发性。当发生此情况时,还会记录错误消息。

  • ListenerContainerIdleEvent: 当在 idleEventInterval 内没有接收到消息时发布(如果已配置)。

  • ListenerContainerNoLongerIdleEvent: 在之前发布 ListenerContainerIdleEvent 后,当消费到一条记录时发布。

  • ListenerContainerPartitionIdleEvent: 当在 idlePartitionEventInterval 内没有从该分区接收到消息时发布(如果已配置)。

  • ListenerContainerPartitionNoLongerIdleEvent: 当从之前发布过 ListenerContainerPartitionIdleEvent 的分区消费到一条记录时发布。

  • NonResponsiveConsumerEvent: 当消费者似乎在 poll 方法中被阻塞时发布。

  • ConsumerPartitionPausedEvent: 每个消费者在分区被暂停时发布。

  • ConsumerPartitionResumedEvent: 每个消费者在分区恢复时发布。

  • ConsumerPausedEvent: 每个消费者在容器被暂停时发布。

  • ConsumerResumedEvent: 每个消费者在容器恢复时发布。

  • ConsumerStoppingEvent: 每个消费者在停止之前发布。

  • ConsumerStoppedEvent: 在消费者关闭后发布。请参见 Thread Safety

  • ConsumerRetryAuthEvent: 当消费者的身份验证或授权失败并正在重试时发布。

  • ConsumerRetryAuthSuccessfulEvent: 当身份验证或授权成功重试时发布。只能在之前有 ConsumerRetryAuthEvent 的情况下发生。

  • ContainerStoppedEvent: 当所有消费者都已停止时发布。

  • ConcurrentContainerStoppedEvent: 当 ConcurrentMessageListenerContainer 停止时发布。

important

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行器,则在事件包含对消费者的引用时,您必须避免调用任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

  • id: 监听器 ID(或容器 bean 名称)。

  • idleTime: 发布事件时容器处于空闲状态的时间。

  • topicPartitions: 事件生成时容器被分配的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在接收到事件时可以 resume()

  • paused: 容器当前是否处于暂停状态。有关更多信息,请参见 Pausing and Resuming Listener Containers

ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

  • id: 监听器 ID(或容器 bean 名称)。

  • idleTime: 在事件发布时,时间分区消费处于空闲状态的时间。

  • topicPartition: 触发事件的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在接收到事件时可以 resume()

  • paused: 该分区消费是否当前已暂停。有关更多信息,请参见 Pausing and Resuming Listener Containers

ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器的话。

  • id: 监听器 ID(或容器 bean 名称)。

  • timeSinceLastPoll: 容器上一次调用 poll() 之前的时间。

  • topicPartitions: 事件生成时容器被分配的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果消费者的 pause() 方法之前被调用,则在接收到事件时可以 resume()

  • paused: 容器当前是否处于暂停状态。有关更多信息,请参见 Pausing and Resuming Listener Containers

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

  • partitions: 相关的 TopicPartition 实例。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

  • partition: 涉及的 TopicPartition 实例。

ConsumerRetryAuthEvent 事件具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

  • reason:

    • AUTHENTICATION - 事件因身份验证异常而发布。

    • AUTHORIZATION - 事件因授权异常而发布。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下属性:

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器,如果源容器是子容器。

所有容器(无论是子容器还是父容器)都会发布 ContainerStoppedEvent。对于父容器,source 和 container 属性是相同的。

此外,ConsumerStoppedEvent 具有以下附加属性:

  • reason:

    • NORMAL - 消费者正常停止(容器已停止)。

    • ERROR - 抛出了一个 java.lang.Error

    • FENCED - 事务性生产者被围栏限制,且 stopContainerWhenFenced 容器属性为 true

    • AUTH - 抛出了一个 AuthenticationExceptionAuthorizationException,且 authExceptionRetryInterval 未配置。

    • NO_OFFSET - 分区没有偏移量,且 auto.offset.reset 策略为 none

您可以使用此事件在出现这种情况后重新启动容器:

if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
java

检测空闲和无响应的消费者

虽然异步消费者效率高,但一个问题是检测它们何时处于空闲状态。如果在一段时间内没有消息到达,您可能希望采取一些措施。

您可以配置监听器容器,以便在一段时间内没有消息传递时发布 ListenerContainerIdleEvent。当容器处于空闲状态时,每隔 idleEventInterval 毫秒发布一次事件。

要配置此功能,请在容器上设置 idleEventInterval。以下示例演示了如何做到这一点:

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
java

以下示例演示如何为 @KafkaListener 设置 idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
java

在这些情况下,当容器处于空闲状态时,每分钟发布一次事件。

如果出于某种原因,消费者 poll() 方法没有退出,消息不会被接收,空闲事件也无法生成(这在早期版本的 kafka-clients 中是一个问题,当代理不可达时)。在这种情况下,如果 poll3xpollTimeout 属性内没有返回,容器将发布一个 NonResponsiveConsumerEvent。默认情况下,这个检查在每个容器中每 30 秒执行一次。您可以通过在配置监听器容器时设置 ContainerProperties 中的 monitorInterval(默认 30 秒)和 noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold 应大于 1.0 以避免由于竞争条件而产生虚假事件。接收到这样的事件可以让您停止容器,从而唤醒消费者以便它可以停止。

从版本 2.6.2 开始,如果一个容器发布了 ListenerContainerIdleEvent,当随后接收到记录时,它将发布 ListenerContainerNoLongerIdleEvent

事件消费

您可以通过实现 ApplicationListener 来捕获这些事件 — 可以是一个通用监听器,也可以是一个仅接收此特定事件的监听器。您还可以使用 @EventListener,该注解在 Spring Framework 4.2 中引入。

下面的示例将 @KafkaListener@EventListener 结合到一个类中。您应该理解,应用程序监听器会接收所有容器的事件,因此如果您想根据哪个容器空闲来采取特定操作,您可能需要检查监听器 ID。您还可以使用 @EventListenercondition 来实现这个目的。

请参见 Application Events 以获取有关事件属性的信息。

该事件通常在消费者线程上发布,因此与 Consumer 对象进行交互是安全的。

以下示例同时使用 @KafkaListener@EventListener

public class Listener {

@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}

@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}

}
java
important

事件监听器会接收所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来缩小接收到的事件范围。由于为 @KafkaListener 创建的容器支持并发,实际的容器命名为 id-n,其中 n 是每个实例的唯一值,以支持并发。这就是我们在条件中使用 startsWith 的原因。

警告

如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop()。这样会导致延迟和不必要的日志消息。相反,您应该将事件交给一个可以停止容器的不同线程。此外,如果容器实例是子容器,则不应调用 stop()。您应该停止并发容器。

空闲时的当前状态

请注意,您可以通过在监听器中实现 ConsumerSeekAware 来获取检测到空闲时的当前位置信息。请参阅 seek 中的 onIdleContainer()