消息监听器容器
提供了两个 MessageListenerContainer
实现:
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer
委托给一个或多个 KafkaMessageListenerContainer
实例,以提供多线程消费。
从版本 2.2.7 开始,您可以将 RecordInterceptor
添加到监听器容器中;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从版本 2.7 开始,它有额外的方法,这些方法在监听器退出后被调用(正常退出或抛出异常)。此外,从版本 2.7 开始,现在有一个 BatchInterceptor
,为 Batch Listeners 提供类似的功能。此外,ConsumerAwareRecordInterceptor
(和 BatchInterceptor
)提供对 Consumer<?, ?>
的访问。这可以用于,例如,在拦截器中访问消费者指标。
您不应在这些拦截器中执行任何影响消费者的位置信息和已提交偏移量的方法;容器需要管理这些信息。
如果拦截器修改了记录(通过创建一个新记录),则 topic
、partition
和 offset
必须保持不变,以避免意外的副作用,例如记录丢失。
CompositeRecordInterceptor
和 CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,在使用事务时,拦截器在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx
属性设置为 false
,以便在事务开始之后调用拦截器。从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager
。这允许拦截器参与由容器启动的 JDBC 事务。
从版本 2.3.8 和 2.4.6 开始,ConcurrentMessageListenerContainer
现在支持 Static Membership,当并发性大于 1 时。group.instance.id
以 -n
结尾,其中 n
从 1
开始。结合增加的 session.timeout.ms
,可以减少再平衡事件,例如,当应用程序实例重新启动时。
使用 KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
Request error occurred:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个 TopicPartitionOffset
数组作为参数,以明确指示容器使用哪些分区(使用消费者的 assign()
方法)以及可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内当前最后偏移量的。提供了一个 TopicPartitionOffset
的构造函数,该构造函数接受一个额外的 boolean
参数。如果该参数为 true
,则初始偏移量(正值或负值)相对于该消费者的当前位置信息。偏移量在容器启动时应用。第二个构造函数接受一个主题数组,Kafka 根据 group.id
属性分配分区 — 在组内分配分区。第三个构造函数使用正则表达式 Pattern
来选择主题。
要将 MessageListener
分配给一个容器,可以在创建容器时使用 ContainerProps.setMessageListener
方法。以下示例演示了如何做到这一点:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
注意,当创建 DefaultKafkaConsumerFactory
时,使用上述仅接受属性的构造函数意味着键和值的 Deserializer
类是从配置中获取的。或者,可以将 Deserializer
实例传递给 DefaultKafkaConsumerFactory
构造函数,以用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>
(从版本 2.3 开始),将用于为每个 Consumer
获取单独的 Deserializer
实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关可以设置的各种属性的更多信息,请参阅 ContainerProperties
的 Javadoc。
从版本 2.1.1 开始,新增了一个名为 logContainerConfig
的属性。当该属性为 true
且启用了 INFO
日志时,每个监听器容器会写入一条日志消息,概述其配置属性。
默认情况下,主题偏移量提交的日志记录是在 DEBUG
日志级别下进行的。从版本 2.1.2 开始,ContainerProperties
中有一个名为 commitLogLevel
的属性,允许您指定这些消息的日志级别。例如,要将日志级别更改为 INFO
,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
。
从版本 2.2 开始,新增了一个名为 missingTopicsFatal
的容器属性(默认值:自 2.3.4 起为 false
)。如果配置的任何主题在代理上不存在,则此属性会阻止容器启动。如果容器配置为监听主题模式(正则表达式),则不适用。之前,容器线程在 consumer.poll()
方法中循环等待主题出现,同时记录许多消息。除了日志,没有其他迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval
。这使得容器在从 KafkaConsumer
获取到任何 AuthenticationException
或 AuthorizationException
后,会重新尝试获取消息。这种情况可能发生在,例如,配置的用户被拒绝访问某个主题或凭证不正确。定义 authExceptionRetryInterval
允许容器在授予适当权限时进行恢复。
默认情况下,没有配置间隔 - 认证和授权错误被视为致命错误,这会导致容器停止。
从版本 2.8 开始,当创建消费者工厂时,如果您提供反序列化器作为对象(在构造函数中或通过设置器),工厂将调用 configure()
方法来使用配置属性对它们进行配置。
使用 ConcurrentMessageListenerContainer
单个构造函数类似于 KafkaListenerContainer
构造函数。以下列表显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个 concurrency
属性。例如,container.setConcurrency(3)
创建三个 KafkaMessageListenerContainer
实例。
如果容器属性为主题(或主题模式)配置,Kafka 将通过其组管理能力在消费者之间分配分区。
在监听多个主题时,默认的分区分配可能并不是你所期望的。例如,如果你有三个主题,每个主题有五个分区,并且你想使用 concurrency=15
,你会看到只有五个活跃的消费者,每个消费者被分配了来自每个主题的一个分区,其余的 10 个消费者处于空闲状态。这是因为默认的 Kafka ConsumerPartitionAssignor
是 RangeAssignor
(请参见其 Javadoc)。对于这种情况,你可能想考虑使用 RoundRobinAssignor
,它会将分区分配到所有消费者上。这样,每个消费者被分配一个主题或分区。要更改 ConsumerPartitionAssignor
,你可以在提供给 DefaultKafkaConsumerFactory
的属性中设置 partition.assignment.strategy
消费者属性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。
在使用 Spring Boot 时,你可以按如下方式设置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
当容器属性配置为 TopicPartitionOffset
时,ConcurrentMessageListenerContainer
会将 TopicPartitionOffset
实例分配到委托的 KafkaMessageListenerContainer
实例中。
如果提供了六个 TopicPartitionOffset
实例,并且 concurrency
为 3
;每个容器将获得两个分区。对于五个 TopicPartitionOffset
实例,两个容器获得两个分区,第三个容器获得一个分区。如果 concurrency
大于 TopicPartitions
的数量,则 concurrency
会被调整为较低的值,以确保每个容器获得一个分区。
client.id
属性(如果设置)会附加 -n
,其中 n
是对应于并发的消费者实例。这是为了在启用 JMX 时提供 MBeans 的唯一名称所必需的。
从版本 1.3 开始,MessageListenerContainer
提供对底层 KafkaConsumer
的指标访问。在 ConcurrentMessageListenerContainer
的情况下,metrics()
方法返回所有目标 KafkaMessageListenerContainer
实例的指标。这些指标按提供给底层 KafkaConsumer
的 client-id
分组到 Map<MetricName, ? extends Metric>
中。
从版本 2.3 开始,ContainerProperties
提供了一个 idleBetweenPolls
选项,允许监听器容器中的主循环在 KafkaConsumer.poll()
调用之间休眠。实际的休眠间隔是从提供的选项和 max.poll.interval.ms
消费者配置与当前记录批处理时间之间的差值中选择的最小值。
提交偏移量
提供了几种提交偏移量的选项。如果 enable.auto.commit
消费者属性为 true
,Kafka 会根据其配置自动提交偏移量。如果为 false
,容器支持几种 AckMode
设置(在下一个列表中描述)。默认的 AckMode
是 BATCH
。从版本 2.3 开始,框架将 enable.auto.commit
设置为 false
,除非在配置中明确设置。之前,如果未设置该属性,则使用 Kafka 的默认值(true
)。
消费者 poll()
方法返回一个或多个 ConsumerRecords
。对于每条记录,都会调用 MessageListener
。以下列表描述了容器对于每个 AckMode
所采取的动作(当不使用事务时):
-
RECORD
: 当监听器在处理记录后返回时提交偏移量。 -
BATCH
: 当所有由poll()
返回的记录都已处理时提交偏移量。 -
TIME
: 当所有由poll()
返回的记录都已处理时提交偏移量,只要自上次提交以来的ackTime
已超过。 -
COUNT
: 当所有由poll()
返回的记录都已处理时提交偏移量,只要自上次提交以来已接收到ackCount
条记录。 -
COUNT_TIME
: 类似于TIME
和COUNT
,但如果任一条件为true
,则执行提交。 -
MANUAL
: 消息监听器负责acknowledge()
Acknowledgment
。之后,应用与BATCH
相同的语义。 -
MANUAL_IMMEDIATE
: 当监听器调用Acknowledgment.acknowledge()
方法时立即提交偏移量。
在使用 transactions 时,偏移量被发送到事务中,语义等同于 RECORD
或 BATCH
,具体取决于监听器类型(记录或批处理)。
MANUAL
和 MANUAL_IMMEDIATE
要求监听器必须是 AcknowledgingMessageListener
或 BatchAcknowledgingMessageListener
。请参见 Message Listeners。
根据 syncCommits
容器属性,消费者上使用 commitSync()
或 commitAsync()
方法。syncCommits
默认值为 true
;另请参见 setSyncCommitTimeout
。请参阅 setCommitCallback
以获取异步提交的结果;默认回调是 LoggingCommitCallback
,它记录错误(以及在调试级别的成功)。
因为监听器容器有自己的提交偏移量的机制,它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
设置为 false
。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中特别设置,否则它会无条件地将其设置为 false。
Acknowledgment
具有以下方法:
public interface Acknowledgment {
void acknowledge();
}
这种方法让监听者控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment
接口增加了两个方法 nack(long sleep)
和 nack(int index, long sleep)
。第一个方法用于记录监听器,第二个方法用于批量监听器。对于你的监听器类型调用错误的方法将抛出 IllegalStateException
。
如果你想提交部分批次,使用 nack()
,在使用事务时,将 AckMode
设置为 MANUAL
;调用 nack()
将把成功处理的记录的偏移量发送到事务中。
nack()
只能在调用您的监听器的消费者线程上调用。
nack()
在使用 Out of Order Commits 时是不允许的。
当调用 nack()
时,具有记录监听器的消费者会提交任何待处理的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一个 poll()
中重新交付失败的记录和未处理的记录。可以通过设置 sleep
参数在重新交付之前暂停消费者。这与在容器配置了 DefaultErrorHandler
时抛出异常的功能类似。
nack()
会在指定的睡眠时间内暂停整个监听器,包括所有分配的分区。
当使用批量监听器时,您可以指定发生故障的批次中的索引。当调用 nack()
时,将为索引之前的记录提交偏移量,并对失败和丢弃的记录所在的分区进行查找,以便在下一个 poll()
中重新交付它们。
请参见 Container Error Handlers 以获取更多信息。
消费者在休眠期间被暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际的休眠时间及其分辨率取决于容器的 pollTimeout
,默认值为 5 秒。最小休眠时间等于 pollTimeout
,所有休眠时间将是其倍数。对于较短的休眠时间,或者为了提高其准确性,考虑减少容器的 pollTimeout
。
从版本 3.0.10 开始,批处理监听器可以使用 acknowledge(index)
在 Acknowledgment
参数上提交批次的一部分偏移量。当调用此方法时,索引处记录的偏移量(以及所有先前记录的偏移量)将被提交。在执行部分批次提交后调用 acknowledge()
将提交批次其余部分的偏移量。以下限制适用:
-
AckMode.MANUAL_IMMEDIATE
是必需的 -
该方法必须在监听线程上调用
-
监听器必须消费一个
List
而不是原始的ConsumerRecords
-
索引必须在列表元素的范围内
-
索引必须大于先前调用中使用的索引
这些限制是强制执行的,方法将抛出 IllegalArgumentException
或 IllegalStateException
,具体取决于违规情况。
监听器容器自动启动
监听器容器实现了 SmartLifecycle
,并且 autoStartup
默认值为 true
。容器在一个较晚的阶段启动(Integer.MAX-VALUE - 100
)。其他实现了 SmartLifecycle
的组件,用于处理来自监听器的数据,应在更早的阶段启动。- 100
为后续阶段留出了空间,以便在容器之后自动启动组件。