跳到主要内容

监听器容器属性

ChatGPT-4o-mini 中英对照 Listener Container Properties

表 1. ContainerProperties 属性

属性默认描述

ackCount
1ackModeCOUNTCOUNT_TIME 时,提交待处理偏移量之前的记录数量。

adviceChain
null一系列 Advice 对象(例如,围绕建议的 MethodInterceptor)包装消息监听器,按顺序调用。

ackMode
BATCH控制偏移量提交的频率 - 请参见 Committing Offsets

ackTime
5000ackModeTIMECOUNT_TIME 时,待处理的偏移量提交的时间(以毫秒为单位)。

assignmentCommitOption
LATEST_ONLY _NO_TX是否在分配时提交初始位置;默认情况下,只有当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 时,初始偏移量才会被提交,即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参见 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks
false启用乱序提交(参见 手动提交偏移量);消费者被暂停,提交被推迟,直到填补空缺。

authExceptionRetryInterval
null当不为 null 时,Duration 表示在 Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 时在轮询之间的睡眠时间。当为 null 时,这些异常被视为致命,容器将停止。

batchRecoverAfterRollback
false设置为 true 以启用批量恢复,参见 After Rollback Processor

clientId
(empty string)client.id 消费者属性的前缀。覆盖消费者工厂的 client.id 属性;在并发容器中,-n 被添加为每个消费者实例的后缀。

checkDeserExWhenKeyNull
false设置为 true 以便在接收到 null key 时始终检查 DeserializationException 头。对于消费者代码无法确定已配置 ErrorHandlingDeserializer 的情况非常有用,例如在使用委托反序列化器时。

checkDeserExWhenValueNull
false设置为 true 时,总是检查在接收到 null value 时是否存在 DeserializationException 头。对于消费者代码无法确定是否已配置 ErrorHandlingDeserializer 的情况(例如使用委托反序列化器时)非常有用。

commitCallback
null当存在且 syncCommitsfalse 时,在提交完成后会调用一个回调。

commitLogLevel
DEBUG与提交偏移量相关的日志的日志级别。

consumerRebalanceListener
null一个重新平衡监听器;请参见 Rebalancing Listeners

commitRetries
3设置在使用 syncCommits 设置为 true 时的重试次数 RetriableCommitFailedException。默认值为 3(总共 4 次尝试)。

consumerStartTimeout
30s等待消费者开始之前记录错误的时间;这可能发生在,例如,您使用的任务执行器线程不足的情况下。

deliveryAttemptHeader
false请参见 Delivery Attempts Header

eosMode
V2Exactly Once Semantics 模式;参见 Exactly Once Semantics

fixTxOffsets
false当消费由事务性生产者生成的记录时,如果消费者位于分区的末尾,滞后可能会错误地报告为大于零,这可能是由于用于指示事务提交/回滚的伪记录,以及可能存在的回滚记录。这在功能上不会影响消费者,但一些用户对“滞后”非零表示担忧。将此属性设置为 true,容器将纠正此类错误报告的偏移量。检查在下一个轮询之前进行,以避免在提交处理过程中增加显著复杂性。撰写时,只有当消费者配置为 isolation.level=read_committedmax.poll.records 大于 1 时,滞后才会被纠正。有关更多信息,请参见 KAFKA-10683

groupId
null覆盖消费者 group.id 属性;由 @KafkaListeneridgroupId 属性自动设置。

idleBeforeDataMultiplier
5.0idleEventInterval 的乘数,在接收任何记录之前应用。接收到记录后,不再应用该乘数。自版本 2.8 起可用。

idleBetweenPolls
0用于通过在轮询之间让线程休眠来减慢交付。处理一批记录的时间加上这个值必须小于 max.poll.interval.ms 消费者属性。

idleEventInterval
null当设置时,启用 ListenerContainerIdleEvent 的发布,参见 Application EventsDetecting Idle and Non-Responsive Consumers。另请参见 idleBeforeDataMultiplier

idlePartitionEventInterval
null当设置时,启用 ListenerContainerIdlePartitionEvent 的发布,参见 应用程序事件检测空闲和无响应的消费者

kafkaConsumerProperties
None用于覆盖在消费者工厂上配置的任何任意消费者属性。

kafkaAwareTransactionManager
null请参见 Transactions

listenerTaskExecutor
SimpleAsyncTaskExecutor一个任务执行器,用于运行消费者线程。默认的执行器创建名为 <name>-C-n 的线程;使用 KafkaMessageListenerContainer 时,名称为 bean 名称;使用 ConcurrentMessageListenerContainer 时,名称为 bean 名称后缀加 -m,其中 m 是每个子容器递增的数字。请参见 Container Thread Naming

logContainerConfig
false设置为 true 以在 INFO 级别记录所有容器属性。

messageListener
null消息监听器。

micrometerEnabled
true是否要为消费者线程维护 Micrometer 定时器。

micrometerTags
empty要添加到 micrometer 指标的静态标签映射。

micrometerTagsProvider
null一个根据消费者记录提供动态标签的函数。

missingTopicsFatal
false当设置为 true 时,如果配置的主题在代理上不存在,则会阻止容器启动。

monitorInterval
30s检查消费者线程状态以获取 NonResponsiveConsumerEvent 的频率。请参阅 noPollThresholdpollTimeout

