消息通道
除了带有 EIP 方法的 IntegrationFlowBuilder
,Java DSL 还提供了一个流畅的 API 来配置 MessageChannel
实例。为此,提供了 MessageChannels
构建工厂。以下示例展示了如何使用它:
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
相同的 MessageChannels
构建工厂可以在 IntegrationFlowBuilder
的 channel()
EIP 方法中使用,以连接端点,类似于在 XML 配置中连接 input-channel
/output-channel
对。默认情况下,端点通过 DirectChannel
实例连接,其中 bean 名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]
。此规则也适用于由内联 MessageChannels
构建工厂生成的未命名通道。然而,所有 MessageChannels
方法都有一个变体,可以识别 channelId
,你可以用它来设置 MessageChannel
实例的 bean 名称。MessageChannel
引用和 beanName
可以用作 bean 方法调用。以下示例显示了使用 channel()
EIP 方法的可能方式:
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
表示 “查找并使用具有 "input" id 的MessageChannel
,或者创建一个”。 -
fixedSubscriberChannel()
生成一个FixedSubscriberChannel
实例,并以名称channelFlow.channel#0
注册它。 -
channel("queueChannel")
的工作方式相同,但使用现有的queueChannel
bean。 -
channel(publishSubscribe())
是 bean 方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是将IntegrationComponentSpec
暴露给ExecutorChannel
的IntegrationFlowBuilder
,并将其注册为executorChannel
。 -
channel("output")
在没有同名的现有 bean 的情况下,将名为output
的DirectChannel
bean 注册。
注意:前面的 IntegrationFlow
定义是有效的,且其所有通道都应用于带有 BridgeHandler
实例的端点。
在通过不同的 IntegrationFlow
实例使用 MessageChannels
工厂创建相同的内联通道定义时要小心。即使 DSL 解析器会将不存在的对象注册为 bean,它也无法从不同的 IntegrationFlow
容器中确定相同的对象(MessageChannel
)。以下示例是错误的:
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
那个坏例子的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其工作,你需要为该通道声明 @Bean
,并从不同的 IntegrationFlow
实例中使用其 bean 方法。