处理消息流
IntegrationFlowBuilder
提供了一个顶级 API,用于生成连接到消息流的集成组件。当您的集成可以通过单一流程完成(这通常是情况)时,这是很方便的。或者,IntegrationFlow
实例可以通过 MessageChannel
实例进行连接。
默认情况下,MessageFlow
表现为 Spring Integration 中的 “链”。也就是说,端点会通过 DirectChannel
实例自动且隐式地连接。消息流实际上并不是以链的方式构建的,这提供了更大的灵活性。例如,如果知道某个组件的 inputChannel
名称(即如果显式定义它),可以将消息发送到流中的任何组件。还可以在流中引用外部定义的通道,以允许使用通道适配器(启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integration 的 chain
元素,因为在这种情况下它并没有增加太多价值。
由于 Spring Integration Java DSL 产生的 bean 定义模型与其他任何配置选项相同,并且是基于现有的 Spring Framework @Configuration
基础设施,因此它可以与 XML 定义一起使用,并与 Spring Integration 消息注解配置结合。
你也可以通过使用 lambda 定义直接的 IntegrationFlow
实例。以下示例展示了如何做到这一点:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
此定义的结果是同一组通过隐式直接通道连接的集成组件。这里唯一的限制是,此流程是从命名的直接通道 lambdaFlow.input
开始的。此外,Lambda 流不能从 MessageSource
或 MessageProducer
开始。
从 5.1 版本开始,这种 IntegrationFlow
被包装到代理中以暴露生命周期控制并提供对内部关联的 StandardIntegrationFlow
的 inputChannel
的访问。
从 5.0.6 版本开始,IntegrationFlow
中组件生成的 bean 名称包含以流 bean 加上点 (.
) 作为前缀。例如,前面示例中 .transform("Hello "::concat)
的 ConsumerEndpointFactoryBean
结果是一个名为 lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
的 bean。(这里的 o.s.i
是 org.springframework.integration
的缩写,为了适应页面显示。)该端点的 Transformer
实现 bean 的名称为 lambdaFlow.transformer#0
(从 5.1 版本开始),其中不是使用 MethodInvokingTransformer
类的全限定名,而是使用其组件类型。当需要在流内部生成 bean 名称时,所有 NamedComponent
都应用了相同的模式。这些生成的 bean 名称会加上流 ID 作为前缀,以便于解析日志或在某些分析工具中将组件分组,以及避免在我们并行注册集成流时出现竞争条件。有关更多信息,请参阅动态和运行时集成流。