Java DSL
Spring Integration 的 Java 配置和 DSL 提供了一组便捷的构建器和流畅的 API,让你可以从 Spring @Configuration
类中配置 Spring Integration 消息流。
(另请参见 Kotlin DSL。)
(另见 Groovy DSL。)
Spring Integration 的 Java DSL 实质上是 Spring Integration 的一个外观。DSL 提供了一种简单的方法,通过使用流畅的 Builder
模式以及来自 Spring Framework 和 Spring Integration 的现有 Java 配置,将 Spring Integration 消息流嵌入到应用程序中。我们还使用并支持 lambda(Java 8 中可用)来进一步简化 Java 配置。
cafe 提供了使用 DSL 的一个很好的示例。
DSL 由 IntegrationFlow
流畅 API(参见 IntegrationFlowBuilder
)提供。这会产生一个 IntegrationFlow
组件,该组件应作为 Spring bean 注册(通过使用 @Bean
注解)。构建器模式用于将任意复杂的结构表达为可以接受 lambda 作为参数的方法层次结构。
IntegrationFlowBuilder
仅收集集成组件(MessageChannel
实例、AbstractEndpoint
实例等)到 IntegrationFlow
bean 中,以便由 IntegrationFlowBeanPostProcessor
进一步解析并在应用上下文中注册具体的 bean。
Java DSL 直接使用 Spring Integration 类,绕过了任何 XML 生成和解析。然而,DSL 提供的不仅仅是 XML 之上的语法糖。它最吸引人的特性之一是能够定义内联 lambda 来实现端点逻辑,消除了对外部类实现自定义逻辑的需求。在某种意义上,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持也解决了这个问题,但 lambda 更简单且功能更强大。
以下示例展示了如何使用 Java 配置来配置 Spring Integration:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow(AtomicInteger integerSource) {
return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
上述配置示例的结果是在 ApplicationContext
启动后,创建 Spring Integration 端点和消息通道。Java 配置既可以用于替代也可以用于补充 XML 配置。你无需替换所有的现有 XML 配置即可使用 Java 配置。
部分总结
📄️ DSL 基础
org.springframework.integration.dsl 包包含前面提到的 IntegrationFlowBuilder API 和多个 IntegrationComponentSpec 实现,这些也是构建器,并提供流畅的 API 来配置具体的端点。IntegrationFlowBuilder 基础设施为基于消息的应用程序提供了常见的企业集成模式(EIP),例如通道、端点、轮询器和通道拦截器。
📄️ 消息通道
除了带有 EIP 方法的 IntegrationFlowBuilder,Java DSL 还提供了一个流畅的 API 来配置 MessageChannel 实例。为此,提供了 MessageChannels 构建工厂。以下示例展示了如何使用它:
📄️ 轮询器
Spring Integration 还提供了一个流利的 API,可以让您为 AbstractPollingEndpoint 实现配置 PollerMetadata。您可以使用 Pollers 构建工厂来配置常见的 bean 定义或那些由 IntegrationFlowBuilder EIP 方法创建的定义,如下例所示:
📄️ reactive() 端点
从 5.5 版本开始,ConsumerEndpointSpec 提供了一个 reactive() 配置属性,带有一个可选的自定义函数 Function\<? super Flux\<Message\<?>>, ? extends Publisher\<Message\<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,该输入通道类型通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的函数用于 Flux.transform() 操作符,以自定义(如 publishOn()、log()、doOnNext() 等)来自输入通道的反应式流源。
📄️ DSL 和端点配置
所有 IntegrationFlowBuilder EIP 方法都有一个变体,它应用 lambda 参数为 AbstractEndpoint 实例提供选项:SmartLifecycle、PollerMetadata、request-handler-advice-chain 等。它们每一个都有泛型参数,因此可以在上下文中配置端点及其 MessageHandler,如下例所示:
📄️ Transformers
DSL API 提供了一个方便、流畅的 Transformers 工厂,可用于在 .transform() EIP 方法内作为内联目标对象定义。以下示例展示了如何使用它:
📄️ 入站通道适配器
通常,消息流从入站通道适配器开始(例如 \<int-jdbc:inbound-channel-adapter>)。适配器配置有 \<poller>,它会定期要求 MessageSource\<?> 生成消息。Java DSL 也允许从 MessageSource\<?> 启动 IntegrationFlow。为此,IntegrationFlow 流式 API 提供了一个重载的 IntegrationFlow.from(MessageSource\<?> messageSource) 方法。您可以将 MessageSource\<?> 配置为一个 bean,并将其作为该方法的参数提供。IntegrationFlow.from() 的第二个参数是一个 Consumer\<SourcePollingChannelAdapterSpec> lambda,它允许您为 SourcePollingChannelAdapter 提供选项(如 PollerMetadata 或 SmartLifecycle)。以下示例展示了如何使用流式 API 和 lambda 创建 IntegrationFlow:
📄️ 消息路由
Spring Integration 本地提供了专门的路由器类型,包括:
📄️ 拆分器
要创建一个拆分器,使用 split() EIP 方法。默认情况下,如果有效负载是 Iterable、Iterator、Array、Stream 或反应式 Publisher,split() 方法会将每个项输出为单个消息。它接受一个 lambda、一个 SpEL 表达式或任何 AbstractMessageSplitter 实现。或者,你可以不带参数使用它以提供 DefaultMessageSplitter。以下示例展示了如何通过提供 lambda 来使用 splitWith() 方法:
📄️ 聚合器和重排序器
聚合器概念上是拆分器的对立面。它将一系列单独的消息聚合为一条消息,因此必然更加复杂。默认情况下,聚合器返回一条包含来自传入消息的有效载荷集合的消息。相同的规则也适用于重排序器。以下示例展示了拆分器-聚合器模式的一个典型例子
📄️ 服务激活器和服务的 .handle() 方法
.handle() EIP 方法的目标是调用任何 MessageHandler 实现或某些 POJO 上的任何方法。另一种选择是使用 lambda 表达式定义一个“活动”。因此,我们引入了一个通用的 GenericHandler 函数接口。从 5.1 版开始,其 handle 方法需要两个参数:P payload 和 MessageHeaders headers。有了这些,我们可以如下定义一个流程:
📄️ 操作符 gateway()
在 IntegrationFlow 定义中的 gateway() 操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它在 \<chain> 定义中扮演着与嵌套的 \<gateway> 组件相同的角色(参见从链内部调用链),并且使流程更简洁、更直接。从业务逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。此操作符有多个重载版本以实现不同目的:
📄️ 操作符 log()
为了方便起见,为了记录消息在 Spring Integration 流中的旅程(),提供了一个 log() 操作符。内部实现上,它由带有 LoggingHandler 订阅者的 WireTap ChannelInterceptor 表示。它负责将传入的消息记录到下一个端点或当前通道。以下示例展示了如何使用 LoggingHandler:
📄️ 操作 intercept()
从 5.3 版本开始,intercept() 操作符允许在当前流中的 MessageChannel 注册一个或多个 ChannelInterceptor 实例。这是通过 MessageChannels API 显式创建 MessageChannel 的一种替代方法。以下示例使用 MessageSelectingInterceptor 通过抛出异常来拒绝某些消息:
📄️ MessageChannelSpec.wireTap()
Spring Integration 包含了 .wireTap() 流畅 API MessageChannelSpec 构建器。以下示例展示了如何使用 wireTap 方法记录输入:
📄️ Working With 消息流
IntegrationFlowBuilder 提供了一级 API 来生成与消息流连接的集成组件。当您的集成可以通过单一流程完成时(这通常是情况),这是很方便的。或者,IntegrationFlow 实例可以通过 MessageChannel 实例进行连接。
📄️ 函数表达式
我们引入了 FunctionExpression 类(SpEL 的 Expression 接口的一个实现)以让我们可以使用 lambda 和泛型。为 DSL 组件提供了 Function\<T, R> 选项,以及表达式选项,在有来自核心 Spring Integration 的隐式 Strategy 变体时。以下示例展示了如何使用函数表达式:
📄️ 子流支持
一些 if…else 和发布-订阅组件提供了使用子流来指定其逻辑或映射的功能。最简单的例子是 .publishSubscribeChannel(),如下例所示:
📄️ 使用协议适配器
到目前为止,所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递架构。然而,我们尚未进行任何真正的集成。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上 Spring Integration 添加的新适配器是一项艰巨的任务。因此,期望 DSL 能够不断赶上 Spring Integration。
📄️ 集成流适配器
可以直接实现 IntegrationFlow 接口并指定为可扫描的组件,如下例所示:
📄️ 动态和运行时集成流
IntegrationFlow 及其所有依赖组件可以在运行时注册。在 5.0 版本之前,我们使用 BeanFactory.registerSingleton() 钩子。从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行编程式的 BeanDefinition 注册。以下示例展示了如何编程式地注册一个 bean:
📄️ 集成流作为网关
集成流可以从提供 GatewayProxyFactoryBean 组件的服务接口开始,如下例所示:
📄️ DSL 扩展
从 5.3 版本开始,引入了 IntegrationFlowExtension 以允许使用自定义或组合的 EIP 操作符扩展现有的 Java DSL。所需要做的就是扩展这个类,该类提供可以在 IntegrationFlow bean 定义中使用的方法。扩展类也可以用于自定义 IntegrationComponentSpec 配置;例如,可以实现现有 IntegrationComponentSpec 扩展中缺失或默认的选项。下面的示例演示了一个组合的自定义操作符以及用于默认自定义 outputProcessor 的 AggregatorSpec 扩展的用法:
📄️ 集成流组合
在 Spring Integration 中,随着 MessageChannel 抽象成为一级公民,集成流的组合一直被认为是一个前提。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从将此通道作为输出的那个端点发送。此外,通过 @MessagingGateway 合约、内容充实组件、复合端点(如 \<chain>),以及现在的 IntegrationFlow beans(例如 IntegrationFlowAdapter),在较短的、可重用的部分之间分配业务逻辑变得非常直接。最终组合所需要的只是关于要发送到或接收来自哪个 MessageChannel 的知识。