跳到主要内容
版本:7.0.2

动态与运行时集成流

DeepSeek V3 中英对照 Dynamic and Runtime Integration Flows

IntegrationFlow 及其所有依赖组件可以在运行时注册。在 5.0 版本之前,我们使用 BeanFactory.registerSingleton() 钩子。从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行编程式的 BeanDefinition 注册。以下示例展示了如何以编程方式注册一个 bean:

BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在前面的示例中,instanceSupplier 钩子是 genericBeanDefinition 方法的最后一个参数,在本例中由一个 lambda 表达式提供。

所有必要的bean初始化和生命周期管理都会自动完成,这与标准上下文配置中的bean定义方式一致。

为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext 来在运行时注册和管理 IntegrationFlow 实例,如下例所示:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);

IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());

IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们拥有多个配置选项,并且需要创建多个类似流程的实例时,这种方法非常有用。为此,我们可以遍历选项,并在循环中创建并注册 IntegrationFlow 实例。另一种情况是当我们的数据源不是基于 Spring 的,因此必须动态创建它。例如,Reactive Streams 事件源就是一个这样的示例,如下所示:

Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();

this.integrationFlowContext.registration(integrationFlow)
.register();

IntegrationFlowRegistrationBuilder(作为 IntegrationFlowContext.registration() 的结果)可用于为要注册的 IntegrationFlow 指定 bean 名称、控制其 autoStartup,以及注册非 Spring Integration bean。通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他必需的支持组件。

您可以使用 IntegrationFlowRegistration.destroy() 回调来移除动态注册的 IntegrationFlow 及其所有依赖的 bean,当您不再需要它们时。更多信息请参阅 IntegrationFlowContext Javadoc

备注

从 5.0.6 版本开始,IntegrationFlow 定义中所有生成的 bean 名称都会以流 ID 作为前缀。我们建议始终指定一个明确的流 ID。否则,IntegrationFlowContext 中会启动一个同步屏障,为 IntegrationFlow 生成 bean 名称并注册其 bean。我们对这两个操作进行同步,以避免当相同的生成 bean 名称可能用于不同的 IntegrationFlow 实例时出现竞态条件。

此外,从 5.0.6 版本开始,注册构建器 API 新增了一个方法:useFlowIdAsPrefix()。当您希望声明同一流程的多个实例,并且避免流程中具有相同 ID 的组件发生 bean 名称冲突时,此方法非常有用,如下例所示:

private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();

IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}

private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}

在这种情况下,第一个流程的消息处理器可以通过 bean 名称 tcp1.client.handler 来引用。

备注

当你使用 useFlowIdAsPrefix() 时,需要提供一个 id 属性。