跳到主要内容

Apache Kafka 支持

DeepSeek V3 中英对照 Apache Kafka Support

Apache Kafka 通过提供 spring-kafka 项目的自动配置来支持。

Kafka 的配置由 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
properties
提示

要在启动时创建一个主题,添加一个类型为 NewTopic 的 bean。如果主题已经存在,该 bean 将被忽略。

有关更多支持的选项,请参阅 KafkaProperties

发送消息

Spring 的 KafkaTemplate 是自动配置的,您可以直接在您自己的 Bean 中自动装配它,如下例所示:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

private final KafkaTemplate<String, String> kafkaTemplate;

public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

// ...

public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}

}
java
备注

如果定义了属性 spring.kafka.producer.transaction-id-prefix,则会自动配置一个 KafkaTransactionManager。此外,如果定义了一个 RecordMessageConverter bean,它将自动关联到自动配置的 KafkaTemplate

接收消息

当存在 Apache Kafka 基础设施时,任何 Bean 都可以通过 @KafkaListener 注解来创建一个监听器端点。如果没有定义 KafkaListenerContainerFactory,则会自动配置一个默认的工厂,并使用 spring.kafka.listener.* 中定义的键值。

以下组件在 someTopic 主题上创建一个监听器端点:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}

}
java

如果定义了 KafkaTransactionManager bean,它会被自动关联到容器工厂。同样地,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener bean,它们也会被自动关联到默认工厂。

根据监听器类型的不同,默认工厂会关联一个 RecordMessageConverterBatchMessageConverter bean。如果对于批量监听器只存在一个 RecordMessageConverter bean,它将被包装在 BatchMessageConverter 中。

提示

自定义的 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager Bean。

Kafka 流处理

Spring for Apache Kafka 提供了一个工厂 Bean 来创建 StreamsBuilder 对象并管理其流的生命周期。只要 kafka-streams 在类路径上,并且 Kafka Streams 通过 @EnableKafkaStreams 注解启用,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration Bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以通过 spring.kafka.streams.application-id 进行配置,如果未设置则默认为 spring.application.name。后者可以全局设置,或专门为 Streams 进行覆盖配置。

可以使用专用属性来设置其他几个属性;其他任意的 Kafka 属性可以使用 spring.kafka.streams.properties 命名空间来设置。有关更多信息,请参见 Additional Kafka Properties

要使用工厂 bean,请将 StreamsBuilder 注入到您的 @Bean 中,如下例所示:

import java.util.Locale;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}

private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}

}
java

默认情况下,由 StreamsBuilder 对象管理的流会自动启动。你可以使用 spring.kafka.streams.auto-startup 属性来定制这一行为。

额外的 Kafka 属性

自动配置支持的属性在附录的集成属性部分中展示。请注意,这些属性(无论是使用连字符还是驼峰命名法)大多数情况下直接映射到 Apache Kafka 的点状属性。有关详细信息,请参阅 Apache Kafka 文档。

名称中未包含客户端类型(producerconsumeradminstreams)的属性被视为通用属性,适用于所有客户端。大多数这些通用属性可以根据需要针对一个或多个客户端类型进行覆盖。

Apache Kafka 将属性标记为 HIGH(高)、MEDIUM(中)或 LOW(低)的重要性级别。Spring Boot 自动配置支持所有 HIGH 重要性级别的属性,部分选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。

Kafka 支持的属性中,只有一部分可以通过 KafkaProperties 类直接使用。如果您希望为各个客户端类型配置额外的、不受直接支持的属性,请使用以下属性:

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
properties

这将 Kafka 的公共属性 prop.one 设置为 first(适用于生产者、消费者、管理员和流处理),将管理员属性 prop.two 设置为 second,将消费者属性 prop.three 设置为 third,将生产者属性 prop.four 设置为 fourth,并将流处理属性 prop.five 设置为 fifth

您还可以按如下方式配置 Spring Kafka 的 JsonDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
properties

同样地,你可以禁用 JsonSerializer 默认在消息头中发送类型信息的行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
properties
important

以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种便捷的方式来使用嵌入式 Apache Kafka 代理进行项目测试。要使用此功能,请使用 spring-kafka-test 模块中的 @EmbeddedKafka 注解测试类。更多信息,请参阅 Spring for Apache Kafka 的参考手册

为了使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起工作,你需要将嵌入式代理地址的系统属性(由 EmbeddedKafkaBroker 填充)重新映射到 Apache Kafka 的 Spring Boot 配置属性中。有几种方法可以实现这一点:

  • 在测试类中提供一个系统属性,用于将嵌入式代理地址映射到 spring.kafka.bootstrap-servers
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

// ...

}
java
  • 在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
properties