监控
监控监听器性能
从版本 2.3 开始,如果在类路径中检测到 Micrometer,并且应用程序上下文中存在一个单一的 MeterRegistry,监听器容器将自动创建和更新 Micrometer Timer。可以通过将 ContainerProperty 的 micrometerEnabled 设置为 false 来禁用计时器。
维护两个计时器 - 一个用于成功调用监听器,另一个用于失败。
定时器被命名为 spring.kafka.listener,并具有以下标签:
- 
name: (容器 bean 名称) - 
result:成功或失败 - 
exception:无或ListenerExecutionFailedException 
您可以使用 ContainerProperties 的 micrometerTags 属性添加额外的标签。
从版本 2.9.8 和 3.0.6 开始,您可以在 ContainerProperties 的 micrometerTagsProvider 中提供一个函数;该函数接收 ConsumerRecord<?, ?> 并返回基于该记录的标签,并与 micrometerTags 中的任何静态标签合并。
使用并发容器时,为每个线程创建定时器,name 标签后缀为 -n,其中 n 为 0 到 concurrency-1。
监控 KafkaTemplate 性能
从版本 2.5 开始,如果在类路径中检测到 Micrometer,并且应用程序上下文中存在一个单一的 MeterRegistry,模板将自动为发送操作创建和更新 Micrometer Timer。可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用计时器。
维护两个计时器 - 一个用于成功调用监听器,另一个用于失败。
定时器被命名为 spring.kafka.template,并具有以下标签:
- 
name: (模板 bean 名称) - 
result:success或failure - 
exception:none或失败的异常类名称 
您可以使用模板的 micrometerTags 属性添加额外的标签。
从版本 2.9.8 和 3.0.6 开始,您可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 属性;该函数接收 ProducerRecord<?, ?> 并返回可以基于该记录的标签,并与 micrometerTags 中的任何静态标签合并。
Micrometer Native Metrics
从版本 2.5 开始,该框架提供了 Factory Listeners,以便在创建和关闭生产者和消费者时管理 Micrometer KafkaClientMetrics 实例。
要启用此功能,只需将监听器添加到您的生产者和消费者工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}
传递给监听器的消费者/生产者 id 被添加到计量器的标签中,标签名称为 spring.id。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();
为 StreamsBuilderFactoryBean 提供了一个类似的监听器 - 参见 KafkaStreams Micrometer Support。
从版本 3.3 开始,引入了一个 KafkaMetricsSupport 抽象类,用于管理 io.micrometer.core.instrument.binder.kafka.KafkaMetrics 绑定到提供的 Kafka 客户端的 MeterRegistry 中。这个类是上述提到的 MicrometerConsumerListener、MicrometerProducerListener 和 KafkaStreamsMicrometerListener 的超类。然而,它可以用于任何 Kafka 客户端的用例。该类需要被扩展,并且必须调用其 bindClient() 和 unbindClient() API,以将 Kafka 客户端指标与 Micrometer 收集器连接起来。
微米观测
自版本 3.0 起,KafkaTemplate 和监听器容器现在支持使用 Micrometer 进行观察。
在 KafkaTemplate 和 ContainerProperties 上将 observationEnabled 设置为 true 以启用观察;这将禁用 Micrometer Timers,因为定时器将由每个观察进行管理。
Micrometer 观察不支持批量监听器;这将启用 Micrometer 定时器
有关更多信息,请参阅 Micrometer Tracing。
要为定时器/跟踪添加标签,请分别为模板或监听器容器配置自定义 KafkaTemplateObservationConvention 或 KafkaListenerObservationConvention。
默认实现为模板观察添加了 bean.name 标签,为容器添加了 listener.id 标签。
您可以选择子类化 DefaultKafkaTemplateObservationConvention 或 DefaultKafkaListenerObservationConvention,或者提供完全新的实现。
请参见 Micrometer Observation Documentation 以获取记录的默认观察的详细信息。
从版本 3.0.6 开始,您可以根据消费者或生产者记录中的信息为计时器和跟踪添加动态标签。为此,请将自定义 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention 添加到监听器容器属性或 KafkaTemplate 中。两个观察上下文中的 record 属性分别包含 ConsumerRecord 或 ProducerRecord。
发送者和接收者上下文的 remoteServiceName 属性被设置为 Kafka 的 clusterId 属性;这个属性是通过 KafkaAdmin 获取的。如果由于某种原因——可能是缺乏管理员权限,您无法获取集群 ID,从版本 3.1 开始,您可以在 KafkaAdmin 上手动设置 clusterId 并将其注入到 KafkaTemplate 和监听器容器中。当它为 null(默认值)时,管理员将调用 describeCluster 管理操作从代理中检索它。