跳到主要内容

Java DSL

QWen Plus 中英对照 Java DSL

Spring Integration 的 Java 配置和 DSL 提供了一组便捷的构建器和流畅的 API,让你可以从 Spring @Configuration 类中配置 Spring Integration 消息流。

(另请参见 Kotlin DSL。)

(另见 Groovy DSL。)

Spring Integration 的 Java DSL 实质上是 Spring Integration 的一个外观。DSL 提供了一种简单的方法,通过使用流畅的 Builder 模式以及来自 Spring Framework 和 Spring Integration 的现有 Java 配置,将 Spring Integration 消息流嵌入到应用程序中。我们还使用并支持 lambda(Java 8 中可用)来进一步简化 Java 配置。

cafe 提供了使用 DSL 的一个很好的示例。

DSL 由 IntegrationFlow 流畅 API(参见 IntegrationFlowBuilder)提供。这会产生一个 IntegrationFlow 组件,该组件应作为 Spring bean 注册(通过使用 @Bean 注解)。构建器模式用于将任意复杂的结构表达为可以接受 lambda 作为参数的方法层次结构。

IntegrationFlowBuilder 仅收集集成组件(MessageChannel 实例、AbstractEndpoint 实例等)到 IntegrationFlow bean 中,以便由 IntegrationFlowBeanPostProcessor 进一步解析并在应用上下文中注册具体的 bean。

Java DSL 直接使用 Spring Integration 类,绕过了任何 XML 生成和解析。然而,DSL 提供的不仅仅是 XML 之上的语法糖。它最吸引人的特性之一是能够定义内联 lambda 来实现端点逻辑,消除了对外部类实现自定义逻辑的需求。在某种意义上,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持也解决了这个问题,但 lambda 更简单且功能更强大。

以下示例展示了如何使用 Java 配置来配置 Spring Integration:

@Configuration
@EnableIntegration
public class MyConfiguration {

@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}

@Bean
public IntegrationFlow myFlow(AtomicInteger integerSource) {
return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
java

上述配置示例的结果是在 ApplicationContext 启动后,创建 Spring Integration 端点和消息通道。Java 配置既可以用于替代也可以用于补充 XML 配置。你无需替换所有的现有 XML 配置即可使用 Java 配置。

部分总结

📄️ reactive() 端点

从 5.5 版本开始,ConsumerEndpointSpec 提供了一个 reactive() 配置属性,带有一个可选的自定义函数 Function\<? super Flux\<Message\<?>>, ? extends Publisher\<Message\<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,该输入通道类型通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的函数用于 Flux.transform() 操作符,以自定义(如 publishOn()、log()、doOnNext() 等)来自输入通道的反应式流源。

📄️ 入站通道适配器

通常,消息流从入站通道适配器开始(例如 \<int-jdbc:inbound-channel-adapter>)。适配器配置有 \<poller>,它会定期要求 MessageSource\<?> 生成消息。Java DSL 也允许从 MessageSource\<?> 启动 IntegrationFlow。为此,IntegrationFlow 流式 API 提供了一个重载的 IntegrationFlow.from(MessageSource\<?> messageSource) 方法。您可以将 MessageSource\<?> 配置为一个 bean,并将其作为该方法的参数提供。IntegrationFlow.from() 的第二个参数是一个 Consumer\<SourcePollingChannelAdapterSpec> lambda,它允许您为 SourcePollingChannelAdapter 提供选项(如 PollerMetadata 或 SmartLifecycle)。以下示例展示了如何使用流式 API 和 lambda 创建 IntegrationFlow:

📄️ 操作符 gateway()

在 IntegrationFlow 定义中的 gateway() 操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它在 \<chain> 定义中扮演着与嵌套的 \<gateway> 组件相同的角色(参见从链内部调用链),并且使流程更简洁、更直接。从业务逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。此操作符有多个重载版本以实现不同目的:

📄️ 使用协议适配器

到目前为止,所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递架构。然而,我们尚未进行任何真正的集成。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上 Spring Integration 添加的新适配器是一项艰巨的任务。因此,期望 DSL 能够不断赶上 Spring Integration。

📄️ DSL 扩展

从 5.3 版本开始,引入了 IntegrationFlowExtension 以允许使用自定义或组合的 EIP 操作符扩展现有的 Java DSL。所需要做的就是扩展这个类,该类提供可以在 IntegrationFlow bean 定义中使用的方法。扩展类也可以用于自定义 IntegrationComponentSpec 配置;例如,可以实现现有 IntegrationComponentSpec 扩展中缺失或默认的选项。下面的示例演示了一个组合的自定义操作符以及用于默认自定义 outputProcessor 的 AggregatorSpec 扩展的用法:

📄️ 集成流组合

在 Spring Integration 中,随着 MessageChannel 抽象成为一级公民,集成流的组合一直被认为是一个前提。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从将此通道作为输出的那个端点发送。此外,通过 @MessagingGateway 合约、内容充实组件、复合端点(如 \<chain>),以及现在的 IntegrationFlow beans(例如 IntegrationFlowAdapter),在较短的、可重用的部分之间分配业务逻辑变得非常直接。最终组合所需要的只是关于要发送到或接收来自哪个 MessageChannel 的知识。