跳到主要内容

消息监听器容器

ChatGPT-4o-mini 中英对照 Message Listener Containers

提供了两个 MessageListenerContainer 实现:

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例,以提供多线程消费。

从版本 2.2.7 开始,您可以将 RecordInterceptor 添加到监听器容器中;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从版本 2.7 开始,它有额外的方法,这些方法在监听器退出后被调用(正常退出或抛出异常)。此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为 Batch Listeners 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。这可以用于,例如,在拦截器中访问消费者指标。

important

您不应在这些拦截器中执行任何影响消费者的位置信息和已提交偏移量的方法;容器需要管理这些信息。

important

如果拦截器修改了记录(通过创建一个新记录),则 topicpartitionoffset 必须保持不变,以避免意外的副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用于调用多个拦截器。

默认情况下,从版本 2.8 开始,在使用事务时,拦截器在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始之后调用拦截器。从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。这允许拦截器参与由容器启动的 JDBC 事务。

从版本 2.3.8 和 2.4.6 开始,ConcurrentMessageListenerContainer 现在支持 Static Membership,当并发性大于 1 时。group.instance.id-n 结尾,其中 n1 开始。结合增加的 session.timeout.ms,可以减少再平衡事件,例如,当应用程序实例重新启动时。

使用 KafkaMessageListenerContainer

以下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
java

Request error occurred:

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)
java

第一个构造函数接受一个 TopicPartitionOffset 数组作为参数,以明确指示容器使用哪些分区(使用消费者的 assign() 方法)以及可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内当前最后偏移量的。提供了一个 TopicPartitionOffset 的构造函数,该构造函数接受一个额外的 boolean 参数。如果该参数为 true,则初始偏移量(正值或负值)相对于该消费者的当前位置信息。偏移量在容器启动时应用。第二个构造函数接受一个主题数组,Kafka 根据 group.id 属性分配分区 — 在组内分配分区。第三个构造函数使用正则表达式 Pattern 来选择主题。

要将 MessageListener 分配给一个容器,可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例演示了如何做到这一点:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
java

注意,当创建 DefaultKafkaConsumerFactory 时,使用上述仅接受属性的构造函数意味着键和值的 Deserializer 类是从配置中获取的。或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 构造函数,以用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>(从版本 2.3 开始),将用于为每个 Consumer 获取单独的 Deserializer 实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
java

有关可以设置的各种属性的更多信息,请参阅 ContainerPropertiesJavadoc

从版本 2.1.1 开始,新增了一个名为 logContainerConfig 的属性。当该属性为 true 且启用了 INFO 日志时,每个监听器容器会写入一条日志消息,概述其配置属性。

默认情况下,主题偏移量提交的日志记录是在 DEBUG 日志级别下进行的。从版本 2.1.2 开始,ContainerProperties 中有一个名为 commitLogLevel 的属性,允许您指定这些消息的日志级别。例如,要将日志级别更改为 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本 2.2 开始,新增了一个名为 missingTopicsFatal 的容器属性(默认值:自 2.3.4 起为 false)。如果配置的任何主题在代理上不存在,则此属性会阻止容器启动。如果容器配置为监听主题模式(正则表达式),则不适用。之前,容器线程在 consumer.poll() 方法中循环等待主题出现,同时记录许多消息。除了日志,没有其他迹象表明存在问题。

从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。这使得容器在从 KafkaConsumer 获取到任何 AuthenticationExceptionAuthorizationException 后,会重新尝试获取消息。这种情况可能发生在,例如,配置的用户被拒绝访问某个主题或凭证不正确。定义 authExceptionRetryInterval 允许容器在授予适当权限时进行恢复。

备注

默认情况下,没有配置间隔 - 认证和授权错误被视为致命错误,这会导致容器停止。

从版本 2.8 开始,当创建消费者工厂时,如果您提供反序列化器作为对象(在构造函数中或通过设置器),工厂将调用 configure() 方法来使用配置属性对它们进行配置。

使用 ConcurrentMessageListenerContainer

单个构造函数类似于 KafkaListenerContainer 构造函数。以下列表显示了构造函数的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
java

它还有一个 concurrency 属性。例如,container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

如果容器属性为主题(或主题模式)配置,Kafka 将通过其组管理能力在消费者之间分配分区。

important

在监听多个主题时,默认的分区分配可能并不是你所期望的。例如,如果你有三个主题,每个主题有五个分区,并且你想使用 concurrency=15,你会看到只有五个活跃的消费者,每个消费者被分配了来自每个主题的一个分区,其余的 10 个消费者处于空闲状态。这是因为默认的 Kafka ConsumerPartitionAssignorRangeAssignor(请参见其 Javadoc)。对于这种情况,你可能想考虑使用 RoundRobinAssignor,它会将分区分配到所有消费者上。这样,每个消费者被分配一个主题或分区。要更改 ConsumerPartitionAssignor,你可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

在使用 Spring Boot 时,你可以按如下方式设置策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
none

当容器属性配置为 TopicPartitionOffset 时,ConcurrentMessageListenerContainer 会将 TopicPartitionOffset 实例分配到委托的 KafkaMessageListenerContainer 实例中。

如果提供了六个 TopicPartitionOffset 实例,并且 concurrency3;每个容器将获得两个分区。对于五个 TopicPartitionOffset 实例,两个容器获得两个分区,第三个容器获得一个分区。如果 concurrency 大于 TopicPartitions 的数量,则 concurrency 会被调整为较低的值,以确保每个容器获得一个分区。

