跳到主要内容

Apache Kafka 支持

QWen Plus 中英对照 Apache Kafka Support

概述

Spring Integration for Apache Kafka 是基于 Spring for Apache Kafka 项目

你需要将这个依赖添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>6.4.2</version>
</dependency>
xml

它提供了以下组件:

输出通道适配器

outbound 通道适配器用于将消息从 Spring Integration 通道发布到 Apache Kafka 主题。该通道在应用程序上下文中定义,然后连接到发送消息到 Apache Kafka 的应用程序。发送方应用程序可以通过使用 Spring Integration 消息发布到 Apache Kafka,这些消息由 outbound 通道适配器内部转换为 Kafka 记录,具体如下:

  • Spring Integration 消息的有效载荷用于填充 Kafka 记录的有效载荷。

  • 默认情况下,Spring Integration 消息的 kafka_messageKey 头用于填充 Kafka 记录的键。

你可以分别通过 kafka_topickafka_partitionId 头自定义发布消息的目标主题和分区。

此外,<int-kafka:outbound-channel-adapter> 通过在 outbound 消息上应用 SpEL 表达式,提供了提取键、目标主题和目标分区的功能。为此,它支持三对互斥属性:

  • topictopic-expression

  • message-keymessage-key-expression

  • partition-idpartition-id-expression

这些让你分别将 topicmessage-keypartition-id 作为静态值在适配器上指定,或者在运行时根据请求消息动态评估它们的值。

important

KafkaHeaders 接口(由 spring-kafka 提供)包含用于与消息头交互的常量。messageKeytopic 默认头现在需要一个 kafka_ 前缀。当从使用旧头的早期版本迁移时,您需要在 <int-kafka:outbound-channel-adapter> 上指定 message-key-expression="headers['messageKey']"topic-expression="headers['topic']"。或者,您可以通过使用 <header-enricher>MessageBuilder 将上游头更改为来自 KafkaHeaders 的新头。如果您使用常量值,也可以通过使用 topicmessage-key 在适配器上配置它们。

注意:如果适配器配置了主题或消息键(无论是使用常量还是表达式),则会使用这些配置,相应的标头将被忽略。如果您希望标头覆盖配置,您需要在表达式中进行配置,例如以下内容:

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
java

适配器需要一个 KafkaTemplate,而 KafkaTemplate 又需要一个配置合适的 KafkaProducerFactory

如果提供了 send-failure-channel (sendFailureChannel) 并且接收到 send() 失败(同步或异步),则会向该通道发送一个 ErrorMessage。有效负载是一个带有 failedMessagerecord(即 ProducerRecord)和 cause 属性的 KafkaSendFailureException。您可以通过设置 error-message-strategy 属性来覆盖 DefaultErrorMessageStrategy

如果提供了 send-success-channel (sendSuccessChannel),在成功发送后会发送一条有效载荷类型为 org.apache.kafka.clients.producer.RecordMetadata 的消息。

important

如果你的应用程序使用事务,并且同一个通道适配器用于发布消息,其中事务由监听器容器启动,同时也用于在没有现有事务的情况下发布消息,你必须在 KafkaTemplate 上配置一个 transactionIdPrefix 以覆盖容器或事务管理器使用的前缀。由容器启动的事务(生产者工厂或事务管理器属性)所使用的前缀在所有应用程序实例上必须相同。仅用于生产者的事务所使用的前缀在所有应用程序实例上必须唯一。

你可以配置一个 flushExpression,它必须解析为布尔值。在发送几条消息后刷新可能很有用,如果你正在使用 linger.msbatch.size 这些 Kafka 生产者属性;该表达式应在最后一条消息上评估为 Boolean.TRUE,并且不完整的批次将立即发送。默认情况下,表达式会在 KafkaIntegrationHeaders.FLUSH 头 (kafka_flush) 中查找一个 Boolean 值。如果值为 true 则会刷新,而如果值为 false 或头不存在则不会刷新。

KafkaProducerMessageHandler.sendTimeoutExpression 的默认值已从 10 秒更改为 Kafka 生产者属性 delivery.timeout.ms + 5000,以便在超时后将实际的 Kafka 错误传播给应用程序,而不是由本框架生成的超时。为了保持一致性而进行了此更改,因为您可能会遇到意外行为(Spring 可能在发送时超时,但实际上最终是成功的)。重要提示:该超时默认为 120 秒,因此您可能希望减少它以获得更及时的失败通知。

配置

以下示例展示了如何配置 Apache Kafka 的 outbound 通道适配器:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
java

消息驱动的通道适配器

KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) 使用 spring-kafkaKafkaMessageListenerContainerConcurrentListenerContainer

此外,mode 属性也是可用的。它可以接受 recordbatch 的值(默认:record)。对于 record 模式,每个消息有效负载是从单个 ConsumerRecord 转换而来的。对于 batch 模式,有效负载是从消费者轮询返回的所有 ConsumerRecord 实例转换而来的一组对象。与批处理的 @KafkaListener 一样,KafkaHeaders.RECEIVED_KEYKafkaHeaders.RECEIVED_PARTITIONKafkaHeaders.RECEIVED_TOPICKafkaHeaders.OFFSET 标头也是列表,其位置对应于有效负载中的位置。

