Reactive Streams 支持
Spring Integration 在框架的某些位置以及从不同方面提供了对 Reactive Streams 交互的支持。我们将在必要时通过指向目标章节的适当链接来详细讨论其中的大部分内容。
前言
回顾一下,Spring Integration 扩展了 Spring 编程模型以支持著名的企业集成模式。Spring Integration 支持在基于 Spring 的应用程序中进行轻量级消息传递,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。这一目标在目标应用程序中通过使用 message、channel 和 endpoint 等核心组件来实现,这些组件允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到通道中,供另一个端点消费。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里的关键部分是中间的通道:流的行为取决于其实现,而端点保持不变。
另一方面,Reactive Streams 是一种支持非阻塞背压的异步流处理标准。其主要目标是在异步边界(例如将元素传递到另一个线程或线程池)上管理流数据的交换,同时确保接收方不会被强制缓冲任意数量的数据。换言之,背压是该模型的内在组成部分,以便允许在线程间协调的队列具有边界。Reactive Streams 的实现(例如 Project Reactor)旨在在整个流应用程序的处理图中保持这些优势和特性。Reactive Streams 库的最终目标是以透明且流畅的方式为目标应用程序提供类型、操作符集和支持 API,尽可能利用现有编程语言结构,但最终解决方案并不像普通函数链调用那样具有强制性。它分为两个阶段:定义和执行。执行发生在稍后订阅最终响应式发布者时,数据需求从定义的底部向上推送,根据需要应用背压——我们请求当前能够处理的事件数量。响应式应用程序看起来像一个 "流",或者用 Spring Integration 的术语来说——"流"。事实上,自 Java 9 起,Reactive Streams SPI 已通过 java.util.concurrent.Flow 类提供。
从这里看,当我们对端点应用一些响应式框架操作符时,Spring Integration 流似乎非常适合编写响应式流应用程序。但实际上,问题要广泛得多,我们需要记住,并非所有端点(例如 JdbcMessageHandler)都能在响应式流中透明地处理。当然,Spring Integration 中支持响应式流的主要目标是允许整个流程完全响应式、按需启动并具备背压能力。除非通道适配器的目标协议和系统提供响应式流交互模型,否则这是不可能实现的。在下面的章节中,我们将描述 Spring Integration 为开发保留集成流结构的响应式应用程序提供了哪些组件和方法。
Spring Integration 中的所有 Reactive Streams 交互均通过 Project Reactor 类型实现,例如 Mono 和 Flux。
消息网关
与 Reactive Streams 最简单的交互点是通过 @MessagingGateway,我们只需将网关方法的返回类型设为 Mono<?>——当对返回的 Mono 实例进行订阅时,网关方法调用背后的整个集成流程将被执行。更多信息请参阅 Reactor Mono。框架内部对完全基于 Reactive Streams 兼容协议的入站网关也采用了类似的 Mono 回复方法(更多信息请参阅下面的 Reactive Channel Adapters)。发送-接收操作被包装在 Mono.defer() 中,并在 replyChannel 头可用时链接回复评估。这样,特定响应式协议(例如 Netty)的入站组件将作为订阅者和 Spring Integration 上执行的响应式流的启动器。如果请求负载是响应式类型,最好在响应式流定义中处理它,将处理过程推迟到启动器订阅时。为此,处理方法也必须返回响应式类型。更多信息请参阅下一节。
响应式回复负载
当生成回复的 MessageHandler 返回一个响应式类型的回复消息负载时,它会以异步方式处理,并为 outputChannel 提供一个常规的 MessageChannel 实现(async 必须设置为 true),并在输出通道是 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时按需订阅进行扁平化处理。在标准的命令式 MessageChannel 使用场景中,如果回复负载是一个多值发布者(更多信息请参见 ReactiveAdapter.isMultiValue()),它会被包装成 Mono.just()。这样做的结果是,Mono 必须在下游显式订阅或由下游的 FluxMessageChannel 进行扁平化处理。当 outputChannel 使用 ReactiveStreamsSubscribableChannel 时,无需关心返回类型和订阅问题;所有内容都会由框架内部顺利处理。
更多信息请参见异步服务激活器。
另请参阅 Kotlin 协程 以获取更多信息。
FluxMessageChannel 与 ReactiveStreamsConsumer
FluxMessageChannel 是 MessageChannel 和 Publisher<Message<?>> 的组合实现。作为一个热源,内部创建了一个 Flux 来接收来自 send() 实现的消息。Publisher.subscribe() 的实现被委托给该内部 Flux。此外,为了按需消费上游消息,FluxMessageChannel 提供了 ReactiveStreamsSubscribableChannel 契约的实现。当此通道准备好订阅时,为此通道提供的任何上游 Publisher(例如,参见下面的源轮询通道适配器和拆分器)都会自动订阅。来自此委托发布者的事件会被汇入上述内部 Flux 中。
FluxMessageChannel 的消费者必须是一个 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 规范。幸运的是,Spring Integration 中的所有 MessageHandler 实现也都实现了来自 Reactor 项目的 CoreSubscriber。得益于中间层的 ReactiveStreamsConsumer 实现,整个集成流的配置对目标开发者保持透明。在这种情况下,流的行为从命令式的推送模型转变为响应式的拉取模型。通过使用 IntegrationReactiveUtils,ReactiveStreamsConsumer 也可以将任何 MessageChannel 转换为响应式源,从而使集成流部分实现响应式。
更多信息请参见 FluxMessageChannel。
从版本5.5开始,ConsumerEndpointSpec 引入了 reactive() 选项,用于将流中的端点作为 ReactiveStreamsConsumer,独立于输入通道。可以通过 Flux.transform() 操作提供一个可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 来定制来自输入通道的源 Flux,例如使用 publishOn()、doOnNext()、retry() 等。此功能通过所有消息注解(@ServiceActivator、@Splitter 等)的 reactive() 属性,以 @Reactive 子注解的形式表示。
源轮询通道适配器
通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。它会根据提供的选项构建一个轮询触发器,用于定期调度任务以轮询数据或事件的目标源。当 outputChannel 是 ReactiveStreamsSubscribableChannel 时,使用相同的 Trigger 来确定下一次执行时间,但 SourcePollingChannelAdapter 不会调度任务,而是基于 Flux.generate() 为 nextExecutionTime 值创建 Flux<Message<?>>,并使用 Mono.delay() 处理上一步的持续时间。然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll 并将其汇入输出 Flux。这个生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 订阅,并遵循下游的反压机制。从版本 5.5 开始,当 maxMessagesPerPoll == 0 时,完全不会调用源,并且 flatMapMany() 会通过 Mono.empty() 结果立即完成,直到稍后(例如通过控制总线)将 maxMessagesPerPoll 更改为非零值。通过这种方式,任何 MessageSource 实现都可以转换为响应式热源。
更多信息请参见轮询消费者。
事件驱动型通道适配器
MessageProducerSupport 是事件驱动型通道适配器的基类,通常其 sendMessage(Message<?>) 方法被用作生产端驱动 API 中的监听器回调。当消息生产者实现构建的是消息的 Flux 流而非基于监听器的功能时,此回调也可轻松接入 doOnNext() Reactor 操作符。实际上,当消息生产者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 时,框架内部便会执行此操作。然而,为提升终端用户体验并支持更多背压就绪功能,MessageProducerSupport 提供了 subscribeToPublisher(Publisher<? extends Message<?>>) API,当目标系统的数据源为 Publisher<Message<?>>> 时,可在目标实现中使用此 API。通常,在调用目标驱动 API 获取源数据的 Publisher 时,会从 doStart() 实现中调用此方法。建议将响应式 MessageProducerSupport 实现与 FluxMessageChannel 结合作为 outputChannel,以实现下游按需订阅和事件消费。当对 Publisher 的订阅被取消时,通道适配器将进入停止状态。对此类通道适配器调用 stop() 将完成对源 Publisher 的生产操作。通道适配器可通过自动订阅新创建的源 Publisher 重新启动。
消息源到响应式流
从版本 5.3 开始,提供了一个 ReactiveMessageSourceProducer。它是提供的 MessageSource 与事件驱动生产到配置的 outputChannel 的组合。在内部,它将 MessageSource 包装到一个重复重新订阅的 Mono 中,生成一个 Flux<Message<?>>,以便在上面提到的 subscribeToPublisher(Publisher<? extends Message<?>>) 中进行订阅。此 Mono 的订阅使用 Schedulers.boundedElastic() 完成,以避免在目标 MessageSource 中可能出现的阻塞。当消息源返回 null(没有数据可拉取)时,Mono 会转换为 repeatWhenEmpty() 状态,并基于订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目进行延迟,以便后续重新订阅。默认情况下,延迟为 1 秒。如果 MessageSource 生成的消息头部包含 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 信息,则会在原始 Mono 的 doOnSuccess() 中确认(如果需要),并在下游流抛出带有失败消息的 MessagingException 以拒绝时,在 doOnError() 中拒绝。当需要将轮询通道适配器的功能转换为针对任何现有 MessageSource<?> 实现的反式按需解决方案时,此 ReactiveMessageSourceProducer 可用于任何用例。
分割器与聚合器
当 AbstractMessageSplitter 获取到用于其逻辑的 Publisher 时,处理过程会自然地遍历 Publisher 中的项,将它们映射为消息并发送到 outputChannel。如果该通道是 ReactiveStreamsSubscribableChannel,则 Publisher 的 Flux 包装器会根据该通道的需求进行订阅,此时拆分器的行为更类似于 Reactor 操作符中的 flatMap,即将传入的事件映射为多值输出的 Publisher。当整个集成流在拆分器前后都使用 FluxMessageChannel 构建时,这种做法最为合理,这样可以将 Spring Integration 配置与 Reactive Streams 的要求及其事件处理操作符对齐。而对于常规通道,Publisher 会被转换为 Iterable,以便执行标准的迭代-生成拆分逻辑。
FluxAggregatorMessageHandler 是特定 Reactive Streams 逻辑实现的另一个示例,在 Project Reactor 的语境中可被视为一个"响应式操作符"。它基于 Flux.groupBy() 和 Flux.window()(或 buffer())操作符构建。当 FluxAggregatorMessageHandler 创建时,传入的消息会汇入由 Flux.create() 初始化的流中,使其成为热源。该 Flux 由 ReactiveStreamsSubscribableChannel 按需订阅,或在 outputChannel 非响应式时通过 FluxAggregatorMessageHandler.start() 直接订阅。当整个集成流在该组件前后均使用 FluxMessageChannel 构建时,此 MessageHandler 将充分发挥其能力,使整个逻辑具备背压就绪特性。
更多信息请参见流与 Flux 拆分和Flux 聚合器。
Java DSL
在Java DSL中,IntegrationFlow可以从任何Publisher实例开始(参见IntegrationFlow.from(Publisher<Message<T>>))。此外,通过IntegrationFlowBuilder.toReactivePublisher()操作符,IntegrationFlow可以转换为响应式热源。在这两种情况下,内部都使用了FluxMessageChannel;它可以根据其ReactiveStreamsSubscribableChannel契约订阅入站Publisher,并且它本身也是下游订阅者的Publisher<Message<?>>。通过动态注册IntegrationFlow,我们可以实现强大的逻辑,将Reactive Streams与集成流桥接至/从Publisher结合起来。
从版本5.5.6开始,新增了一个 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 背后整个 IntegrationFlow 的生命周期。通常,对响应式发布者的订阅和消费发生在较晚的运行时阶段,而不是在响应式流组合期间,甚至不是在 ApplicationContext 启动期间。为了避免在 Publisher<Message<?>> 订阅点进行 IntegrationFlow 生命周期管理的样板代码,并提供更好的最终用户体验,引入了这个带有 autoStartOnSubscribe 标志的新操作符。如果该标志为 true,它会将 IntegrationFlow 及其组件标记为 autoStartup = false,这样 ApplicationContext 就不会自动启动流中的消息生产和消费。相反,IntegrationFlow 的 start() 会从内部的 Flux.doOnSubscribe() 中启动。无论 autoStartOnSubscribe 的值如何,流都会通过 Flux.doOnCancel() 和 Flux.doOnTerminate() 停止——如果没有内容来消费消息,那么生产消息就没有意义。
对于完全相反的使用场景,当 IntegrationFlow 需要调用响应式流并在完成后继续执行时,IntegrationFlowDefinition 中提供了 fluxTransform() 操作符。此时流程会转换为 FluxMessageChannel,并传播到提供的 fluxFunction 中,在 Flux.transform() 操作符中执行。函数的结果会被包装成 Mono<Message<?>>,通过 flat-mapping 操作进入输出 Flux,该输出 Flux 由另一个 FluxMessageChannel 订阅以继续下游流程。
更多信息请参见 Java DSL 章节。
ReactiveMessageHandler
从版本5.3开始,框架原生支持 ReactiveMessageHandler。这类消息处理器专为响应式客户端设计,这些客户端返回响应式类型以按需订阅底层操作执行,且不提供任何回复数据来继续响应式流组合。当在命令式集成流中使用 ReactiveMessageHandler 时,handleMessage() 的结果会在返回后立即被订阅,这仅仅是因为在此类流中没有响应式流组合来遵循背压。在这种情况下,框架会将此 ReactiveMessageHandler 包装进 ReactiveMessageHandlerAdapter —— 一个 MessageHandler 的普通实现。然而,当流中涉及 ReactiveStreamsConsumer 时(例如,当消费通道是 FluxMessageChannel),这样的 ReactiveMessageHandler 会通过 Reactor 操作符 flatMap() 组合到整个响应式流中,以在消费期间遵循背压。
其中一个开箱即用的 ReactiveMessageHandler 实现是用于出站通道适配器的 ReactiveMongoDbStoringMessageHandler。更多信息请参阅 MongoDB 响应式通道适配器。
从 6.1 版本开始,IntegrationFlowDefinition 提供了一个便捷的终端操作符 handleReactive(ReactiveMessageHandler)。任何 ReactiveMessageHandler 实现(甚至只是一个使用 Mono API 的简单 lambda 表达式)都可以用于此操作符。框架会自动订阅返回的 Mono<Void>。以下是此操作符的一个简单配置示例:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
该操作符的一个重载版本接受一个 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> 参数,用于围绕提供的 ReactiveMessageHandler 自定义消费者端点。
此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。在大多数情况下,它们用于特定协议的通道适配器实现。请参阅下一节,其中包含指向具有相应响应式通道适配器的目标技术的链接。
响应式通道适配器
当集成目标协议提供了Reactive Streams解决方案时,在Spring Integration中实现通道适配器就变得非常直接。
入站事件驱动通道适配器的实现,其核心在于将请求(如有必要)包装为延迟的 Mono 或 Flux,并仅在协议组件对监听器方法返回的 Mono 发起订阅时,才执行 send 操作(并在需要时生成回复)。通过这种方式,我们在此组件中封装了一个完全符合响应式流的解决方案。当然,下游集成流在订阅输出通道时,应遵循响应式流规范,并以按需、支持背压的方式执行。
由于集成流中使用的 MessageHandler 处理器的特性(或当前实现),这并非总是可用。当缺乏响应式实现时,可以通过在集成端点前后使用线程池和队列或 FluxMessageChannel(参见上文)来处理此限制。
一个反应式事件驱动入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法如下:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明式的方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终通过以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
一个反应式出站通道适配器的实现,涉及根据目标协议提供的反应式API,启动(或延续)一个反应式流以与外部系统进行交互。入站负载本身可以是反应式类型,也可以作为整个集成流中的一个事件,而该集成流是上层反应式流的一部分。如果我们处于单向、发送即忘的场景中,返回的反应式类型可以立即被订阅;或者,在请求-回复场景中,它会被传播到下游,用于进一步的集成流或目标业务逻辑中的显式订阅,但下游仍会保持反应式流的语义。
一个响应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够同时使用这两种通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供了通道适配器(或网关)实现。Redis Stream 通道适配器 也是响应式的,并使用 Spring Data 中的 ReactiveStreamOperations。更多响应式通道适配器即将推出,例如基于 Spring for Apache Kafka 中的 ReactiveKafkaProducerTemplate 和 ReactiveKafkaConsumerTemplate 的 Kafka 适配器等。对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间发生阻塞。
从响应式到命令式的上下文传播
当 Context Propagation 库位于类路径上时,Project Reactor 可以获取 ThreadLocal 值(例如 Micrometer Observation 或 SecurityContextHolder)并将其存储到 Subscriber 上下文中。当我们需要为追踪填充日志 MDC,或者让从响应式流中调用的服务从作用域恢复观察时,反向操作也是可行的。有关其上下文传播的特殊操作符,请参阅 Project Reactor 文档 获取更多信息。如果我们的整个解决方案是单个响应式流组合,那么存储和恢复上下文的工作会很顺畅,因为 Subscriber 上下文从下游到组合(Flux 或 Mono)的起点都是可见的。但是,如果应用程序在不同的 Flux 实例之间切换,或者切换到命令式处理再切换回来,那么绑定到 Subscriber 的上下文可能就不可用了。对于这种用例,Spring Integration 提供了额外的能力(从版本 6.0.5 开始),将 Reactor ContextView 存储到从响应式流产生的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息头中,例如当我们执行直接的 send() 操作时。然后,在 FluxMessageChannel.subscribeTo() 中使用此头来为该通道将要发出的 Message 恢复 Reactor 上下文。目前,此头由 WebFluxInboundEndpoint 和 RSocketInboundGateway 组件填充,但也可用于任何执行响应式到命令式集成的解决方案中。填充此头的逻辑如下:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用 handle() 操作符来让 Reactor 从上下文中恢复 ThreadLocal 值。即使它作为标头发送,框架也无法假设它是否会在下游恢复到 ThreadLocal 值。
为了在另一个 Flux 或 Mono 组合中从 Message 恢复上下文,可以执行以下逻辑:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));