Java DSL
Spring Integration Java配置与DSL提供了一组便捷的构建器和流畅的API,使您能够通过Spring @Configuration类来配置Spring Integration消息流。
(另请参阅 Kotlin DSL。)
(另请参阅 Groovy DSL。)
Spring Integration 的 Java DSL 本质上是 Spring Integration 的一个门面。该 DSL 通过流畅的 Builder 模式,结合 Spring Framework 和 Spring Integration 中现有的 Java 配置,提供了一种将 Spring Integration 消息流嵌入到应用程序中的简单方法。我们还使用并支持 lambda 表达式(Java 8 及以上版本可用),以进一步简化 Java 配置。
café 示例很好地展示了 DSL 的使用方式。
DSL 通过 IntegrationFlow 流式 API(参见 IntegrationFlowBuilder)呈现。这会生成 IntegrationFlow 组件,该组件应注册为 Spring bean(通过使用 @Bean 注解)。构建器模式用于将任意复杂的结构表达为方法层次结构,这些方法可以接受 lambda 表达式作为参数。
IntegrationFlowBuilder 仅在 IntegrationFlow bean 中收集集成组件(MessageChannel 实例、AbstractEndpoint 实例等),以便由 IntegrationFlowBeanPostProcessor 进一步解析并在应用程序上下文中注册具体的 bean。
Java DSL 直接使用 Spring Integration 类,绕过了任何 XML 生成和解析过程。然而,DSL 提供的不仅仅是 XML 之上的语法糖。其最引人注目的特性之一是能够定义内联 lambda 表达式来实现端点逻辑,从而无需外部类来实现自定义逻辑。从某种意义上说,Spring Integration 对 Spring 表达式语言(SpEL)和内联脚本的支持解决了这个问题,但 lambda 表达式更简单且功能更强大。
以下示例展示了如何为 Spring Integration 使用 Java 配置:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow(AtomicInteger integerSource) {
return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
上述配置示例的结果是,在 ApplicationContext 启动后,它会创建 Spring Integration 端点和消息通道。Java 配置既可用于替代也可用于增强 XML 配置。您无需替换所有现有的 XML 配置即可使用 Java 配置。
章节总结
📄️ DSL 基础
org.springframework.integration.dsl 包包含之前提到的 IntegrationFlowBuilder API 以及许多 IntegrationComponentSpec 实现,这些实现同样是构建器,并提供用于配置具体端点的流畅 API。IntegrationFlowBuilder 基础设施为基于消息的应用程序提供了通用的企业集成模式(EIP),例如通道、端点、轮询器和通道拦截器。
📄️ 消息通道
除了带有企业集成模式(EIP)方法的IntegrationFlowBuilder,Java DSL还提供了一个流畅的API来配置MessageChannel实例。为此,提供了MessageChannels构建器工厂。以下示例展示了如何使用它:
📄️ 轮询器
Spring Integration 还提供了一个流畅的 API,允许您为 AbstractPollingEndpoint 实现配置 PollerMetadata。您可以使用 Pollers 构建器工厂来配置通用 bean 定义,或配置通过 IntegrationFlowBuilder EIP 方法创建的 bean 定义,如下例所示:
📄️ reactive() 端点
从版本 5.5 开始,ConsumerEndpointSpec 提供了一个 reactive() 配置属性,它包含一个可选的定制器函数 Function>, ? extends Publisher\<Message\<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,输入通道会通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的函数通过 Flux.transform() 操作符使用,用于定制来自输入通道的响应式流源(例如 publishOn()、log()、doOnNext() 等)。
📄️ DSL 与端点配置
所有 IntegrationFlowBuilder EIP 方法都包含一个变体,该变体应用 lambda 参数来为 AbstractEndpoint 实例提供选项:SmartLifecycle、PollerMetadata、request-handler-advice-chain 等。每个方法都具有泛型参数,因此允许您在上下文中配置端点甚至其 MessageHandler,如下例所示:
📄️ Transformers
DSL API 提供了一个便捷的流式 Transformers 工厂,可在 .transform() EIP 方法中作为内联目标对象定义使用。以下示例展示了其使用方法:
📄️ 入站通道适配器
通常,消息流从入站通道适配器(例如 \<int-jdbc:inbound-channel-adapter>)开始。适配器配置了 \<poller>,它会定期请求 MessageSource\<?> 生成消息。Java DSL 也允许从 MessageSource\<?> 启动 IntegrationFlow。为此,IntegrationFlow 流式 API 提供了一个重载的 IntegrationFlow.from(MessageSource\<?> messageSource) 方法。您可以将 MessageSource\<?> 配置为一个 bean,并将其作为该方法的参数提供。IntegrationFlow.from() 的第二个参数是一个 Consumer\<SourcePollingChannelAdapterSpec> lambda 表达式,允许您为 SourcePollingChannelAdapter 提供选项(例如 PollerMetadata 或 SmartLifecycle)。以下示例展示了如何使用流式 API 和 lambda 表达式创建 IntegrationFlow:
📄️ 消息路由器
Spring Integration 原生提供了多种专用路由器类型,包括:
📄️ Splitters
要创建拆分器,请使用 split() EIP 方法。默认情况下,如果负载是 Iterable、Iterator、Array、Stream 或响应式 Publisher,split() 方法会将每个项目作为单独的消息输出。它接受 lambda 表达式、SpEL 表达式或任何 AbstractMessageSplitter 实现。或者,您也可以在不带参数的情况下使用它来提供 DefaultMessageSplitter。以下示例展示了如何通过提供 lambda 表达式来使用 splitWith() 方法:
📄️ 聚合器与重排序器
聚合器在概念上与拆分器相反。它将一系列独立消息聚合成单个消息,因此必然更为复杂。默认情况下,聚合器返回的消息包含来自传入消息的有效负载集合。同样的规则也适用于重排序器。以下示例展示了拆分器-聚合器模式的典型示例:
📄️ 服务激活器与 .handle() 方法
.handle() EIP 方法的目标是调用任何 MessageHandler 实现或某个 POJO 上的任何方法。另一种选择是使用 lambda 表达式定义“活动”。因此,我们引入了一个通用的 GenericHandler\<P> 函数式接口。它的 handle 方法需要两个参数:P 负载和 MessageHeaders 头信息(从 5.1 版本开始)。基于此,我们可以如下定义一个流:
📄️ Operator gateway()
在集成流定义中,gateway() 操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它与 \<chain> 定义中的嵌套 \<gateway> 组件(参见从链内调用链)扮演相同的角色,并能使流更简洁、更直接。从逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。该操作符针对不同目标有多个重载版本:
📄️ Operator log()
为了方便记录消息在 Spring Integration 流程中的流转过程(),我们提供了一个 log() 操作符。在内部,它由 WireTap ChannelInterceptor 实现,并以 LoggingHandler 作为其订阅者。它负责将传入的消息记录到下一个端点或当前通道中。以下示例展示了如何使用 LoggingHandler:
📄️ Operator intercept()
从 5.3 版本开始,intercept() 操作符允许在当前流程的 MessageChannel 中注册一个或多个 ChannelInterceptor 实例。这是通过 MessageChannels API 创建显式 MessageChannel 的替代方案。以下示例使用 MessageSelectingInterceptor 来拒绝某些消息并抛出异常:
📄️ MessageChannelSpec.wireTap()
Spring Integration 包含一个 .wireTap() 流式 API MessageChannelSpec 构建器。以下示例展示了如何使用 wireTap 方法记录输入:
📄️ 处理消息流
IntegrationFlowBuilder 提供了一个顶层 API,用于生成连接到消息流的集成组件。当您的集成可以通过单个流完成时(通常情况如此),这非常方便。或者,IntegrationFlow 实例可以通过 MessageChannel 实例进行连接。
📄️ FunctionExpression
我们引入了FunctionExpression类(SpEL的Expression接口的一个实现),以便让我们能够使用lambda表达式和泛型。当存在来自Spring Integration核心的隐式策略变体时,DSL组件会提供Function\<T, R>选项以及表达式选项。以下示例展示了如何使用函数表达式:
📄️ 子流程支持
部分 if…else 和发布-订阅组件支持通过使用子流来指定其逻辑或映射。最简单的示例是 .publishSubscribeChannel(),如下所示:
📄️ 使用协议适配器
到目前为止展示的所有示例都说明了DSL如何通过使用Spring Integration编程模型来支持消息传递架构。然而,我们尚未进行任何实际的集成操作。实现集成需要访问通过HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP等协议访问远程资源,或访问本地文件系统。Spring Integration支持所有这些协议及更多功能。理想情况下,DSL应该为所有这些功能提供一流的支持,但实现所有这些功能并跟上Spring Integration新增适配器的步伐是一项艰巨的任务。因此,期望DSL能够持续跟进Spring Integration的发展。
📄️ IntegrationFlowAdapter
IntegrationFlow 接口可以直接实现并指定为扫描组件,如下例所示:
📄️ 动态与运行时集成流
IntegrationFlow 及其所有依赖组件可以在运行时注册。在 5.0 版本之前,我们使用 BeanFactory.registerSingleton() 钩子。从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行编程式的 BeanDefinition 注册。以下示例展示了如何以编程方式注册一个 bean:
📄️ IntegrationFlow 作为网关
IntegrationFlow 可以从提供 GatewayProxyFactoryBean 组件的服务接口开始,如下例所示:
📄️ DSL 扩展
从版本 5.3 开始,引入了 IntegrationFlowExtension,允许通过自定义或组合的 EIP 操作符来扩展现有的 Java DSL。只需要扩展这个类,提供可在 IntegrationFlow bean 定义中使用的方法。该扩展类也可用于自定义 IntegrationComponentSpec 配置;例如,可以在现有的 IntegrationComponentSpec 扩展中实现缺失或默认的选项。以下示例演示了一个组合自定义操作符,以及用于默认自定义 outputProcessor 的 AggregatorSpec 扩展的用法:
📄️ 集成流组合
随着MessageChannel抽象在Spring Integration中成为一等公民,集成流的组合始终被假定。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从将此通道作为输出的端点发送。此外,通过@MessagingGateway契约、内容丰富器组件、复合端点(如)以及现在的IntegrationFlow bean(例如IntegrationFlowAdapter),将业务逻辑分布在更短、可重用的部分之间变得足够直接。最终组合所需的只是了解要发送到或接收消息的MessageChannel。