跳到主要内容

Reactive Streams 支持

QWen Plus 中英对照 Reactive Streams Support

Spring Integration 在框架的某些地方和从不同方面提供了对 Reactive Streams 交互的支持。我们将在适当的地方讨论大部分内容,并在必要时提供指向目标章节的链接以供详细了解。

前言

为了回顾,Spring Integration 扩展了 Spring 编程模型以支持知名的 企业集成模式 。Spring Integration 在基于 Spring 的应用程序中实现了轻量级消息传递,并通过声明性适配器支持与外部系统的集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。这一目标是通过像 messagechannelendpoint 这样的第一类公民在目标应用程序中实现的,它们使我们能够构建集成流(管道),其中(在大多数情况下)一个端点将消息发送到通道,由另一个端点消费。这样我们可以区分集成交互模型和目标业务逻辑。这里的关键部分是在中间的通道:流的行为取决于其实现,而不会影响端点。

另一方面,Reactive Streams 是一个用于异步流处理的非阻塞背压的标准。Reactive Streams 的主要目标是管理跨异步边界(例如将元素传递给另一个线程或线程池)的流数据交换,同时确保接收方不会被迫缓冲任意数量的数据。换句话说,背压是这个模型中不可或缺的一部分,以便允许在各线程之间进行调解的队列是有界的。像 Project Reactor 这样的 Reactive Streams 实现的目标是在整个流应用程序的处理图中保留这些优点和特性。Reactive Streams 库的最终目标是以尽可能透明和平滑的方式为目标应用程序提供类型、一组操作符和支持 API,尽管可用的编程语言结构,但最终解决方案不像普通的函数链调用那样具有命令式。它分为两个阶段:定义和执行,这会在稍后订阅最终的响应式发布者时发生,并且根据需要应用背压从定义的底部向顶部推送数据需求 - 我们请求我们当前可以处理的事件数量。响应式应用程序看起来像一个 "流" 或者按照我们在 Spring 集成术语中的习惯 - "流程". 事实上,自 Java 9 以来,Reactive Streams SPI 在 java.util.concurrent.Flow 类中呈现。

从这里看,当我们对端点应用一些反应式框架操作符时,Spring Integration 流似乎非常适合编写 Reactive Streams 应用程序,但实际上问题要广泛得多,我们需要记住并不是所有端点(例如 JdbcMessageHandler)都可以在反应式流中透明地处理。当然,Spring Integration 中支持 Reactive Streams 的主要目标是允许整个过程完全反应式、按需启动并准备好背压。在目标协议和通道适配器的系统提供 Reactive Streams 交互模型之前,这是不可能实现的。在下面的部分中,我们将描述 Spring Integration 提供了哪些组件和方法来开发保留集成流结构的反应式应用程序。

备注

Spring Integration 中所有的 Reactive Streams 交互都是使用 Project Reactor 类型实现的,例如 MonoFlux

消息网关

与 Reactive Streams 最简单的交互点是一个 @MessagingGateway,我们只需将网关方法的返回类型设置为 Mono<?> —— 网关方法调用背后的整个集成流程将在返回的 Mono 实例上发生订阅时执行。更多信息请参见 Reactor Mono。类似的 Mono 回复方法在框架内部用于完全基于与 Reactive Streams 兼容协议的入站网关(更多详情请参阅下面的 Reactive Channel Adapters)。发送和接收操作被包装进一个 Mono.defer(),并在 replyChannel 头可用时链接回复评估。这样,针对特定反应式协议(例如 Netty)的入站组件将成为订阅者和在 Spring Integration 上执行的反应式流的发起者。如果请求有效负载是反应式类型,则最好在反应式流定义中处理它,将处理过程推迟到发起者订阅。为此,处理程序方法也必须返回一个反应式类型。更多详情请参阅下一节。

反应式回复有效负载

当回复生成的 MessageHandler 返回一个用于回复消息的反应式类型有效负载时,它会以异步方式处理,前提是为 outputChannel 提供了一个常规的 MessageChannel 实现(必须将 async 设置为 true),并且当输出通道是 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时,会根据需要订阅进行展平。在标准命令式的 MessageChannel 使用场景中,如果回复的有效负载是一个 多值 发布者(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),它会被包装到 Mono.just() 中。因此,Mono 必须显式地在下游订阅或由下游的 FluxMessageChannel 展平。对于 outputChannelReactiveStreamsSubscribableChannel 的情况,无需担心返回类型和订阅;所有内容都会由框架内部顺畅处理。

更多信息请参见异步服务激活器

