Apache Kafka 支持
Apache Kafka 通过提供 spring-kafka 项目的自动配置来支持。
Kafka 配置由 spring.kafka.* 中的外部配置属性控制。例如,你可以在 application.properties 中声明以下部分:
- Properties
- YAML
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建一个 topic,添加一个类型为 NewTopic 的 bean。如果该 topic 已存在,则该 bean 会被忽略。
更多支持的选项请参见 KafkaProperties。
发送消息
Spring 的 KafkaTemplate 会自动配置,你可以直接在自己的 bean 中通过自动装配(autowire)使用它,如下例所示:
- Java
- Kotlin
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
// ...
fun someMethod() {
kafkaTemplate.send("someTopic", "Hello")
}
}
如果定义了属性 spring.kafka.producer.transaction-id-prefix,则会自动配置一个 KafkaTransactionManager。此外,如果定义了一个 RecordMessageConverter Bean,它会自动关联到自动配置的 KafkaTemplate。
接收消息
当 Apache Kafka 基础设施存在时,任何 Bean 都可以使用 @KafkaListener 注解来创建一个监听器端点。如果没有定义 KafkaListenerContainerFactory,则会自动配置一个默认的工厂,其配置项由 spring.kafka.listener.* 中定义的键值决定。
以下组件在 someTopic 主题上创建一个监听器端点:
- Java
- Kotlin
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果定义了一个 KafkaTransactionManager bean,它会自动关联到容器工厂。同样地,如果定义了 RecordFilterStrategy、CommonErrorHandler、AfterRollbackProcessor 或 ConsumerAwareRebalanceListener bean,它们也会自动关联到默认的工厂。
根据监听器类型,会将一个 RecordMessageConverter 或 BatchMessageConverter Bean 关联到默认的工厂。如果仅为批处理监听器提供了一个 RecordMessageConverter Bean,则会将其包装在一个 BatchMessageConverter 中。
自定义的 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常会引用自动配置的 KafkaTransactionManager Bean。
Kafka Streams
Spring for Apache Kafka 提供了一个工厂 Bean,用于创建 StreamsBuilder 对象并管理其流的生命周期。只要 classpath 中包含 kafka-streams,并且通过 @EnableKafkaStreams 注解启用了 Kafka Streams,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration Bean。
启用 Kafka Streams 意味着必须设置应用 ID 和 bootstrap 服务器。前者可以通过 spring.kafka.streams.application-id 进行配置,如果未设置,则默认使用 spring.application.name。后者可以全局设置,也可以专门针对 streams 进行覆盖。
可以使用专用属性来设置若干额外的属性;其他任意的 Kafka 属性可以通过 spring.kafka.streams.properties 命名空间进行设置。更多信息请参见 Additional Kafka Properties。
要使用工厂 bean,请将 StreamsBuilder 注入到您的 @Bean 中,如下例所示:
- Java
- Kotlin
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由 StreamsBuilder 对象管理的流会自动启动。你可以使用 spring.kafka.streams.auto-startup 属性来自定义此行为。
Additional Kafka Properties
自动配置所支持的属性在附录的 Integration Properties 一节中列出。请注意,这些属性(使用连字符或驼峰命名法)在大多数情况下直接映射到 Apache Kafka 的点号分隔属性。详情请参阅 Apache Kafka 文档。
名称中不包含客户端类型(producer、consumer、admin 或 streams)的属性被视为通用属性,适用于所有客户端。如果需要,这些通用属性中的大多数可以针对一个或多个客户端类型进行覆盖。
Apache Kafka 将属性的重要性标记为 HIGH、MEDIUM 或 LOW。Spring Boot 自动配置支持所有重要性为 HIGH 的属性、部分选定的 MEDIUM 和 LOW 属性,以及所有没有默认值的属性。
KafkaProperties 类仅直接支持 Kafka 所支持属性的一个子集。如果您希望为各个客户端类型配置未被直接支持的额外属性,请使用以下属性:
- Properties
- YAML
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这会将通用的 prop.one Kafka 属性设置为 first(适用于生产者、消费者、管理员和流处理),将 prop.two 管理员属性设置为 second,将 prop.three 消费者属性设置为 third,将 prop.four 生产者属性设置为 fourth,并将 prop.five 流处理属性设置为 fifth。
你也可以按如下方式配置 Spring Kafka 的 JsonDeserializer:
- Properties
- YAML
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,你可以禁用 JsonSerializer 在消息头中发送类型信息的默认行为:
- Properties
- YAML
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性会覆盖 Spring Boot 显式支持的任何配置项。
使用 Embedded Kafka 进行测试
Spring for Apache Kafka 提供了一种便捷的方式来使用嵌入式 Apache Kafka 代理测试项目。要使用此功能,请在测试类上添加来自 spring-kafka-test 模块的 @EmbeddedKafka 注解。更多信息请参阅 Spring for Apache Kafka 参考手册。
要使 Spring Boot 的自动配置与上述嵌入式 Apache Kafka 代理协同工作,你需要将嵌入式代理地址的系统属性(由 EmbeddedKafkaBroker 填充)重新映射为 Spring Boot 中用于 Apache Kafka 的配置属性。有几种方法可以实现这一点:
- 提供一个系统属性,将嵌入式代理地址映射到测试类中的
spring.kafka.bootstrap-servers:
- Java
- Kotlin
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
- 在 @EmbeddedKafka 注解上配置一个属性名称:
- Java
- Kotlin
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
- 在配置属性中使用占位符:
- Properties
- YAML
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"