跳到主要内容

处理消息流

QWen Plus 中英对照 Working With Message Flows

IntegrationFlowBuilder 提供了一个顶级 API,用于生成连接到消息流的集成组件。当您的集成可以通过单一流程完成(这通常是情况)时,这是很方便的。或者,IntegrationFlow 实例可以通过 MessageChannel 实例进行连接。

默认情况下,MessageFlow 表现为 Spring Integration 中的 “链”。也就是说,端点会通过 DirectChannel 实例自动且隐式地连接。消息流实际上并不是以链的方式构建的,这提供了更大的灵活性。例如,如果知道某个组件的 inputChannel 名称(即如果显式定义它),可以将消息发送到流中的任何组件。还可以在流中引用外部定义的通道,以允许使用通道适配器(启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integration 的 chain 元素,因为在这种情况下它并没有增加太多价值。

由于 Spring Integration Java DSL 产生的 bean 定义模型与其他任何配置选项相同,并且是基于现有的 Spring Framework @Configuration 基础设施,因此它可以与 XML 定义一起使用,并与 Spring Integration 消息注解配置结合。

你也可以通过使用 lambda 定义直接的 IntegrationFlow 实例。以下示例展示了如何做到这一点:

@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
java

此定义的结果是同一组通过隐式直接通道连接的集成组件。这里唯一的限制是,此流程是从命名的直接通道 lambdaFlow.input 开始的。此外,Lambda 流不能从 MessageSourceMessageProducer 开始。

从 5.1 版本开始,这种 IntegrationFlow 被包装到代理中以暴露生命周期控制并提供对内部关联的 StandardIntegrationFlowinputChannel 的访问。

从 5.0.6 版本开始,IntegrationFlow 中组件生成的 bean 名称包含以流 bean 加上点 (.) 作为前缀。例如,前面示例中 .transform("Hello "::concat)ConsumerEndpointFactoryBean 结果是一个名为 lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0 的 bean。(这里的 o.s.iorg.springframework.integration 的缩写,为了适应页面显示。)该端点的 Transformer 实现 bean 的名称为 lambdaFlow.transformer#0(从 5.1 版本开始),其中不是使用 MethodInvokingTransformer 类的全限定名,而是使用其组件类型。当需要在流内部生成 bean 名称时,所有 NamedComponent 都应用了相同的模式。这些生成的 bean 名称会加上流 ID 作为前缀,以便于解析日志或在某些分析工具中将组件分组,以及避免在我们并行注册集成流时出现竞争条件。有关更多信息,请参阅动态和运行时集成流