接收到的消息具有某些已填充的标题。有关更多信息,请参阅 KafkaHeaders 类

important

Consumer 对象(在 kafka_consumer 标头中)不是线程安全的。你只能在适配器内调用监听器的线程上调用其方法。如果你将消息传递给另一个线程,则不得调用其方法。

当提供了 retry-template 时,会根据其重试策略对投递失败进行重试。如果也提供了 error-channel,在重试耗尽后将使用默认的 ErrorMessageSendingRecoverer 作为恢复回调。您也可以使用 recovery-callback 来指定在这种情况下要采取的其他操作,或者将其设置为 null 以将最终异常抛给监听器容器,以便在那里处理。

当构建一个 ErrorMessage(用于 error-channelrecovery-callback)时,你可以通过设置 error-message-strategy 属性来自定义错误消息。默认情况下,使用 RawRecordHeaderErrorMessageStrategy,以提供对转换后的消息以及原始 ConsumerRecord 的访问。

important

这种重试形式是阻塞的,并且如果所有轮询记录的累积重试延迟可能超过 max.poll.interval.ms 消费者属性,这可能会导致重新平衡。相反,请考虑向监听器容器添加一个 DefaultErrorHandler,并使用 KafkaErrorSendingMessageRecoverer 进行配置。

配置

以下示例展示了如何配置一个消息驱动的通道适配器:

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
java

你也可以使用用于 @KafkaListener 注解的容器工厂来为其他目的创建 ConcurrentMessageListenerContainer 实例。有关示例,请参阅 Spring for Apache Kafka 文档

使用 Java DSL 时,容器不必配置为 @Bean,因为 DSL 会将容器注册为一个 bean。以下示例展示了如何做到这一点:

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
java

请注意,在这种情况下,适配器被赋予了一个 id (topic2Adapter)。容器在应用程序上下文中以 topic2Adapter.container 为名称注册。如果适配器没有 id 属性,则容器的 bean 名称是容器的全限定类名加上 #n,其中 n 对每个容器递增。

入站通道适配器

KafkaMessageSource 提供了一个可轮询的通道适配器实现。

配置

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
java

请参阅 javadocs 以获取可用属性。

默认情况下,max.poll.records 必须在消费者工厂中显式设置,或者如果消费者工厂是 DefaultKafkaConsumerFactory,它将被强制设置为 1。您可以将属性 allowMultiFetch 设置为 true 来覆盖此行为。

important

必须在 max.poll.interval.ms 内轮询消费者以避免重新平衡。如果你将 allowMultiFetch 设置为 true,则必须在 max.poll.interval.ms 内处理所有检索到的记录并再次轮询。

此适配器发出的消息包含一个标题 kafka_remainingRecords,其中包含来自上次轮询的剩余记录数。

从版本 6.2 开始,KafkaMessageSource 支持消费者属性中提供的 ErrorHandlingDeserializerDeserializationException 从记录头中提取并抛出给调用者。对于 SourcePollingChannelAdapter,此异常会被包装成一个 ErrorMessage 并发布到其 errorChannel。更多信息请参阅 ErrorHandlingDeserializer 文档。

外向网关

outbound 网关用于请求/回复操作。它与大多数 Spring Integration 网关不同,发送线程在网关中不会阻塞,并且回复是在回复监听器容器线程上处理的。如果您的代码在同步 Messaging Gateway 后调用网关,则用户线程会在那里阻塞,直到收到回复(或发生超时)。

KafkaProducerMessageHandlersendTimeoutExpression 默认值是 Kafka 生产者属性 delivery.timeout.ms + 5000,以便在超时后将实际的 Kafka 错误传播给应用程序,而不是由本框架生成的超时。这已经为了保持一致性而更改,因为您可能会遇到意外行为(Spring 可能在消息实际上最终成功发送的情况下使 send() 超时)。重要提示:该超时默认为 120 秒,因此您可能希望减少它以获得更及时的失败通知。

配置

以下示例展示了如何配置网关:

@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
java

请参阅 javadocs 以获取可用属性。

请注意,使用了与outbound channel adapter相同的类,唯一的区别是传递给构造函数的 KafkaTemplate 是一个 ReplyingKafkaTemplate。更多信息请参阅 Spring for Apache Kafka 文档

outbound 主题、分区、键等的确定方式与 outbound 适配器相同。回复主题的确定方式如下:

  1. 消息头 KafkaHeaders.REPLY_TOPIC(如果存在,它必须具有 Stringbyte[] 类型的值)会与模板的回复容器订阅的主题进行验证。

  2. 如果模板的 replyContainer 只订阅了一个主题,则使用该主题。

你还可以指定一个 KafkaHeaders.REPLY_PARTITION 头来确定用于回复的特定分区。同样,这会根据模板的回复容器的订阅进行验证。

或者,你也可以使用类似于以下bean的配置:

