跳到主要内容

Apache Kafka Streams 支持

ChatGPT-4o-mini 中英对照 Apache Kafka Streams Support

从版本 1.1.4 开始,Spring for Apache Kafka 提供对 Kafka Streams 的一流支持。要在 Spring 应用程序中使用它,kafka-streams jar 必须在类路径上。它是 Spring for Apache Kafka 项目的可选依赖项,并不会被传递下载。

基础

参考 Apache Kafka Streams 文档建议以下方式使用 API:

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...; // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();
java

所以,我们有两个主要组件:

  • StreamsBuilder: 提供构建 KStream(或 KTable)实例的 API。

  • KafkaStreams: 用于管理这些实例的生命周期。

备注

所有通过单个 StreamsBuilder 暴露给 KafkaStreams 实例的 KStream 实例都是同时启动和停止的,即使它们的逻辑不同。换句话说,由 StreamsBuilder 定义的所有流都与单一的生命周期控制相关联。一旦通过 streams.close() 关闭了 KafkaStreams 实例,就无法重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。

Spring Management

为了简化从 Spring 应用程序上下文的角度使用 Kafka Streams,并通过容器进行生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。这是一个 AbstractFactoryBean 实现,用于将 StreamsBuilder 单例实例作为一个 bean 暴露。以下示例创建了这样的一个 bean:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
java
important

从版本 2.2 开始,流配置现在作为 KafkaStreamsConfiguration 对象提供,而不是 StreamsConfig

StreamsBuilderFactoryBean 还实现了 SmartLifecycle 以管理内部 KafkaStreams 实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams 之前定义 KStream 实例。这同样适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不会受到任何影响。以下示例展示了如何做到这一点:

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
java

如果您想手动控制生命周期(例如,根据某些条件停止和启动),您可以通过使用工厂 bean (&) 前缀 直接引用 StreamsBuilderFactoryBean bean。由于 StreamsBuilderFactoryBean 使用其内部的 KafkaStreams 实例,因此安全地停止并再次启动它是可以的。在每次调用 start() 时都会创建一个新的 KafkaStreams。如果您希望分别控制 KStream 实例的生命周期,您还可以考虑使用不同的 StreamsBuilderFactoryBean 实例。

您还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 选项,这些选项会委托给内部的 KafkaStreams 实例。

此外,除了间接地在 StreamsBuilderFactoryBean 上设置这些选项外,您还可以使用 KafkaStreamsCustomizer 回调接口来:

  1. (来自 版本 2.1.5) 使用 customize(KafkaStreams) 配置一个内部 KafkaStreams 实例

  2. (来自 版本 3.3.0) 使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier) 实例化一个自定义的 KafkaStreams 实现

注意 KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。

如果您需要直接执行一些 KafkaStreams 操作,可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams() 来访问内部的 KafkaStreams 实例。

您可以通过类型自动装配 StreamsBuilderFactoryBean bean,但您应该确保在 bean 定义中使用完整类型,如以下示例所示:

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
java

另外,如果您使用接口 bean 定义,可以通过名称注入添加 @Qualifier。以下示例演示了如何做到这一点:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
java

从版本 2.4.1 开始,工厂 bean 增加了一个新的属性 infrastructureCustomizer,类型为 KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义 StreamsBuilder(例如,添加状态存储)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

void configureBuilder(StreamsBuilder builder);

void configureTopology(Topology topology);

}
java

提供了默认的无操作实现,以避免在不需要其中一个方法时还必须实现两个方法。

提供了一个 CompositeKafkaStreamsInfrastructureCustomizer,用于在需要应用多个自定义器时。

KafkaStreams Micrometer 支持

在版本 2.5.3 中,引入了 KafkaStreamsMicrometerListener,您可以配置它以自动为工厂 bean 管理的 KafkaStreams 对象注册 micrometer 计量器:

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
java

Streams JSON 序列化和反序列化

在以 JSON 格式读取或写入主题或状态存储时,Spring for Apache Kafka 提供了一个 JsonSerde 实现,该实现使用 JSON,并委托给 Serialization, Deserialization, and Message Conversion 中描述的 JsonSerializerJsonDeserializerJsonSerde 实现通过其构造函数提供相同的配置选项(目标类型或 ObjectMapper)。在以下示例中,我们使用 JsonSerde 来序列化和反序列化 Kafka 流的 Cat 负载(JsonSerde 可以在需要实例的类似方式中使用):

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
java