也请参阅 Kotlin 协程 以获取更多信息。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的组合实现。一个作为热源的 Flux 在内部创建,用于接收来自 send() 实现的传入消息。Publisher.subscribe() 实现委托给该内部 Flux。此外,为了按需上游消费,FluxMessageChannel 提供了对 ReactiveStreamsSubscribableChannel 合约的实现。为该通道提供的任何上游 Publisher(例如,参见下面的 Source Polling Channel Adapter 和拆分器)在订阅准备好时会自动订阅此通道。来自这些代理发布者的事件会被下沉到上述内部 Flux 中。

FluxMessageChannel 的消费者必须是 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 协议。幸运的是,Spring Integration 中所有的 MessageHandler 实现也实现了来自 Reactor 项目的 CoreSubscriber。并且由于中间有一个 ReactiveStreamsConsumer 实现,整个集成流配置对目标开发人员来说仍然是透明的。在这种情况下,流行为从命令式推送模型变为反应式拉取模型。ReactiveStreamsConsumer 还可以使用 IntegrationReactiveUtils 将任何 MessageChannel 转换为反应式源,从而使集成流部分反应式。

更多信息请参见 FluxMessageChannel

从 5.5 版本开始,ConsumerEndpointSpec 引入了一个 reactive() 选项,使流程中的端点可以作为一个 ReactiveStreamsConsumer,而与输入通道无关。可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 可以提供给自定义来自输入通道的源 Flux 通过 Flux.transform() 操作,例如使用 publishOn()doOnNext()retry() 等。此功能表示为所有消息注解 (@ServiceActivator@Splitter 等) 的 @Reactive 子注解,通过它们的 reactive() 属性。

源轮询通道适配器

通常情况下,SourcePollingChannelAdapter 依赖于由 TaskScheduler 发起的任务。根据提供的选项构建一个轮询触发器,并用于定期调度任务以轮询数据或事件的目标源。当 outputChannelReactiveStreamsSubscribableChannel 时,相同的 Trigger 用于确定下一次执行的时间,但不是调度任务,SourcePollingChannelAdapter 创建了一个基于 Flux.generate()Flux<Message<?>>,用于 nextExecutionTime 值和 Mono.delay() 来延迟上一步的持续时间。然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll 并将它们下沉到输出 Flux 中。这个生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 订阅,并尊重下游的背压。从 5.5 版本开始,当 maxMessagesPerPoll == 0 时,根本不会调用源,并且 flatMapMany() 会立即通过 Mono.empty() 结果完成,直到稍后 maxMessagesPerPoll 更改为非零值,例如通过控制总线。这样,任何 MessageSource 实现都可以变成一个反应式的热源。

更多信息请参见 轮询消费者

事件驱动型通道适配器

MessageProducerSupport 是事件驱动通道适配器的基础类,通常,其 sendMessage(Message<?>) 用作生产驱动 API 中的监听器回调。当消息生产者实现构建一个 Flux 消息而不是基于监听器的功能时,此回调也可以轻松插入到 doOnNext() Reactor 操作符中。实际上,当消息生产者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 时,框架中会这样做。然而,为了改善最终用户的体验,并允许更支持背压的功能,MessageProducerSupport 提供了一个 subscribeToPublisher(Publisher<? extends Message<?>>) API,在目标实现中使用,当 Publisher<Message<?>>> 是来自目标系统的数据源时。通常,它在 doStart() 实现中使用,当调用目标驱动 API 以获取源数据的 Publisher 时。建议将反应式的 MessageProducerSupport 实现与作为 outputChannelFluxMessageChannel 结合使用,以便按需订阅和下游事件消费。当对 Publisher 的订阅被取消时,通道适配器进入停止状态。对这种通道适配器调用 stop() 将完成从源 Publisher 的生产。可以通过自动订阅新创建的源 Publisher 来重新启动通道适配器。

消息源到反应式流

从 5.3 版本开始,提供了一个 ReactiveMessageSourceProducer。它是一个提供的 MessageSource 和事件驱动生产到配置的 outputChannel 的组合。内部它将一个 MessageSource 包装进反复重新订阅的 Mono 中,产生一个 Flux<Message<?>>,在上述提到的 subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。此 Mono 的订阅使用 Schedulers.boundedElastic() 完成,以避免目标 MessageSource 中可能出现的阻塞。当消息源返回 null(没有数据可拉取)时,Mono 被转换为带有 delayrepeatWhenEmpty() 状态,以便根据订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目进行后续重新订阅。默认情况下,它是 1 秒。如果 MessageSource 在头信息中生成带有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 信息的消息,则会在原始 MonodoOnSuccess() 中确认(如果需要),并在下游流抛出带有失败消息的 MessagingException 时,在 doOnError() 中拒绝。这个 ReactiveMessageSourceProducer 可用于任何用例,即将轮询通道适配器的功能转变为任何现有 MessageSource<?> 实现的反应式、按需解决方案。

分割器和聚合器

