消息通道
除了带有企业集成模式(EIP)方法的 IntegrationFlowBuilder 外,Java DSL 还提供了一个流畅的 API 来配置 MessageChannel 实例。为此,提供了 MessageChannels 构建器工厂。以下示例展示了如何使用它:
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
同一个 MessageChannels 构建器工厂可用于 IntegrationFlowBuilder 中的 channel() EIP 方法,以连接端点,类似于在 XML 配置中连接 input-channel/output-channel 对。默认情况下,端点通过 DirectChannel 实例连接,其 bean 名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。此规则同样适用于通过内联 MessageChannels 构建器工厂使用产生的未命名通道。然而,所有 MessageChannels 方法都提供了一种变体,该变体支持 channelId,可用于设置 MessageChannel 实例的 bean 名称。MessageChannel 引用和 beanName 可用作 bean 方法调用。以下示例展示了使用 channel() EIP 方法的可能方式:
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")表示“查找并使用 id 为 'input' 的MessageChannel,若不存在则创建”。 -
fixedSubscriberChannel()创建一个FixedSubscriberChannel实例,并将其注册为channelFlow.channel#0。 -
channel("queueChannel")的工作方式相同,但使用现有的queueChannelbean。 -
channel(publishSubscribe())是 bean 方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))是IntegrationFlowBuilder,它将IntegrationComponentSpec暴露给ExecutorChannel,并将其注册为executorChannel。 -
channel("output")将DirectChannelbean 注册为output,前提是当前不存在同名的 bean。
注意:前面的 IntegrationFlow 定义是有效的,其所有通道都应用于带有 BridgeHandler 实例的端点。
注意,通过来自不同 IntegrationFlow 实例的 MessageChannels 工厂使用相同的行内通道定义时需谨慎。即使 DSL 解析器会将不存在的对象注册为 bean,它也无法从不同的 IntegrationFlow 容器中识别出相同的对象(MessageChannel)。以下示例是错误的:
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
那个错误示例的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其正常工作,您需要为该通道声明 @Bean,并在不同的 IntegrationFlow 实例中使用其 bean 方法。