@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
java

入站网关

入站网关用于请求/回复操作。

配置

以下示例展示了如何配置入站网关:

@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
java

请参阅 javadocs 以获取可用属性。

当提供了 RetryTemplate 时,根据其重试策略会重新尝试交付失败的情况。如果也提供了 error-channel,在重试耗尽后将使用默认的 ErrorMessageSendingRecoverer 作为恢复回调。您也可以使用 recovery-callback 来指定在这种情况下采取其他操作,或者将其设置为 null 以将最终异常抛给监听器容器,以便在那里处理。

在构建 ErrorMessage(用于 error-channelrecovery-callback)时,您可以通过设置 error-message-strategy 属性来自定义错误消息。默认情况下,使用 RawRecordHeaderErrorMessageStrategy,以提供对转换后的消息以及原始 ConsumerRecord 的访问。

important

这种重试方式是阻塞的,并且如果所有轮询记录的累计重试延迟可能超过 max.poll.interval.ms 消费者属性,可能会导致再平衡。相反,考虑为监听器容器添加一个 DefaultErrorHandler,并使用 KafkaErrorSendingMessageRecoverer 进行配置。

以下示例展示了如何使用 Java DSL 配置一个简单的大写转换器:

或者,你可以通过使用类似以下的代码来配置一个大写转换器:

@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
java

你也可以使用为 @KafkaListener 注解所用的容器工厂来创建 ConcurrentMessageListenerContainer 实例以用于其他目的。请参阅 Spring for Apache Kafka 文档消息驱动的通道适配器 以获取示例。

由 Apache Kafka 主题支持的通道

Spring Integration 有由 Apache Kafka 主题支持的 MessageChannel 实现,用于持久化。

每个通道都需要一个 KafkaTemplate 用于发送端,以及一个监听器容器工厂(对于可订阅的通道)或一个 KafkaMessageSource 用于可轮询的通道。

Java DSL 配置

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {

return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
java

消息转换

提供了 StringJsonMessageConverter。有关更多信息,请参阅 Spring for Apache Kafka 文档

当使用此转换器与消息驱动的通道适配器时,可以指定您希望传入的有效负载转换为的类型。这是通过在适配器上设置 payload-type 属性(payloadType 属性)来实现的。以下示例显示了如何在 XML 配置中进行设置:

<int:channel id="inputChannel">
<int:queue capacity="10"/>
</int:channel>

<int:jms-message-driven-channel-adapter
id="jmsIn"
channel="inputChannel"
connection-factory="connectionFactory"
destination="queueName"
payload-type="java.lang.String"/>
xml

以上配置将确保接收到的消息有效负载被转换为 java.lang.String 类型。

<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />

<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
xml

以下示例展示了如何在 Java 配置中为适配器设置 payload-type 属性(payloadType 属性):

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
java

空负载和日志压缩 '墓碑' 记录

Spring Messaging Message<?> 对象不能有 null 负载。当您使用 Apache Kafka 的端点时,null 负载(也称为墓碑记录)由类型为 KafkaNull 的负载表示。更多信息请参阅 Spring for Apache Kafka 文档

Spring Integration 端点的 POJO 方法可以使用真正的 null 值而不是 KafkaNull。要这样做,可以用 @Payload(required = false) 标记参数。以下示例展示了如何做到这一点:

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
java

KStream 调用 Spring Integration 流

你可以使用 MessagingTransformerKStream 调用集成流:

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);

stream.print(Printed.toSysOut());

return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {

MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
java

当集成流以接口开始时,创建的代理具有流 bean 的名称,并附加了 ".gateway",因此如果需要,此 bean 名称可以用作 @Qualifier

读/处理/写 入场景的性能考虑

许多应用程序从一个主题读取消息,进行一些处理,然后写入另一个主题。在大多数情况下,如果 write 操作失败,应用程序希望抛出一个异常,以便可以重试传入的请求和/或将请求发送到死信主题。此功能由底层的消息监听器容器支持,并结合适当配置的错误处理器来实现。然而,为了支持这一点,我们需要阻塞监听器线程,直到写操作成功(或失败),以便任何异常都可以抛出到容器。当消费单条记录时,这是通过在传出适配器上设置 sync 属性来实现的。但是,在批量消费时,使用 sync 会导致性能显著下降,因为应用程序会在发送下一条消息之前等待每个发送的结果。你也可以执行多次发送,然后在此之后等待这些发送的结果。这是通过向消息处理器添加 futuresChannel 来实现的。要启用此功能,请将 KafkaIntegrationHeaders.FUTURE_TOKEN 添加到传出的消息中;这可以用来将 Future 与特定发送的消息相关联。以下是一个示例,展示了你如何使用此功能:

@SpringBootApplication
public class FuturesChannelApplication {

public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}

@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}

@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}

@Bean
PollableChannel futures() {
return new QueueChannel();
}

}

@Component
@DependsOn("outbound")
class Handler {

@Autowired
Gate gate;

@Autowired
PollableChannel futures;

public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}

}

interface Gate {

void send(String out);

}
java