AbstractMessageSplitter 为它的逻辑获取一个 Publisher 时,处理会自然地遍历 Publisher 中的项目,将它们映射成消息并发送到 outputChannel。如果此通道是一个 ReactiveStreamsSubscribableChannel,则根据该通道按需订阅 PublisherFlux 包装器,这种拆分器行为更像 Reactor 操作符中的 flatMap,当我们把一个传入事件映射为多值输出 Publisher 时。这在使用 FluxMessageChannel 在拆分器前后构建整个集成流时最有意义,使 Spring Integration 配置与响应式流要求及其用于事件处理的操作符保持一致。对于普通通道,Publisher 会被转换为 Iterable 以进行标准的迭代和生成拆分逻辑。

FluxAggregatorMessageHandler 是特定反应式流逻辑实现的另一个示例,在 Project Reactor 的术语中可以被视为一个 "反应式操作符"。它基于 Flux.groupBy()Flux.window()(或 buffer())操作符。传入的消息通过在创建 FluxAggregatorMessageHandler 时启动的 Flux.create() 汇入,使其成为一个热源。根据需求,这个 FluxReactiveStreamsSubscribableChannel 订阅,或者当 outputChannel 不是反应式时,在 FluxAggregatorMessageHandler.start() 中直接订阅。当整个集成流在该组件之前和之后都使用 FluxMessageChannel 构建时,这个 MessageHandler 就会发挥其强大的功能,使整个逻辑具备背压能力。

详见 流和通量拆分通量聚合器 以获取更多信息。

Java DSL

在 Java DSL 中,IntegrationFlow 可以从任何 Publisher 实例开始(参见 IntegrationFlow.from(Publisher<Message<T>>))。此外,通过 IntegrationFlowBuilder.toReactivePublisher() 操作符,IntegrationFlow 可以被转换为一个反应式的热源。在这两种情况下,内部都使用了 FluxMessageChannel;它可以按照其 ReactiveStreamsSubscribableChannel 合约订阅入站的 Publisher,并且它本身也是一个 Publisher<Message<?>> 以便为下游订阅者提供服务。通过动态的 IntegrationFlow 注册,我们可以实现强大的逻辑,将 Reactive Streams 与这个集成流桥接至/自 Publisher 相结合。

从 5.5.6 版本开始,提供了一个 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 后面整个 IntegrationFlow 的生命周期。通常情况下,对响应式发布者的订阅和消费发生在稍后的运行时阶段,而不是在响应式流组合期间,甚至不是在 ApplicationContext 启动期间。为了避免在 Publisher<Message<?>> 订阅点管理 IntegrationFlow 生命周期的样板代码,并提供更好的用户体验,引入了带有 autoStartOnSubscribe 标志的新操作符。如果为 true,它会将 IntegrationFlow 及其组件标记为 autoStartup = false,因此 ApplicationContext 不会自动启动流中的消息生产和消费。相反,IntegrationFlowstart() 是由内部的 Flux.doOnSubscribe() 触发的。无论 autoStartOnSubscribe 的值如何,流都会在 Flux.doOnCancel()Flux.doOnTerminate() 中停止——如果没有任何消费者,生成消息是没有意义的。

对于完全相反的用例,当 IntegrationFlow 应该调用一个反应式流并在完成后继续时,在 IntegrationFlowDefinition 中提供了一个 fluxTransform() 操作符。此时,流程会变成一个 FluxMessageChannel,它会被传播到提供的 fluxFunction 中,并在 Flux.transform() 操作符中执行。函数的结果被包装成一个 Mono<Message<?>> 以扁平映射到输出 Flux,该 Flux 由另一个 FluxMessageChannel 订阅,用于下游流程。

更多信息请参见 Java DSL 章节

ReactiveMessageHandler

从 5.3 版本开始,ReactiveMessageHandler 在框架中得到原生支持。这种类型的消息处理器是为反应式客户端设计的,这些客户端返回一个反应式类型,用于按需订阅以执行低级操作,并且不提供任何回复数据来继续反应式流组合。当 ReactiveMessageHandler 在命令式集成流中使用时,handleMessage() 的结果在返回后立即被订阅,只是因为在这种流中没有反应式流组合来尊重背压。在这种情况下,框架将此 ReactiveMessageHandler 包装到 ReactiveMessageHandlerAdapter 中——这是 MessageHandler 的一个简单实现。然而,当 ReactiveStreamsConsumer 参与到流中(例如,当消费的通道是一个 FluxMessageChannel 时),这样的 ReactiveMessageHandler 会通过 flatMap() Reactor 操作符组成整个反应式流,以在消费过程中尊重背压。

ReactiveMessageHandler 的一个开箱即用的实现是用于 outbound channel adapter 的 ReactiveMongoDbStoringMessageHandler。更多信息,请参见 MongoDB Reactive Channel Adapters

