跳到主要内容
版本:7.0.2

Java DSL

DeepSeek V3 中英对照 Java DSL

Spring Integration Java配置与DSL提供了一组便捷的构建器和流畅的API,使您能够通过Spring @Configuration类来配置Spring Integration消息流。

(另请参阅 Kotlin DSL。)

(另请参阅 Groovy DSL。)

Spring Integration 的 Java DSL 本质上是 Spring Integration 的一个门面。该 DSL 通过流畅的 Builder 模式,结合 Spring Framework 和 Spring Integration 中现有的 Java 配置,提供了一种将 Spring Integration 消息流嵌入到应用程序中的简单方法。我们还使用并支持 lambda 表达式(Java 8 及以上版本可用),以进一步简化 Java 配置。

café 示例很好地展示了 DSL 的使用方式。

DSL 通过 IntegrationFlow 流式 API(参见 IntegrationFlowBuilder)呈现。这会生成 IntegrationFlow 组件,该组件应注册为 Spring bean(通过使用 @Bean 注解)。构建器模式用于将任意复杂的结构表达为方法层次结构,这些方法可以接受 lambda 表达式作为参数。

IntegrationFlowBuilder 仅在 IntegrationFlow bean 中收集集成组件(MessageChannel 实例、AbstractEndpoint 实例等),以便由 IntegrationFlowBeanPostProcessor 进一步解析并在应用程序上下文中注册具体的 bean。

Java DSL 直接使用 Spring Integration 类,绕过了任何 XML 生成和解析过程。然而,DSL 提供的不仅仅是 XML 之上的语法糖。其最引人注目的特性之一是能够定义内联 lambda 表达式来实现端点逻辑,从而无需外部类来实现自定义逻辑。从某种意义上说,Spring Integration 对 Spring 表达式语言(SpEL)和内联脚本的支持解决了这个问题,但 lambda 表达式更简单且功能更强大。

以下示例展示了如何为 Spring Integration 使用 Java 配置:

@Configuration
@EnableIntegration
public class MyConfiguration {

@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}

@Bean
public IntegrationFlow myFlow(AtomicInteger integerSource) {
return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}

上述配置示例的结果是,在 ApplicationContext 启动后,它会创建 Spring Integration 端点和消息通道。Java 配置既可用于替代也可用于增强 XML 配置。您无需替换所有现有的 XML 配置即可使用 Java 配置。

章节总结

📄️ reactive() 端点

从版本 5.5 开始,ConsumerEndpointSpec 提供了一个 reactive() 配置属性,它包含一个可选的定制器函数 Function>, ? extends Publisher\<Message\<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,输入通道会通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的函数通过 Flux.transform() 操作符使用,用于定制来自输入通道的响应式流源(例如 publishOn()、log()、doOnNext() 等)。

📄️ 入站通道适配器

通常,消息流从入站通道适配器(例如 \<int-jdbc:inbound-channel-adapter>)开始。适配器配置了 \<poller>,它会定期请求 MessageSource\<?> 生成消息。Java DSL 也允许从 MessageSource\<?> 启动 IntegrationFlow。为此,IntegrationFlow 流式 API 提供了一个重载的 IntegrationFlow.from(MessageSource\<?> messageSource) 方法。您可以将 MessageSource\<?> 配置为一个 bean,并将其作为该方法的参数提供。IntegrationFlow.from() 的第二个参数是一个 Consumer\<SourcePollingChannelAdapterSpec> lambda 表达式,允许您为 SourcePollingChannelAdapter 提供选项(例如 PollerMetadata 或 SmartLifecycle)。以下示例展示了如何使用流式 API 和 lambda 表达式创建 IntegrationFlow:

📄️ Operator gateway()

在集成流定义中,gateway() 操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它与 \<chain> 定义中的嵌套 \<gateway> 组件(参见从链内调用链)扮演相同的角色,并能使流更简洁、更直接。从逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。该操作符针对不同目标有多个重载版本:

📄️ 使用协议适配器

到目前为止展示的所有示例都说明了DSL如何通过使用Spring Integration编程模型来支持消息传递架构。然而,我们尚未进行任何实际的集成操作。实现集成需要访问通过HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP等协议访问远程资源,或访问本地文件系统。Spring Integration支持所有这些协议及更多功能。理想情况下,DSL应该为所有这些功能提供一流的支持,但实现所有这些功能并跟上Spring Integration新增适配器的步伐是一项艰巨的任务。因此,期望DSL能够持续跟进Spring Integration的发展。

📄️ DSL 扩展

从版本 5.3 开始,引入了 IntegrationFlowExtension,允许通过自定义或组合的 EIP 操作符来扩展现有的 Java DSL。只需要扩展这个类,提供可在 IntegrationFlow bean 定义中使用的方法。该扩展类也可用于自定义 IntegrationComponentSpec 配置;例如,可以在现有的 IntegrationComponentSpec 扩展中实现缺失或默认的选项。以下示例演示了一个组合自定义操作符,以及用于默认自定义 outputProcessor 的 AggregatorSpec 扩展的用法:

📄️ 集成流组合

随着MessageChannel抽象在Spring Integration中成为一等公民,集成流的组合始终被假定。流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从将此通道作为输出的端点发送。此外,通过@MessagingGateway契约、内容丰富器组件、复合端点(如)以及现在的IntegrationFlow bean(例如IntegrationFlowAdapter),将业务逻辑分布在更短、可重用的部分之间变得足够直接。最终组合所需的只是了解要发送到或接收消息的MessageChannel。