在为生产者/消费者工厂以编程方式构建序列化器/反序列化器时,从版本 2.3 开始,您可以使用流式 API,这简化了配置。

stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
java

使用 KafkaStreamBrancher

KafkaStreamBrancher 类引入了一种更便捷的方式来在 KStream 之上构建条件分支。

考虑以下不使用 KafkaStreamBrancher 的示例:

KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
java

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
java

配置

要配置 Kafka Streams 环境,StreamsBuilderFactoryBean 需要一个 KafkaStreamsConfiguration 实例。有关所有可能的选项,请参见 Apache Kafka 文档

important

从版本 2.2 开始,流配置现在作为 KafkaStreamsConfiguration 对象提供,而不是作为 StreamsConfig

为了避免大多数情况下的样板代码,特别是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,您应该将其放置在 @Configuration 类上。您只需声明一个名为 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。一个名为 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 会自动在应用程序上下文中声明。您也可以声明并使用任何额外的 StreamsBuilderFactoryBean beans。您可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来对该 bean 进行额外的自定义。如果有多个这样的 beans,它们将根据各自的 Ordered.order 属性应用。

清理与停止配置

当工厂停止时,调用 KafkaStreams.close() 方法,带有 2 个参数:

  • closeTimeout : 等待线程关闭的时间(默认为 DEFAULT_CLOSE_TIMEOUT,设置为 10 秒)。可以使用 StreamsBuilderFactoryBean.setCloseTimeout() 进行配置。

  • leaveGroupOnClose : 在关闭时触发消费者从组中离开的调用(默认为 false)。可以使用 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 进行配置。

默认情况下,当工厂 bean 被停止时,会调用 KafkaStreams.cleanUp() 方法。从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,接受一个 CleanupConfig 对象,该对象具有属性,可以让您控制在 start()stop() 或两者都不调用 cleanUp() 方法。从版本 2.7 开始,默认情况下不会清理本地状态。

Header Enricher

版本 3.0 添加了 HeaderEnricherProcessor 扩展自 ContextualProcessor;提供与已弃用的 HeaderEnricher 相同的功能,后者实现了已弃用的 Transformer 接口。可以在流处理过程中使用此功能添加头信息;头信息的值是 SpEL 表达式;表达式评估的根对象具有 3 个属性:

  • record - org.apache.kafka.streams.processor.api.Recordkeyvaluetimestampheaders

  • key - 当前记录的键

  • value - 当前记录的值

  • context - ProcessorContext,允许访问当前记录的元数据

表达式必须返回一个 byte[] 或一个 String(将使用 UTF-8 转换为 byte[])。

在流中使用 enrichers:

.process(() -> new HeaderEnricherProcessor(expressions))
java

处理器不会更改 keyvalue;它只是添加头部。

important

您需要为每条记录创建一个新的实例。

.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
java

这是一个简单的例子,添加一个字面量头和一个变量:

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
java

MessagingProcessor

版本 3.0 添加了 ContextualProcessorMessagingProcessor 扩展,提供与已弃用的 MessagingTransformer 相同的功能,后者实现了已弃用的 Transformer 接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(如 Spring Integration 流)进行交互。该转换器需要实现 MessagingFunction

@FunctionalInterface
public interface MessagingFunction {

Message<?> exchange(Message<?> message);

}
java

Spring Integration 自动提供了一个使用其 GatewayProxyFactoryBean 的实现。它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括头部)转换为/从 Spring Messaging 的 Message<?>。有关更多信息,请参见 [Calling a Spring Integration Flow from a KStream]。

从反序列化异常中恢复

版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些措施。请参考 Kafka 文档中的 DeserializationExceptionHandlerRecoveringDeserializationExceptionHandler 是其实现之一。RecoveringDeserializationExceptionHandler 配置了一个 ConsumerRecordRecoverer 实现。框架提供了 DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此恢复器的更多信息,请参见 Publishing Dead-letter Records

要配置恢复器,请将以下属性添加到您的流配置中:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
java

当然,recoverer() bean 可以是你自己实现的 ConsumerRecordRecoverer

交互式查询支持