从 6.1 版本开始,IntegrationFlowDefinition 暴露了一个便捷的 handleReactive(ReactiveMessageHandler) 终端操作符。任何 ReactiveMessageHandler 实现(甚至是仅使用 Mono API 的简单 lambda)都可以用于此操作符。框架会自动订阅返回的 Mono<Void>。以下是此操作符可能配置的一个简单示例:

@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
java

此操作符的重载版本接受一个 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,用于围绕提供的 ReactiveMessageHandler 定制一个消费者端点。

此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。在大多数情况下,它们用于特定协议的通道适配器实现。请参阅下一节,其中包含指向目标技术及其相应反应式通道适配器的链接。

反应式通道适配器

当集成的目标协议提供了一个 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单直接了。

一个入站的、事件驱动的通道适配器实现是关于将请求(如果必要的话)包装成延迟的 MonoFlux,并在协议组件启动对监听器方法返回的 Mono 的订阅时执行发送(如果有,则生成回复)。这样,我们就将反应式流解决方案封装在这个组件中。当然,订阅输出通道的下游集成流应该遵循 Reactive Streams 规范,并以按需、支持背压的方式执行。

这并不是总是可用的,这是由集成流中使用的 MessageHandler 处理器的性质(或当前实现)决定的。可以通过在集成端点之前和之后使用线程池和队列或 FluxMessageChannel (见上文)来处理此限制,当没有反应式实现时。

一个反应式 事件驱动 的传入通道适配器示例:

public class CustomReactiveMessageProducer extends MessageProducerSupport {

private final CustomReactiveSource customReactiveSource;

public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}

@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());

subscribeToPublisher(messageFlux);
}
}
java

用法如下:

public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
java

或者以声明式的方式:

public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
java

或者即使没有通道适配器,我们也可以始终以下面的方式使用 Java DSL:

public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
java

一个反应式 outbound 通道适配器的实现是关于根据目标协议提供的反应式 API 来启动(或继续)与外部系统的交互的反应式流。 inbound payload 可能本身就是一种反应式类型,或者作为整个集成流中的一个事件,而这个集成流是建立在反应式流之上的。如果我们在单向、发完即忘的场景中,返回的反应式类型可以立即订阅,或者它会在下游传播(请求 - 回复场景),以便进一步的集成流或在目标业务逻辑中进行显式订阅,但仍然保持下游的反应式流语义。

一个反应式输出通道适配器的例子:

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

private final CustomEntityOperations customEntityOperations;

public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}

@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}

private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}

private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}

public enum Type {
INSERT,
UPDATE,
}
}
java

我们将能够使用这两个通道适配器:

public class MainFlow {

@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;

@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
java

目前,Spring Integration 为 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 提供了通道适配器(或网关)实现。Redis Stream 通道适配器 也是响应式的,并使用来自 Spring Data 的 ReactiveStreamOperations。更多的响应式通道适配器正在开发中,例如基于 Kafka 的 Apache Kafka,它将使用来自 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplate 等等。对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理过程中发生阻塞。

从反应式到命令式的上下文传播

Context Propagation 库在类路径上时,Project Reactor 可以获取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder),并将它们存储到 Subscriber 上下文中。相反的操作也是可能的,当我们需要填充用于跟踪的日志 MDC,或者让我们从反应式流中调用的服务从作用域中恢复一个观察值。有关其上下文传播的特殊操作符的更多信息,请参阅 Project Reactor 文档。如果我们的整个解决方案是一个单一的反应式流组合,那么存储和恢复上下文可以顺利进行,因为 Subscriber 上下文可以从下游一直可见到组合的开始(FluxMono)。但是,如果应用程序在不同的 Flux 实例之间切换或进入命令式处理再返回,则绑定到 Subscriber 的上下文可能不可用。对于这种用例,Spring Integration 提供了一种额外的能力(从版本 6.0.5 开始),将 Reactor ContextView 存储到从反应式流生成的消息头 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 中,例如,当我们执行直接 send() 操作时。然后此消息头会在 FluxMessageChannel.subscribeTo() 中使用,以恢复该通道即将发出的 Message 的 Reactor 上下文。目前,此消息头由 WebFluxInboundEndpointRSocketInboundGateway 组件填充,但可以在任何执行反应式到命令式集成的解决方案中使用。填充此消息头的逻辑如下:

return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
java

注意,我们仍然需要使用 handle() 操作符来使 Reactor 从上下文中恢复 ThreadLocal 值。即使它作为头部发送,框架也无法假设它是否会在下游恢复到 ThreadLocal 值。

要从另一个 FluxMono 组合中的 Message 恢复上下文,可以执行以下逻辑:

Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));
java