noPollThreshold
3.0乘以 pollTimeOut 以确定是否发布 NonResponsiveConsumerEvent。参见 monitorInterval

observationConvention
null当设置时,根据消费者记录中的信息,为计时器和跟踪添加动态标签。

observationEnabled
false设置为 true 以通过 Micrometer 启用观察。

offsetAndMetadataProvider
null一个 OffsetAndMetadata 的提供者;默认情况下,提供者创建一个带有空元数据的偏移量和元数据。该提供者提供了一种自定义元数据的方法。

onlyLogRecordMetadata
false设置为 false 以记录完整的消费者记录(在错误、调试日志等中),而不仅仅是 topic-partition@offset

pauseImmediate
false当容器暂停时,在处理完当前记录后停止处理,而不是在处理完上一个轮询的所有记录后停止;剩余的记录将保留在内存中,并在容器恢复时传递给监听器。

pollTimeout
5000传递给 Consumer.poll() 的超时时间(以毫秒为单位)。

pollTimeoutWhilePaused
100在容器处于暂停状态时,传递给 Consumer.poll() 的超时时间(以毫秒为单位)。

restartAfterAuthExceptions
false如果由于授权/身份验证异常而停止容器,则将其设置为自动重启。

scheduler
ThreadPoolTaskScheduler用于运行消费者监控任务的调度器。

shutdownTimeout
10000在所有消费者停止并在发布容器停止事件之前,阻塞 stop() 方法的最大时间(以毫秒为单位)。

stopContainerWhenFenced
false如果抛出 ProducerFencedException,则停止监听器容器。有关更多信息,请参见 After-rollback Processor

stopImmediate
false当容器停止时,在处理完当前记录后停止处理,而不是在处理完上一次轮询的所有记录后停止。

subBatchPerPartition
见描述。当使用批处理监听器时,如果这个值为 true,则监听器会被调用,传入的结果会根据每个分区拆分成子批次。默认值为 false

syncCommitTimeout
nullsyncCommitstrue 时使用的超时。如果未设置,容器将尝试确定 default.api.timeout.ms 消费者属性并使用该值;否则将使用 60 秒。

syncCommits
true是否使用同步或异步提交偏移量;参见 commitCallback

topics topicPattern topicPartitions
n/a配置的主题、主题模式或显式分配的主题/分区。互斥;必须提供至少一个;由 ContainerProperties 构造函数强制执行。

transactionManager
null自 3.2 起已弃用,参见 [kafkaAwareTransactionManager]其他事务管理器

表 2. AbstractMessageListenerContainer 属性

属性默认值描述

afterRollbackProcessor
DefaultAfterRollbackProcessor在事务回滚后调用的 AfterRollbackProcessor

applicationEventPublisher
application context事件发布者。

batchErrorHandler
见描述已弃用 - 参见 commonErrorHandler

batchInterceptor
null设置一个在调用批处理监听器之前调用的 BatchInterceptor;不适用于记录监听器。另请参见 interceptBeforeTx

beanName
bean name容器的 bean 名称;子容器后缀为 -n

commonErrorHandler
见描述当提供 transactionManager 并使用 DefaultAfterRollbackProcessor 时,DefaultErrorHandlernull。参见 Container Error Handlers

containerProperties
ContainerProperties容器属性实例。

groupId
见描述containerProperties.groupId,如果存在,否则为消费者工厂中的 group.id 属性。

interceptBeforeTx
true确定 recordInterceptor 是在事务开始之前还是之后调用。

listenerId
见描述用户配置的容器的 bean 名称或 @KafkaListenerid 属性。

listenerInfo
null用于填充 KafkaHeaders.LISTENER_INFO 头的值。使用 @KafkaListener 时,此值来自 info 属性。此头可以在多个地方使用,例如 RecordInterceptorRecordFilterStrategy 和监听器代码本身。

pauseRequested
(只读)如果请求了消费者暂停,则为真。

recordInterceptor
null设置一个在调用记录监听器之前调用的 RecordInterceptor;不适用于批处理监听器。另请参见 interceptBeforeTx

topicCheckTimeout
30smissingTopicsFatal 容器属性为 true 时,等待 describeTopics 操作完成的时间(以秒为单位)。

表 3. KafkaMessageListenerContainer 属性

属性默认值描述

assignedPartitions
(只读)当前分配给此容器的分区(无论是显式还是隐式)。

clientIdSuffix
null由并发容器使用,以便为每个子容器的消费者提供唯一的 client.id

containerPaused
n/a如果请求暂停并且消费者实际上已经暂停,则为真。

表 4. ConcurrentMessageListenerContainer 属性

属性默认值描述

alwaysClientIdSuffix
true设置为 false 以抑制在 concurrency 仅为 1 时向 client.id 消费者属性添加后缀。

assignedPartitions
(只读)当前分配给此容器的子 KafkaMessageListenerContainer 的分区总和(无论是显式还是隐式)。

concurrency
1要管理的子 KafkaMessageListenerContainer 的数量。

containerPaused
n/a如果请求暂停并且所有子容器的消费者实际上已暂停,则为 true。

containers
n/a对所有子 KafkaMessageListenerContainer 的引用。