子流程支持
一些 if…else 和 publish-subscribe 组件提供了通过使用子流来指定其逻辑或映射的能力。最简单的示例是 .publishSubscribeChannel(),如下所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您也可以通过单独的 IntegrationFlow @Bean 定义来实现相同的结果,但我们希望您能发现子流风格的逻辑组合方式很有用。我们发现这种方式能使代码更简短(从而更具可读性)。
从 5.3 版本开始,提供了一个基于 BroadcastCapableChannel 的 publishSubscribeChannel() 实现,用于在代理支持的消息通道上配置子流订阅者。例如,我们现在可以在 Jms.publishSubscribeChannel() 上将多个订阅者配置为子流:
@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub");
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
类似的发布-订阅子流程组合提供了.routeToRecipients()方法。
另一个例子是在 .filter() 方法中使用 .discardFlow() 而非 .discardChannel()。
.route() 方法值得特别关注。请看以下示例:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 在常规 Router 映射中的工作方式保持不变,而 .subFlowMapping() 则将子流程与主流程绑定。换句话说,任何路由器的子流程在 .route() 之后都会返回到主流程。
有时,你需要从 .subFlowMapping() 中引用一个已有的 IntegrationFlow @Bean。以下示例展示了如何实现:
@Bean
public IntegrationFlow splitRouteAggregate() {
return f -> f
.split()
.<Integer, Boolean>route(o -> o % 2 == 0,
m -> m
.subFlowMapping(true, oddFlow())
.subFlowMapping(false, sf -> sf.gateway(evenFlow())))
.aggregate();
}
@Bean
public IntegrationFlow oddFlow() {
return f -> f.handle(m -> System.out.println("odd"));
}
@Bean
public IntegrationFlow evenFlow() {
return f -> f.handle((p, h) -> "even");
}
在这种情况下,当你需要从此类子流接收回复并继续主流程时,必须将此 IntegrationFlow bean 引用(或其输入通道)用 .gateway() 包装,如上述示例所示。上述示例中的 oddFlow() 引用未被包装到 .gateway() 中。因此,我们不期望从该路由分支获得回复。否则,你最终会遇到类似以下的异常:
Caused by: org.springframework.beans.factory.BeanCreationException:
The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
This is the end of the integration flow.
当你将子流配置为 lambda 表达式时,框架会处理与子流的请求-回复交互,此时不需要网关。
子流程可以嵌套到任意深度,但我们不建议这样做。实际上,即使在路由器场景中,在流程中添加复杂的子流程也会迅速变得像一盘意大利面一样混乱,让人难以理解。
在 DSL 支持子流配置的情况下,当被配置的组件通常需要一个通道,并且该子流以 channel() 元素开始时,框架会在组件输出通道和流的输入通道之间隐式放置一个 bridge()。例如,在这个 filter 定义中:
.filter(p -> p instanceof String, e -> e
.discardFlow(df -> df
.channel(MessageChannels.queue())
...)
框架内部会创建一个 DirectChannel bean 以注入到 MessageFilter.discardChannel 中。然后,它将子流包装成一个 IntegrationFlow,该流以这个用于订阅的隐式通道开始,并在流中指定的 channel() 之前放置一个 bridge。当使用一个现有的 IntegrationFlow bean 作为子流引用(而不是内联子流,例如 lambda)时,则不需要这样的桥接,因为框架可以从流 bean 中解析出第一个通道。而对于内联子流,输入通道尚不可用。