处理消息流
IntegrationFlowBuilder 提供了一个顶层 API,用于生成连接到消息流的集成组件。当您的集成可以通过单个流完成时(通常情况如此),这非常方便。或者,IntegrationFlow 实例可以通过 MessageChannel 实例进行连接。
默认情况下,MessageFlow 的行为类似于 Spring Integration 术语中的“链”。也就是说,端点通过 DirectChannel 实例自动且隐式地连接。消息流实际上并未构建为链,这提供了更大的灵活性。例如,如果您知道某个组件在流中的 inputChannel 名称(即,如果您显式定义了它),您可以将消息发送到流中的任何组件。您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integration 的 chain 元素,因为在这种情况下它并没有增加太多价值。
由于Spring Integration Java DSL生成的bean定义模型与其他配置选项相同,并且基于现有的Spring Framework @Configuration 基础设施,它可以与XML定义一起使用,并与Spring Integration消息注解配置进行连接。
你也可以通过使用 lambda 表达式来定义直接的 IntegrationFlow 实例。以下示例展示了如何实现:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
此定义的结果与通过隐式直接通道连接的集成组件集合相同。这里的唯一限制是,此流程通过一个命名直接通道启动——lambdaFlow.input。此外,Lambda 流程不能从 MessageSource 或 MessageProducer 开始。
从版本5.1开始,这种 IntegrationFlow 被包装为代理,以暴露生命周期控制并提供对内部关联的 StandardIntegrationFlow 的 inputChannel 的访问。
从 5.0.6 版本开始,IntegrationFlow 中组件的生成 bean 名称会包含流程 bean 名称,后跟一个点号 (.) 作为前缀。例如,前面示例中的 .transform("Hello "::concat) 对应的 ConsumerEndpointFactoryBean 生成的 bean 名称为 lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0。(o.s.i 是 org.springframework.integration 的缩写,以适应页面显示。)该端点的 Transformer 实现 bean 的 bean 名称为 lambdaFlow.transformer#0(从 5.1 版本开始),这里使用了其组件类型,而不是 MethodInvokingTransformer 类的完全限定名。当必须在流程内生成 bean 名称时,所有 NamedComponent 都应用相同的模式。这些生成的 bean 名称会以流程 ID 作为前缀,目的是为了在解析日志或在某些分析工具中将组件分组在一起,以及避免在运行时同时注册集成流时出现竞争条件。更多信息,请参阅动态和运行时集成流。