应用程序事件
以下 Spring 应用程序事件由监听器容器及其消费者发布:
-
ConsumerStartingEvent
: 当消费者线程首次启动时发布,在它开始轮询之前。 -
ConsumerStartedEvent
: 当消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent
: 如果在consumerStartTimeout
容器属性内没有发布ConsumerStartingEvent
,则发布此事件。此事件可能表示配置的任务执行器的线程不足以支持其使用的容器及其并发性。当发生此情况时,还会记录错误消息。 -
ListenerContainerIdleEvent
: 当在idleEventInterval
内没有接收到消息时发布(如果已配置)。 -
ListenerContainerNoLongerIdleEvent
: 在之前发布ListenerContainerIdleEvent
后,当消费到一条记录时发布。 -
ListenerContainerPartitionIdleEvent
: 当在idlePartitionEventInterval
内没有从该分区接收到消息时发布(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent
: 当从之前发布过ListenerContainerPartitionIdleEvent
的分区消费到一条记录时发布。 -
NonResponsiveConsumerEvent
: 当消费者似乎在poll
方法中被阻塞时发布。 -
ConsumerPartitionPausedEvent
: 每个消费者在分区被暂停时发布。 -
ConsumerPartitionResumedEvent
: 每个消费者在分区恢复时发布。 -
ConsumerPausedEvent
: 每个消费者在容器被暂停时发布。 -
ConsumerResumedEvent
: 每个消费者在容器恢复时发布。 -
ConsumerStoppingEvent
: 每个消费者在停止之前发布。 -
ConsumerStoppedEvent
: 在消费者关闭后发布。请参见 Thread Safety。 -
ConsumerRetryAuthEvent
: 当消费者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent
: 当身份验证或授权成功重试时发布。只能在之前有ConsumerRetryAuthEvent
的情况下发生。 -
ContainerStoppedEvent
: 当所有消费者都已停止时发布。 -
ConcurrentContainerStoppedEvent
: 当ConcurrentMessageListenerContainer
停止时发布。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行器,则在事件包含对消费者的引用时,您必须避免调用任何 Consumer
方法。
ListenerContainerIdleEvent
具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。 -
id
: 监听器 ID(或容器 bean 名称)。 -
idleTime
: 发布事件时容器处于空闲状态的时间。 -
topicPartitions
: 事件生成时容器被分配的主题和分区。 -
consumer
: 对 KafkaConsumer
对象的引用。例如,如果之前调用了消费者的pause()
方法,则在接收到事件时可以resume()
。 -
paused
: 容器当前是否处于暂停状态。有关更多信息,请参见 Pausing and Resuming Listener Containers。
ListenerContainerNoLongerIdleEvent
具有相同的属性,除了 idleTime
和 paused
。
ListenerContainerPartitionIdleEvent
具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。 -
id
: 监听器 ID(或容器 bean 名称)。 -
idleTime
: 在事件发布时,时间分区消费处于空闲状态的时间。 -
topicPartition
: 触发事件的主题和分区。 -
consumer
: 对 KafkaConsumer
对象的引用。例如,如果之前调用了消费者的pause()
方法,则在接收到事件时可以resume()
。 -
paused
: 该分区消费是否当前已暂停。有关更多信息,请参见 Pausing and Resuming Listener Containers。
ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,除了 idleTime
和 paused
。
NonResponsiveConsumerEvent
具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器的话。 -
id
: 监听器 ID(或容器 bean 名称)。 -
timeSinceLastPoll
: 容器上一次调用poll()
之前的时间。 -
topicPartitions
: 事件生成时容器被分配的主题和分区。 -
consumer
: 对 KafkaConsumer
对象的引用。例如,如果消费者的pause()
方法之前被调用,则在接收到事件时可以resume()
。 -
paused
: 容器当前是否处于暂停状态。有关更多信息,请参见 Pausing and Resuming Listener Containers。
ConsumerPausedEvent
、ConsumerResumedEvent
和 ConsumerStopping
事件具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。 -
partitions
: 相关的TopicPartition
实例。
ConsumerPartitionPausedEvent
和 ConsumerPartitionResumedEvent
事件具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。 -
partition
: 涉及的TopicPartition
实例。
ConsumerRetryAuthEvent
事件具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。 -
reason
:-
AUTHENTICATION
- 事件因身份验证异常而发布。 -
AUTHORIZATION
- 事件因授权异常而发布。
-
ConsumerStartingEvent
、ConsumerStartedEvent
、ConsumerFailedToStartEvent
、ConsumerStoppedEvent
、ConsumerRetryAuthSuccessfulEvent
和 ContainerStoppedEvent
事件具有以下属性:
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器或父监听器容器,如果源容器是子容器。
所有容器(无论是子容器还是父容器)都会发布 ContainerStoppedEvent
。对于父容器,source 和 container 属性是相同的。
此外,ConsumerStoppedEvent
具有以下附加属性:
-
reason
:-
NORMAL
- 消费者正常停止(容器已停止)。 -
ERROR
- 抛出了一个java.lang.Error
。 -
FENCED
- 事务性生产者被围栏限制,且stopContainerWhenFenced
容器属性为true
。 -
AUTH
- 抛出了一个AuthenticationException
或AuthorizationException
,且authExceptionRetryInterval
未配置。 -
NO_OFFSET
- 分区没有偏移量,且auto.offset.reset
策略为none
。
-
您可以使用此事件在出现这种情况后重新启动容器:
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者
虽然异步消费者效率高,但一个问题是检测它们何时处于空闲状态。如果在一段时间内没有消息到达,您可能希望采取一些措施。
您可以配置监听器容器,以便在一段时间内没有消息传递时发布 ListenerContainerIdleEvent
。当容器处于空闲状态时,每隔 idleEventInterval
毫秒发布一次事件。
要配置此功能,请在容器上设置 idleEventInterval
。以下示例演示了如何做到这一点:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例演示如何为 @KafkaListener
设置 idleEventInterval
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果出于某种原因,消费者 poll()
方法没有退出,消息不会被接收,空闲事件也无法生成(这在早期版本的 kafka-clients
中是一个问题,当代理不可达时)。在这种情况下,如果 poll
在 3x
的 pollTimeout
属性内没有返回,容器将发布一个 NonResponsiveConsumerEvent
。默认情况下,这个检查在每个容器中每 30 秒执行一次。您可以通过在配置监听器容器时设置 ContainerProperties
中的 monitorInterval
(默认 30 秒)和 noPollThreshold
(默认 3.0)属性来修改此行为。noPollThreshold
应大于 1.0
以避免由于竞争条件而产生虚假事件。接收到这样的事件可以让您停止容器,从而唤醒消费者以便它可以停止。
从版本 2.6.2 开始,如果一个容器发布了 ListenerContainerIdleEvent
,当随后接收到记录时,它将发布 ListenerContainerNoLongerIdleEvent
。
事件消费
您可以通过实现 ApplicationListener
来捕获这些事件 — 可以是一个通用监听器,也可以是一个仅接收此特定事件的监听器。您还可以使用 @EventListener
,该注解在 Spring Framework 4.2 中引入。
下面的示例将 @KafkaListener
和 @EventListener
结合到一个类中。您应该理解,应用程序监听器会接收所有容器的事件,因此如果您想根据哪个容器空闲来采取特定操作,您可能需要检查监听器 ID。您还可以使用 @EventListener
的 condition
来实现这个目的。
请参见 Application Events 以获取有关事件属性的信息。
该事件通常在消费者线程上发布,因此与 Consumer
对象进行交互是安全的。
以下示例同时使用 @KafkaListener
和 @EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件监听器会接收所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来缩小接收到的事件范围。由于为 @KafkaListener
创建的容器支持并发,实际的容器命名为 id-n
,其中 n
是每个实例的唯一值,以支持并发。这就是我们在条件中使用 startsWith
的原因。
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop()
。这样会导致延迟和不必要的日志消息。相反,您应该将事件交给一个可以停止容器的不同线程。此外,如果容器实例是子容器,则不应调用 stop()
。您应该停止并发容器。
空闲时的当前状态
请注意,您可以通过在监听器中实现 ConsumerSeekAware
来获取检测到空闲时的当前位置信息。请参阅 seek 中的 onIdleContainer()
。