Apache Kafka Streams 支持
从版本 1.1.4 开始,Spring for Apache Kafka 提供对 Kafka Streams 的一流支持。要在 Spring 应用程序中使用它,kafka-streams
jar 必须在类路径上。它是 Spring for Apache Kafka 项目的可选依赖项,并不会被传递下载。
基础
参考 Apache Kafka Streams 文档建议以下方式使用 API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
: 提供构建KStream
(或KTable
)实例的 API。 -
KafkaStreams
: 用于管理这些实例的生命周期。
所有通过单个 StreamsBuilder
暴露给 KafkaStreams
实例的 KStream
实例都是同时启动和停止的,即使它们的逻辑不同。换句话说,由 StreamsBuilder
定义的所有流都与单一的生命周期控制相关联。一旦通过 streams.close()
关闭了 KafkaStreams
实例,就无法重新启动。相反,必须创建一个新的 KafkaStreams
实例来重新启动流处理。
Spring Management
为了简化从 Spring 应用程序上下文的角度使用 Kafka Streams,并通过容器进行生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean
。这是一个 AbstractFactoryBean
实现,用于将 StreamsBuilder
单例实例作为一个 bean 暴露。以下示例创建了这样的一个 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为 KafkaStreamsConfiguration
对象提供,而不是 StreamsConfig
。
StreamsBuilderFactoryBean
还实现了 SmartLifecycle
以管理内部 KafkaStreams
实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams
之前定义 KStream
实例。这同样适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean
上使用默认的 autoStartup = true
时,必须在应用程序上下文刷新之前在 StreamsBuilder
上声明 KStream
实例。例如,KStream
可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不会受到任何影响。以下示例展示了如何做到这一点:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手动控制生命周期(例如,根据某些条件停止和启动),您可以通过使用工厂 bean (&
) 前缀 直接引用 StreamsBuilderFactoryBean
bean。由于 StreamsBuilderFactoryBean
使用其内部的 KafkaStreams
实例,因此安全地停止并再次启动它是可以的。在每次调用 start()
时都会创建一个新的 KafkaStreams
。如果您希望分别控制 KStream
实例的生命周期,您还可以考虑使用不同的 StreamsBuilderFactoryBean
实例。
您还可以在 StreamsBuilderFactoryBean
上指定 KafkaStreams.StateListener
、Thread.UncaughtExceptionHandler
和 StateRestoreListener
选项,这些选项会委托给内部的 KafkaStreams
实例。
此外,除了间接地在 StreamsBuilderFactoryBean
上设置这些选项外,您还可以使用 KafkaStreamsCustomizer
回调接口来:
-
(来自 版本 2.1.5) 使用
customize(KafkaStreams)
配置一个内部KafkaStreams
实例 -
(来自 版本 3.3.0) 使用
initKafkaStreams(Topology, Properties, KafkaClientSupplier)
实例化一个自定义的KafkaStreams
实现
注意 KafkaStreamsCustomizer
会覆盖 StreamsBuilderFactoryBean
提供的选项。
如果您需要直接执行一些 KafkaStreams
操作,可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams()
来访问内部的 KafkaStreams
实例。
您可以通过类型自动装配 StreamsBuilderFactoryBean
bean,但您应该确保在 bean 定义中使用完整类型,如以下示例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
另外,如果您使用接口 bean 定义,可以通过名称注入添加 @Qualifier
。以下示例演示了如何做到这一点:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 增加了一个新的属性 infrastructureCustomizer
,类型为 KafkaStreamsInfrastructureCustomizer
;这允许在创建流之前自定义 StreamsBuilder
(例如,添加状态存储)和/或 Topology
。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了默认的无操作实现,以避免在不需要其中一个方法时还必须实现两个方法。
提供了一个 CompositeKafkaStreamsInfrastructureCustomizer
,用于在需要应用多个自定义器时。
KafkaStreams Micrometer 支持
在版本 2.5.3 中,引入了 KafkaStreamsMicrometerListener
,您可以配置它以自动为工厂 bean 管理的 KafkaStreams
对象注册 micrometer 计量器:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
在以 JSON 格式读取或写入主题或状态存储时,Spring for Apache Kafka 提供了一个 JsonSerde
实现,该实现使用 JSON,并委托给 Serialization, Deserialization, and Message Conversion 中描述的 JsonSerializer
和 JsonDeserializer
。JsonSerde
实现通过其构造函数提供相同的配置选项(目标类型或 ObjectMapper
)。在以下示例中,我们使用 JsonSerde
来序列化和反序列化 Kafka 流的 Cat
负载(JsonSerde
可以在需要实例的类似方式中使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
在为生产者/消费者工厂以编程方式构建序列化器/反序列化器时,从版本 2.3 开始,您可以使用流式 API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用 KafkaStreamBrancher
KafkaStreamBrancher
类引入了一种更便捷的方式来在 KStream
之上构建条件分支。
考虑以下不使用 KafkaStreamBrancher
的示例:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,StreamsBuilderFactoryBean
需要一个 KafkaStreamsConfiguration
实例。有关所有可能的选项,请参见 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为 KafkaStreamsConfiguration
对象提供,而不是作为 StreamsConfig
。
为了避免大多数情况下的样板代码,特别是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams
注解,您应该将其放置在 @Configuration
类上。您只需声明一个名为 defaultKafkaStreamsConfig
的 KafkaStreamsConfiguration
bean。一个名为 defaultKafkaStreamsBuilder
的 StreamsBuilderFactoryBean
bean 会自动在应用程序上下文中声明。您也可以声明并使用任何额外的 StreamsBuilderFactoryBean
beans。您可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer
的 bean 来对该 bean 进行额外的自定义。如果有多个这样的 beans,它们将根据各自的 Ordered.order
属性应用。
清理与停止配置
当工厂停止时,调用 KafkaStreams.close()
方法,带有 2 个参数:
-
closeTimeout : 等待线程关闭的时间(默认为
DEFAULT_CLOSE_TIMEOUT
,设置为 10 秒)。可以使用StreamsBuilderFactoryBean.setCloseTimeout()
进行配置。 -
leaveGroupOnClose : 在关闭时触发消费者从组中离开的调用(默认为
false
)。可以使用StreamsBuilderFactoryBean.setLeaveGroupOnClose()
进行配置。
默认情况下,当工厂 bean 被停止时,会调用 KafkaStreams.cleanUp()
方法。从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,接受一个 CleanupConfig
对象,该对象具有属性,可以让您控制在 start()
或 stop()
或两者都不调用 cleanUp()
方法。从版本 2.7 开始,默认情况下不会清理本地状态。
Header Enricher
版本 3.0 添加了 HeaderEnricherProcessor
扩展自 ContextualProcessor
;提供与已弃用的 HeaderEnricher
相同的功能,后者实现了已弃用的 Transformer
接口。可以在流处理过程中使用此功能添加头信息;头信息的值是 SpEL 表达式;表达式评估的根对象具有 3 个属性:
-
record
-org.apache.kafka.streams.processor.api.Record
(key
,value
,timestamp
,headers
) -
key
- 当前记录的键 -
value
- 当前记录的值 -
context
-ProcessorContext
,允许访问当前记录的元数据
表达式必须返回一个 byte[]
或一个 String
(将使用 UTF-8
转换为 byte[]
)。
在流中使用 enrichers:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改 key
或 value
;它只是添加头部。
您需要为每条记录创建一个新的实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
这是一个简单的例子,添加一个字面量头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 添加了 ContextualProcessor
的 MessagingProcessor
扩展,提供与已弃用的 MessagingTransformer
相同的功能,后者实现了已弃用的 Transformer
接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(如 Spring Integration 流)进行交互。该转换器需要实现 MessagingFunction
。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 自动提供了一个使用其 GatewayProxyFactoryBean
的实现。它还需要一个 MessagingMessageConverter
来将键、值和元数据(包括头部)转换为/从 Spring Messaging 的 Message<?>
。有关更多信息,请参见 [Calling a Spring Integration Flow from a KStream]。
从反序列化异常中恢复
版本 2.3 引入了 RecoveringDeserializationExceptionHandler
,它可以在发生反序列化异常时采取一些措施。请参考 Kafka 文档中的 DeserializationExceptionHandler
,RecoveringDeserializationExceptionHandler
是其实现之一。RecoveringDeserializationExceptionHandler
配置了一个 ConsumerRecordRecoverer
实现。框架提供了 DeadLetterPublishingRecoverer
,它将失败的记录发送到死信主题。有关此恢复器的更多信息,请参见 Publishing Dead-letter Records。
要配置恢复器,请将以下属性添加到您的流配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是你自己实现的 ConsumerRecordRecoverer
。
交互式查询支持
从版本 3.2 开始,Spring for Apache Kafka 提供了在 Kafka Streams 中进行交互式查询所需的基本功能。交互式查询在有状态的 Kafka Streams 应用程序中非常有用,因为它们提供了一种不断查询应用程序中有状态存储的方法。因此,如果一个应用程序想要实现对所考虑系统的当前视图,交互式查询提供了一种实现方式。要了解有关交互式查询的更多信息,请参见这篇 文章。Spring for Apache Kafka 的支持集中在一个名为 KafkaStreamsInteractiveQueryService
的 API 上,该 API 是 Kafka Streams 库中交互式查询 API 的外观。应用程序可以将此服务的实例创建为一个 bean,然后在后续使用中通过名称检索状态存储。
以下代码片段展示了一个示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假设一个 Kafka Streams 应用程序有一个状态存储叫做 app-store
,那么可以通过 KafkStreamsInteractiveQuery
API 如下所示检索该存储。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。
在这种情况下,应用程序使用的状态存储是一个只读键值存储。Kafka Streams 应用程序可以使用其他类型的状态存储。例如,如果应用程序更喜欢查询窗口基础的存储,它可以在 Kafka Streams 应用程序的业务逻辑中构建该存储,然后在后续中检索它。正因为如此,KafkaStreamsInteractiveQueryService
中检索可查询存储的 API 具有通用的存储类型签名,以便最终用户可以分配适当的类型。
这里是来自 API 的类型签名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
当调用此方法时,用户可以具体请求适当的状态存储类型,正如我们在上述示例中所做的那样。
重试状态存储检索
在尝试使用 KafkaStreamsInteractiveQueryService
检索状态存储时,可能由于各种原因找不到状态存储。如果这些原因是暂时性的,KafkaStreamsInteractiveQueryService
提供了一个选项,通过允许注入自定义的 RetryTemplate
来重试状态存储的检索。默认情况下,KafkaStreamsInteractiveQueryService
中使用的 RetryTemplate
最大尝试次数为三次,固定的重试间隔为一秒。
以下是如何将自定义的 RetryTemplate
注入到 KafkaStreamsInteractiveQueryService
中,最大尝试次数为十次。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查询远程状态存储
上面显示的用于检索状态存储的 API - retrieveQueryableStore
旨在用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用程序通常是基于分区数量进行分布式的。如果一个主题有四个分区,并且有四个相同的 Kafka Streams 处理器实例在运行,那么每个实例可能负责处理来自该主题的一个单独分区。在这种情况下,调用 retrieveQueryableStore
可能不会返回实例所期望的正确结果,尽管它可能返回一个有效的存储。假设这个有四个分区的主题包含关于各种键的数据,并且一个单独的分区始终负责一个特定的键。如果调用 retrieveQueryableStore
的实例正在寻找关于一个该实例不托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对这个键一无所知。为了解决这个问题,调用实例首先需要确保它们拥有托管特定键的 Kafka Streams 处理器实例的主机信息。这可以从任何具有相同 application.id
的 Kafka Streams 实例中检索,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例代码中,调用实例正在从名为 app-store
的状态存储中查询特定的键 12345
。API 还需要一个相应的键序列化器,在这种情况下是 IntegerSerializer
。Kafka Streams 会在相同的 application.id
下查看所有实例,并尝试找到哪个实例托管了这个特定的键。一旦找到,它将返回该主机信息作为 HostInfo
对象。
这就是 API 的样子:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
当以分布式方式使用多个具有相同 application.id
的 Kafka Streams 处理器时,应用程序应该提供一个 RPC 层,以便可以通过 RPC 端点(例如 REST 端点)查询状态存储。有关更多详细信息,请参见这篇 文章。使用 Spring for Apache Kafka 时,通过使用 spring-web 技术添加基于 Spring 的 REST 端点非常简单。一旦有了 REST 端点,就可以从任何 Kafka Streams 实例查询状态存储,前提是已知托管键的 HostInfo
。
如果托管实例的键是当前实例,则应用程序不需要调用 RPC 机制,而是可以进行 JVM 内部调用。然而,问题在于应用程序可能不知道发起调用的实例是键所在的地方,因为特定服务器可能由于消费者重新平衡而失去一个分区。为了解决这个问题,KafkaStreamsInteractiveQueryService
提供了一个方便的 API,通过 API 方法 getCurrentKafkaStreamsApplicationHostInfo()
查询当前主机信息,该方法返回当前的 HostInfo
。其思路是,应用程序可以首先获取键所在位置的信息,然后将 HostInfo
与当前实例的信息进行比较。如果 HostInfo
数据匹配,则可以通过 retrieveQueryableStore
进行简单的 JVM 调用,否则选择 RPC 选项。
Kafka Streams 示例
以下示例结合了我们在本章中涵盖的各种主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}