备注

client.id 属性(如果设置)会附加 -n,其中 n 是对应于并发的消费者实例。这是为了在启用 JMX 时提供 MBeans 的唯一名称所必需的。

从版本 1.3 开始,MessageListenerContainer 提供对底层 KafkaConsumer 的指标访问。在 ConcurrentMessageListenerContainer 的情况下,metrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。这些指标按提供给底层 KafkaConsumerclient-id 分组到 Map<MetricName, ? extends Metric> 中。

从版本 2.3 开始,ContainerProperties 提供了一个 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际的休眠间隔是从提供的选项和 max.poll.interval.ms 消费者配置与当前记录批处理时间之间的差值中选择的最小值。

提交偏移量

提供了几种提交偏移量的选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,容器支持几种 AckMode 设置(在下一个列表中描述)。默认的 AckModeBATCH。从版本 2.3 开始,框架将 enable.auto.commit 设置为 false,除非在配置中明确设置。之前,如果未设置该属性,则使用 Kafka 的默认值(true)。

消费者 poll() 方法返回一个或多个 ConsumerRecords。对于每条记录,都会调用 MessageListener。以下列表描述了容器对于每个 AckMode 所采取的动作(当不使用事务时):

  • RECORD: 当监听器在处理记录后返回时提交偏移量。

  • BATCH: 当所有由 poll() 返回的记录都已处理时提交偏移量。

  • TIME: 当所有由 poll() 返回的记录都已处理时提交偏移量,只要自上次提交以来的 ackTime 已超过。

  • COUNT: 当所有由 poll() 返回的记录都已处理时提交偏移量,只要自上次提交以来已接收到 ackCount 条记录。

  • COUNT_TIME: 类似于 TIMECOUNT,但如果任一条件为 true,则执行提交。

  • MANUAL: 消息监听器负责 acknowledge() Acknowledgment。之后,应用与 BATCH 相同的语义。

  • MANUAL_IMMEDIATE: 当监听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

在使用 transactions 时,偏移量被发送到事务中,语义等同于 RECORDBATCH,具体取决于监听器类型(记录或批处理)。

备注

MANUALMANUAL_IMMEDIATE 要求监听器必须是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参见 Message Listeners

根据 syncCommits 容器属性,消费者上使用 commitSync()commitAsync() 方法。syncCommits 默认值为 true;另请参见 setSyncCommitTimeout。请参阅 setCommitCallback 以获取异步提交的结果;默认回调是 LoggingCommitCallback,它记录错误(以及在调试级别的成功)。

因为监听器容器有自己的提交偏移量的机制,它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中特别设置,否则它会无条件地将其设置为 false。

Acknowledgment 具有以下方法:

public interface Acknowledgment {

void acknowledge();

}
java

这种方法让监听者控制何时提交偏移量。

从版本 2.3 开始,Acknowledgment 接口增加了两个方法 nack(long sleep)nack(int index, long sleep)。第一个方法用于记录监听器,第二个方法用于批量监听器。对于你的监听器类型调用错误的方法将抛出 IllegalStateException

备注

如果你想提交部分批次,使用 nack(),在使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 将把成功处理的记录的偏移量发送到事务中。

important

nack() 只能在调用您的监听器的消费者线程上调用。

important

nack() 在使用 Out of Order Commits 时是不允许的。

当调用 nack() 时,具有记录监听器的消费者会提交任何待处理的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一个 poll() 中重新交付失败的记录和未处理的记录。可以通过设置 sleep 参数在重新交付之前暂停消费者。这与在容器配置了 DefaultErrorHandler 时抛出异常的功能类似。

important

nack() 会在指定的睡眠时间内暂停整个监听器,包括所有分配的分区。

当使用批量监听器时,您可以指定发生故障的批次中的索引。当调用 nack() 时,将为索引之前的记录提交偏移量,并对失败和丢弃的记录所在的分区进行查找,以便在下一个 poll() 中重新交付它们。

请参见 Container Error Handlers 以获取更多信息。

important

消费者在休眠期间被暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际的休眠时间及其分辨率取决于容器的 pollTimeout,默认值为 5 秒。最小休眠时间等于 pollTimeout,所有休眠时间将是其倍数。对于较短的休眠时间,或者为了提高其准确性,考虑减少容器的 pollTimeout

从版本 3.0.10 开始,批处理监听器可以使用 acknowledge(index)Acknowledgment 参数上提交批次的一部分偏移量。当调用此方法时,索引处记录的偏移量(以及所有先前记录的偏移量)将被提交。在执行部分批次提交后调用 acknowledge() 将提交批次其余部分的偏移量。以下限制适用:

  • AckMode.MANUAL_IMMEDIATE 是必需的

  • 该方法必须在监听线程上调用

  • 监听器必须消费一个 List 而不是原始的 ConsumerRecords

  • 索引必须在列表元素的范围内

  • 索引必须大于先前调用中使用的索引

这些限制是强制执行的,方法将抛出 IllegalArgumentExceptionIllegalStateException,具体取决于违规情况。

监听器容器自动启动

监听器容器实现了 SmartLifecycle,并且 autoStartup 默认值为 true。容器在一个较晚的阶段启动(Integer.MAX-VALUE - 100)。其他实现了 SmartLifecycle 的组件,用于处理来自监听器的数据,应在更早的阶段启动。- 100 为后续阶段留出了空间,以便在容器之后自动启动组件。