从版本 3.2 开始,Spring for Apache Kafka 提供了在 Kafka Streams 中进行交互式查询所需的基本功能。交互式查询在有状态的 Kafka Streams 应用程序中非常有用,因为它们提供了一种不断查询应用程序中有状态存储的方法。因此,如果一个应用程序想要实现对所考虑系统的当前视图,交互式查询提供了一种实现方式。要了解有关交互式查询的更多信息,请参见这篇 文章。Spring for Apache Kafka 的支持集中在一个名为 KafkaStreamsInteractiveQueryService 的 API 上,该 API 是 Kafka Streams 库中交互式查询 API 的外观。应用程序可以将此服务的实例创建为一个 bean,然后在后续使用中通过名称检索状态存储。

以下代码片段展示了一个示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
java

假设一个 Kafka Streams 应用程序有一个状态存储叫做 app-store,那么可以通过 KafkStreamsInteractiveQuery API 如下所示检索该存储。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
java

一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。

在这种情况下,应用程序使用的状态存储是一个只读键值存储。Kafka Streams 应用程序可以使用其他类型的状态存储。例如,如果应用程序更喜欢查询窗口基础的存储,它可以在 Kafka Streams 应用程序的业务逻辑中构建该存储,然后在后续中检索它。正因为如此,KafkaStreamsInteractiveQueryService 中检索可查询存储的 API 具有通用的存储类型签名,以便最终用户可以分配适当的类型。

这里是来自 API 的类型签名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
java

当调用此方法时,用户可以具体请求适当的状态存储类型,正如我们在上述示例中所做的那样。

重试状态存储检索

在尝试使用 KafkaStreamsInteractiveQueryService 检索状态存储时,可能由于各种原因找不到状态存储。如果这些原因是暂时性的,KafkaStreamsInteractiveQueryService 提供了一个选项,通过允许注入自定义的 RetryTemplate 来重试状态存储的检索。默认情况下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 最大尝试次数为三次,固定的重试间隔为一秒。

以下是如何将自定义的 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中,最大尝试次数为十次。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
java

查询远程状态存储

上面显示的用于检索状态存储的 API - retrieveQueryableStore 旨在用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用程序通常是基于分区数量进行分布式的。如果一个主题有四个分区,并且有四个相同的 Kafka Streams 处理器实例在运行,那么每个实例可能负责处理来自该主题的一个单独分区。在这种情况下,调用 retrieveQueryableStore 可能不会返回实例所期望的正确结果,尽管它可能返回一个有效的存储。假设这个有四个分区的主题包含关于各种键的数据,并且一个单独的分区始终负责一个特定的键。如果调用 retrieveQueryableStore 的实例正在寻找关于一个该实例不托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对这个键一无所知。为了解决这个问题,调用实例首先需要确保它们拥有托管特定键的 Kafka Streams 处理器实例的主机信息。这可以从任何具有相同 application.id 的 Kafka Streams 实例中检索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
java

在上面的示例代码中,调用实例正在从名为 app-store 的状态存储中查询特定的键 12345。API 还需要一个相应的键序列化器,在这种情况下是 IntegerSerializer。Kafka Streams 会在相同的 application.id 下查看所有实例,并尝试找到哪个实例托管了这个特定的键。一旦找到,它将返回该主机信息作为 HostInfo 对象。

这就是 API 的样子:

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
java

当以分布式方式使用多个具有相同 application.id 的 Kafka Streams 处理器时,应用程序应该提供一个 RPC 层,以便可以通过 RPC 端点(例如 REST 端点)查询状态存储。有关更多详细信息,请参见这篇 文章。使用 Spring for Apache Kafka 时,通过使用 spring-web 技术添加基于 Spring 的 REST 端点非常简单。一旦有了 REST 端点,就可以从任何 Kafka Streams 实例查询状态存储,前提是已知托管键的 HostInfo

如果托管实例的键是当前实例,则应用程序不需要调用 RPC 机制,而是可以进行 JVM 内部调用。然而,问题在于应用程序可能不知道发起调用的实例是键所在的地方,因为特定服务器可能由于消费者重新平衡而失去一个分区。为了解决这个问题,KafkaStreamsInteractiveQueryService 提供了一个方便的 API,通过 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查询当前主机信息,该方法返回当前的 HostInfo。其思路是,应用程序可以首先获取键所在位置的信息,然后将 HostInfo 与当前实例的信息进行比较。如果 HostInfo 数据匹配,则可以通过 retrieveQueryableStore 进行简单的 JVM 调用,否则选择 RPC 选项。

Kafka Streams 示例

以下示例结合了我们在本章中涵盖的各种主题:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}

@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");

stream.print(Printed.toSysOut());

return stream;
}

}
java