监听器容器属性
表 1. ContainerProperties
属性
属性 | 默认 | 描述 |
---|---|---|
ackCount | 1 | 在 ackMode 为 COUNT 或 COUNT_TIME 时,提交待处理偏移量之前的记录数量。 |
adviceChain | null | 一系列 Advice 对象(例如,围绕建议的 MethodInterceptor )包装消息监听器,按顺序调用。 |
ackMode | BATCH | 控制偏移量提交的频率 - 请参见 Committing Offsets。 |
ackTime | 5000 | 当 ackMode 为 TIME 或 COUNT_TIME 时,待处理的偏移量提交的时间(以毫秒为单位)。 |
assignmentCommitOption | LATEST_ONLY _NO_TX | 是否在分配时提交初始位置;默认情况下,只有当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 为 latest 时,初始偏移量才会被提交,即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参见 ContainerProperties.AssignmentCommitOption 的 JavaDocs。 |
asyncAcks | false | 启用乱序提交(参见 手动提交偏移量);消费者被暂停,提交被推迟,直到填补空缺。 |
authExceptionRetryInterval | null | 当不为 null 时,Duration 表示在 Kafka 客户端抛出 AuthenticationException 或 AuthorizationException 时在轮询之间的睡眠时间。当为 null 时,这些异常被视为致命,容器将停止。 |
batchRecoverAfterRollback | false | 设置为 true 以启用批量恢复,参见 After Rollback Processor。 |
clientId | (empty string) | client.id 消费者属性的前缀。覆盖消费者工厂的 client.id 属性;在并发容器中,-n 被添加为每个消费者实例的后缀。 |
checkDeserExWhenKeyNull | false | 设置为 true 以便在接收到 null key 时始终检查 DeserializationException 头。对于消费者代码无法确定已配置 ErrorHandlingDeserializer 的情况非常有用,例如在使用委托反序列化器时。 |
checkDeserExWhenValueNull | false | 设置为 true 时,总是检查在接收到 null value 时是否存在 DeserializationException 头。对于消费者代码无法确定是否已配置 ErrorHandlingDeserializer 的情况(例如使用委托反序列化器时)非常有用。 |
commitCallback | null | 当存在且 syncCommits 为 false 时,在提交完成后会调用一个回调。 |
commitLogLevel | DEBUG | 与提交偏移量相关的日志的日志级别。 |
consumerRebalanceListener | null | 一个重新平衡监听器;请参见 Rebalancing Listeners。 |
commitRetries | 3 | 设置在使用 syncCommits 设置为 true 时的重试次数 RetriableCommitFailedException 。默认值为 3(总共 4 次尝试)。 |
consumerStartTimeout | 30s | 等待消费者开始之前记录错误的时间;这可能发生在,例如,您使用的任务执行器线程不足的情况下。 |
deliveryAttemptHeader | false | 请参见 Delivery Attempts Header。 |
eosMode | V2 | Exactly Once Semantics 模式;参见 Exactly Once Semantics。 |
fixTxOffsets | false | 当消费由事务性生产者生成的记录时,如果消费者位于分区的末尾,滞后可能会错误地报告为大于零,这可能是由于用于指示事务提交/回滚的伪记录,以及可能存在的回滚记录。这在功能上不会影响消费者,但一些用户对“滞后”非零表示担忧。将此属性设置为 true ,容器将纠正此类错误报告的偏移量。检查在下一个轮询之前进行,以避免在提交处理过程中增加显著复杂性。撰写时,只有当消费者配置为 isolation.level=read_committed 且 max.poll.records 大于 1 时,滞后才会被纠正。有关更多信息,请参见 KAFKA-10683。 |
groupId | null | 覆盖消费者 group.id 属性;由 @KafkaListener 的 id 或 groupId 属性自动设置。 |
idleBeforeDataMultiplier | 5.0 | idleEventInterval 的乘数,在接收任何记录之前应用。接收到记录后,不再应用该乘数。自版本 2.8 起可用。 |
idleBetweenPolls | 0 | 用于通过在轮询之间让线程休眠来减慢交付。处理一批记录的时间加上这个值必须小于 max.poll.interval.ms 消费者属性。 |
idleEventInterval | null | 当设置时,启用 ListenerContainerIdleEvent 的发布,参见 Application Events 和 Detecting Idle and Non-Responsive Consumers。另请参见 idleBeforeDataMultiplier 。 |
idlePartitionEventInterval | null | 当设置时,启用 ListenerContainerIdlePartitionEvent 的发布,参见 应用程序事件 和 检测空闲和无响应的消费者。 |
kafkaConsumerProperties | None | 用于覆盖在消费者工厂上配置的任何任意消费者属性。 |
kafkaAwareTransactionManager | null | 请参见 Transactions。 |
listenerTaskExecutor | SimpleAsyncTaskExecutor | 一个任务执行器,用于运行消费者线程。默认的执行器创建名为 <name>-C-n 的线程;使用 KafkaMessageListenerContainer 时,名称为 bean 名称;使用 ConcurrentMessageListenerContainer 时,名称为 bean 名称后缀加 -m ,其中 m 是每个子容器递增的数字。请参见 Container Thread Naming。 |
logContainerConfig | false | 设置为 true 以在 INFO 级别记录所有容器属性。 |
messageListener | null | 消息监听器。 |
micrometerEnabled | true | 是否要为消费者线程维护 Micrometer 定时器。 |
micrometerTags | empty | 要添加到 micrometer 指标的静态标签映射。 |
micrometerTagsProvider | null | 一个根据消费者记录提供动态标签的函数。 |
missingTopicsFatal | false | 当设置为 true 时,如果配置的主题在代理上不存在,则会阻止容器启动。 |
monitorInterval | 30s | 检查消费者线程状态以获取 NonResponsiveConsumerEvent 的频率。请参阅 noPollThreshold 和 pollTimeout 。 |
noPollThreshold | 3.0 | 乘以 pollTimeOut 以确定是否发布 NonResponsiveConsumerEvent 。参见 monitorInterval 。 |
observationConvention | null | 当设置时,根据消费者记录中的信息,为计时器和跟踪添加动态标签。 |
observationEnabled | false | 设置为 true 以通过 Micrometer 启用观察。 |
offsetAndMetadataProvider | null | 一个 OffsetAndMetadata 的提供者;默认情况下,提供者创建一个带有空元数据的偏移量和元数据。该提供者提供了一种自定义元数据的方法。 |
onlyLogRecordMetadata | false | 设置为 false 以记录完整的消费者记录(在错误、调试日志等中),而不仅仅是 topic-partition@offset 。 |
pauseImmediate | false | 当容器暂停时,在处理完当前记录后停止处理,而不是在处理完上一个轮询的所有记录后停止;剩余的记录将保留在内存中,并在容器恢复时传递给监听器。 |
pollTimeout | 5000 | 传递给 Consumer.poll() 的超时时间(以毫秒为单位)。 |
pollTimeoutWhilePaused | 100 | 在容器处于暂停状态时,传递给 Consumer.poll() 的超时时间(以毫秒为单位)。 |
restartAfterAuthExceptions | false | 如果由于授权/身份验证异常而停止容器,则将其设置为自动重启。 |
scheduler | ThreadPoolTaskScheduler | 用于运行消费者监控任务的调度器。 |
shutdownTimeout | 10000 | 在所有消费者停止并在发布容器停止事件之前,阻塞 stop() 方法的最大时间(以毫秒为单位)。 |
stopContainerWhenFenced | false | 如果抛出 ProducerFencedException ,则停止监听器容器。有关更多信息,请参见 After-rollback Processor。 |
stopImmediate | false | 当容器停止时,在处理完当前记录后停止处理,而不是在处理完上一次轮询的所有记录后停止。 |
subBatchPerPartition | 见描述。 | 当使用批处理监听器时,如果这个值为 true ,则监听器会被调用,传入的结果会根据每个分区拆分成子批次。默认值为 false 。 |
syncCommitTimeout | null | 当 syncCommits 为 true 时使用的超时。如果未设置,容器将尝试确定 default.api.timeout.ms 消费者属性并使用该值;否则将使用 60 秒。 |
syncCommits | true | 是否使用同步或异步提交偏移量;参见 commitCallback 。 |
topics topicPattern topicPartitions | n/a | 配置的主题、主题模式或显式分配的主题/分区。互斥;必须提供至少一个;由 ContainerProperties 构造函数强制执行。 |
transactionManager | null | 自 3.2 起已弃用,参见 [kafkaAwareTransactionManager],其他事务管理器。 |
表 2. AbstractMessageListenerContainer
属性
属性 | 默认值 | 描述 |
---|---|---|
afterRollbackProcessor | DefaultAfterRollbackProcessor | 在事务回滚后调用的 AfterRollbackProcessor 。 |
applicationEventPublisher | application context | 事件发布者。 |
batchErrorHandler | 见描述 | 已弃用 - 参见 commonErrorHandler 。 |
batchInterceptor | null | 设置一个在调用批处理监听器之前调用的 BatchInterceptor ;不适用于记录监听器。另请参见 interceptBeforeTx 。 |
beanName | bean name | 容器的 bean 名称;子容器后缀为 -n 。 |
commonErrorHandler | 见描述 | 当提供 transactionManager 并使用 DefaultAfterRollbackProcessor 时,DefaultErrorHandler 或 null 。参见 Container Error Handlers。 |
containerProperties | ContainerProperties | 容器属性实例。 |
groupId | 见描述 | containerProperties.groupId ,如果存在,否则为消费者工厂中的 group.id 属性。 |
interceptBeforeTx | true | 确定 recordInterceptor 是在事务开始之前还是之后调用。 |
listenerId | 见描述 | 用户配置的容器的 bean 名称或 @KafkaListener 的 id 属性。 |
listenerInfo | null | 用于填充 KafkaHeaders.LISTENER_INFO 头的值。使用 @KafkaListener 时,此值来自 info 属性。此头可以在多个地方使用,例如 RecordInterceptor 、RecordFilterStrategy 和监听器代码本身。 |
pauseRequested | (只读) | 如果请求了消费者暂停,则为真。 |
recordInterceptor | null | 设置一个在调用记录监听器之前调用的 RecordInterceptor ;不适用于批处理监听器。另请参见 interceptBeforeTx 。 |
topicCheckTimeout | 30s | 当 missingTopicsFatal 容器属性为 true 时,等待 describeTopics 操作完成的时间(以秒为单位)。 |
表 3. KafkaMessageListenerContainer
属性
属性 | 默认值 | 描述 |
---|---|---|
assignedPartitions | (只读) | 当前分配给此容器的分区(无论是显式还是隐式)。 |
clientIdSuffix | null | 由并发容器使用,以便为每个子容器的消费者提供唯一的 client.id 。 |
containerPaused | n/a | 如果请求暂停并且消费者实际上已经暂停,则为真。 |
表 4. ConcurrentMessageListenerContainer
属性
属性 | 默认值 | 描述 |
---|---|---|
alwaysClientIdSuffix | true | 设置为 false 以抑制在 concurrency 仅为 1 时向 client.id 消费者属性添加后缀。 |
assignedPartitions | (只读) | 当前分配给此容器的子 KafkaMessageListenerContainer 的分区总和(无论是显式还是隐式)。 |
concurrency | 1 | 要管理的子 KafkaMessageListenerContainer 的数量。 |
containerPaused | n/a | 如果请求暂停并且所有子容器的消费者实际上已暂停,则为 true。 |
containers | n/a | 对所有子 KafkaMessageListenerContainer 的引用。 |