配置消息通道
要创建消息通道实例,您可以使用 <channel/> 元素(用于 XML 配置)或 DirectChannel 实例(用于 Java 配置),如下所示:
- Java
- XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当你使用不带任何子元素的 <channel/> 元素时,它会创建一个 DirectChannel 实例(一个 SubscribableChannel)。
要创建发布-订阅通道,请使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示:
- Java
- XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
或者,你也可以提供多种 <queue/> 子元素来创建任意可轮询通道类型(如消息通道实现中所述)。以下各节展示了每种通道类型的示例。
DirectChannel 配置
如前所述,DirectChannel 是默认类型。以下清单展示了如何定义一个 DirectChannel:
- Java
- XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有轮询负载均衡器,并且启用了故障转移功能(更多详情请参见DirectChannel)。要禁用其中一个或两个功能,请添加 <dispatcher/> 子元素(即 DirectChannel 的 LoadBalancingStrategy 构造函数),并按如下方式配置属性:
- Java
- XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
从版本6.3开始,所有基于 UnicastingDispatcher 的 MessageChannel 实现都可以配置一个 Predicate<Exception> failoverStrategy,而不是简单的 failover 选项。该谓词根据当前 MessageHandler 抛出的异常来决定是否故障转移到下一个 MessageHandler。更复杂的错误分析应使用 ErrorMessageExceptionTypeRouter 来完成。
数据类型通道配置
有时,消费者只能处理特定类型的有效载荷,这迫使您必须确保输入消息的有效载荷类型。首先想到的可能是使用消息过滤器。然而,消息过滤器所能做的只是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器,将数据类型不符合的消息路由到特定的转换器,以强制进行转换并转换为所需的数据类型。这虽然可行,但实现相同目标的更简单方法是应用数据类型通道模式。您可以为每种特定的有效载荷数据类型使用单独的数据类型通道。
要创建一个仅接受包含特定负载类型消息的数据类型通道,请在通道元素的 datatype 属性中提供数据类型的完全限定类名,如下例所示:
- Java
- XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,类型检查适用于任何可分配给通道数据类型的类型。换句话说,前面示例中的 numberChannel 将接受有效负载为 java.lang.Integer 或 java.lang.Double 的消息。多个类型可以作为逗号分隔的列表提供,如下例所示:
- Java
- XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,在前面的示例中,'numberChannel' 仅接受数据类型为 java.lang.Number 的消息。但如果消息的有效负载不是所需类型,会发生什么情况呢?这取决于你是否定义了一个名为 integrationConversionService 的 bean,该 bean 是 Spring 的 Conversion Service 的实例。如果没有定义,则会立即抛出 Exception。然而,如果你定义了 integrationConversionService bean,系统将尝试使用它来将消息的有效负载转换为可接受的类型。
你甚至可以注册自定义转换器。例如,假设你向上面配置的 'numberChannel' 发送一个带有 String 负载的消息。你可以按以下方式处理该消息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这是一个完全合法的操作。然而,由于我们使用了数据类型通道,此类操作的结果将产生类似于以下的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
异常发生是因为我们要求载荷类型为 Number,但我们发送了一个 String。因此我们需要某种方法将 String 转换为 Number。为此,我们可以实现一个类似于以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后,我们可以将其注册为集成转换服务的转换器,如下例所示:
- Java
- XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt() {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者当 StringToIntegerConverter 类被标记为 @Component 注解时,用于自动扫描。
当 'converter' 元素被解析时,如果尚未定义 integrationConversionService bean,则会创建该 bean。配置该转换器后,send 操作现在将成功执行,因为数据类型通道会使用该转换器将 String 类型的负载转换为 Integer 类型。
有关负载类型转换的更多信息,请参阅负载类型转换。
从版本4.0开始,DefaultDatatypeChannelMessageConverter 会调用 integrationConversionService,该服务会在应用上下文中查找转换服务。要使用不同的转换技术,您可以在通道上指定 message-converter 属性。这必须是对 MessageConverter 实现的引用。仅使用 fromMessage 方法。该方法为转换器提供了对消息头部的访问权限(以防转换可能需要来自头部的信息,例如 content-type)。该方法只能返回转换后的有效负载或完整的 Message 对象。如果是后者,转换器必须小心复制入站消息中的所有头部。
或者,你可以声明一个类型为 MessageConverter 的 <bean/>,其 ID 为 datatypeChannelMessageConverter,该转换器将被所有具有 datatype 的通道使用。
QueueChannel 配置
要创建 QueueChannel,请使用 <queue/> 子元素。您可以按如下方式指定通道的容量:
- Java
- XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您没有为此 <queue/> 子元素的 'capacity' 属性提供值,那么生成的队列将是无界的。为了避免诸如内存耗尽等问题,我们强烈建议您为有界队列设置一个明确的值。
持久化 QueueChannel 配置
由于 QueueChannel 提供了缓冲消息的能力,但默认仅在内存中实现,这也带来了系统故障时消息可能丢失的风险。为了降低这种风险,QueueChannel 可以通过持久化实现 MessageGroupStore 策略接口来支持。有关 MessageGroupStore 和 MessageStore 的更多详细信息,请参阅消息存储。
当使用 message-store 属性时,不允许使用 capacity 属性。
当 QueueChannel 接收到一条 Message 时,它会将该消息添加到消息存储中。当从 QueueChannel 轮询到一条 Message 时,该消息会从消息存储中移除。
默认情况下,QueueChannel 将其消息存储在内存队列中,这可能导致之前提到的消息丢失场景。不过,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore。
您可以通过添加 message-store 属性为任何 QueueChannel 配置消息存储,如下例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(Java/Kotlin 配置选项请参见以下示例。)
Spring Integration JDBC 模块还为多种流行数据库提供了模式数据定义语言(DDL)。这些模式位于该模块(spring-integration-jdbc)的 org.springframework.integration.jdbc.store.channel 包中。
一个重要的特性是,对于任何事务性持久化存储(例如 JdbcChannelMessageStore),只要轮询器配置了事务,从存储中移除的消息只有在事务成功完成时才会被永久删除。否则,事务会回滚,而 Message 不会丢失。
随着与“NoSQL”数据存储相关的Spring项目日益增多,这些项目为底层存储提供了支持,因此消息存储的许多其他实现也变得可用。如果找不到满足特定需求的实现,你也可以自行实现 MessageGroupStore 接口。
自 4.0 版本起,我们建议尽可能将 QueueChannel 实例配置为使用 ChannelMessageStore。与通用消息存储相比,这些存储通常针对此用途进行了优化。如果 ChannelMessageStore 是 ChannelPriorityMessageStore,则消息将按优先级顺序以 FIFO(先进先出)方式接收。优先级的概念由消息存储实现决定。例如,以下示例展示了 MongoDB 通道消息存储 的 Java 配置:
- Java
- Java DSL
- Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意 MessageGroupQueue 类。这是一个 BlockingQueue 实现,用于执行 MessageGroupStore 操作。
自定义 QueueChannel 环境的另一种选择是通过 <int:queue> 子元素的 ref 属性或其特定构造函数提供。该属性提供对任何 java.util.Queue 实现的引用。例如,可以按如下方式配置 Hazelcast 分布式 IQueue:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel 配置
要创建 PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。使用此元素时,您还可以指定用于发布消息的 task-executor(如果未指定,则在发送者的线程中发布),如下所示:
- Java
- XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果在 PublishSubscribeChannel 的下游提供重排器或聚合器,您可以将通道的 apply-sequence 属性设置为 true。这样做表示通道在传递消息之前应设置 sequence-size 和 sequence-number 消息头以及关联 ID。例如,如果有五个订阅者,sequence-size 将被设置为 5,而消息的 sequence-number 头值范围将从 1 到 5。
除了 Executor 之外,您还可以配置一个 ErrorHandler。默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误发送到 errorChannel 头信息中的 MessageChannel 或全局的 errorChannel 实例。如果未配置 Executor,ErrorHandler 将被忽略,异常将直接抛给调用者的线程。
如果在 PublishSubscribeChannel 的下游配置了 Resequencer 或 Aggregator,您可以将通道的 apply-sequence 属性设置为 true。这样做表示通道在传递消息之前应设置序列大小(sequence-size)和序列号(sequence-number)消息头以及关联 ID(correlation ID)。例如,如果有五个订阅者,序列大小将被设置为 5,而消息的序列号头值将范围从 1 到 5。
以下示例展示了如何将 apply-sequence 标头设置为 true:
- Java
- XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
默认情况下,apply-sequence 的值为 false,这样发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。由于 Spring Integration 强制要求负载和头部引用的不可变性,当该标志设置为 true 时,通道会创建具有相同负载引用但不同头部值的新 Message 实例。
从5.4.3版本开始,PublishSubscribeChannel 还可以通过其 BroadcastingDispatcher 的 requireSubscribers 选项进行配置,以指示该通道在没有订阅者时不会静默忽略消息。当没有订阅者且此选项设置为 true 时,将抛出带有 Dispatcher has no subscribers 消息的 MessageDispatchingException。
ExecutorChannel
要创建 ExecutorChannel,请添加带有 task-executor 属性的 <dispatcher> 子元素。该属性的值可以引用上下文中的任何 TaskExecutor。例如,这样做可以配置一个线程池来将消息分派给订阅的处理程序。如前所述,这样做会打破发送方和接收方之间的单线程执行上下文,因此任何活动的事务上下文都不会被处理程序的调用所共享(也就是说,处理程序可能会抛出 Exception,但 send 调用已经成功返回)。以下示例展示了如何使用 dispatcher 元素并在 task-executor 属性中指定一个执行器:
- Java
- XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
load-balancer 和 failover 选项同样适用于 <dispatcher/> 子元素,如先前在 DirectChannel 配置 中所述。相同的默认值适用。因此,除非为这些属性中的一个或两个提供了显式配置,否则通道将启用故障转移并采用轮询负载均衡策略,如下例所示:
<int:channel id="executorChannelWithoutFailover">
<int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
PriorityChannel 配置
要创建 PriorityChannel,请使用 <priority-queue/> 子元素,如下例所示:
- Java
- XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会参考消息的 priority 头部。不过,你也可以提供一个自定义的 Comparator 引用。另外,请注意 PriorityChannel(与其他类型一样)确实支持 datatype 属性。与 QueueChannel 类似,它也支持 capacity 属性。以下示例展示了所有这些特性:
- Java
- XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
自 4.0 版本起,priority-channel 子元素支持 message-store 选项(此时不允许使用 comparator 和 capacity)。消息存储必须是 PriorityCapableChannelMessageStore。目前为 Redis、JDBC 和 MongoDB 提供了 PriorityCapableChannelMessageStore 的实现。更多信息请参阅 QueueChannel 配置 和 消息存储。你可以在 支持消息通道 中找到配置示例。
RendezvousChannel 配置
当队列子元素为 <rendezvous-queue> 时,会创建一个 RendezvousChannel。它不提供超出前述配置的额外选项,且其队列不接受任何容量值,因为它是一个零容量直接交接队列。以下示例展示了如何声明 RendezvousChannel:
- Java
- XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
作用域通道配置
任何频道都可以配置一个 scope 属性,如下例所示:
<int:channel id="threadLocalChannel" scope="thread"/>
通道拦截器配置
消息通道也可以拥有拦截器,如通道拦截器中所述。<interceptors/> 子元素可以添加到 <channel/>(或更具体的元素类型)中。您可以通过 ref 属性引用任何实现 ChannelInterceptor 接口的 Spring 托管对象,如下例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我们建议将拦截器实现定义在单独的位置,因为它们通常提供可在多个通道中复用的通用行为。
全局通道拦截器配置
通道拦截器提供了一种清晰简洁的方式,用于为每个独立通道应用横切行为。如果需要在多个通道上应用相同的行为,为每个通道配置相同的拦截器集并不是最高效的方式。为了避免重复配置,同时使拦截器能够应用于多个通道,Spring Integration 提供了全局拦截器。请看以下两个示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个 <channel-interceptor/> 元素允许您定义一个全局拦截器,该拦截器将应用于所有与 pattern 属性定义的任何模式匹配的通道。在上述情况下,全局拦截器应用于 'thing1' 通道以及所有以 'thing2' 或 'input' 开头的其他通道,但不应用于以 'thing3' 开头的通道(自 5.0 版本起)。
向模式中添加此语法会导致一个可能(虽然或许不太可能)的问题。如果你有一个名为 !thing1 的 bean,并且在通道拦截器的 pattern 模式中包含了 !thing1 模式,它将不再匹配。该模式现在会匹配所有不名为 thing1 的 bean。在这种情况下,你可以使用 \ 来转义模式中的 !。模式 \!thing1 会匹配名为 !thing1 的 bean。
order 属性允许您在给定通道上存在多个拦截器时,管理此拦截器的注入位置。例如,通道 'inputChannel' 可以配置本地独立的拦截器,如下例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是:“全局拦截器如何相对于本地配置或通过其他全局拦截器定义配置的拦截器进行注入?”当前实现提供了一种简单的机制来定义拦截器执行顺序。order 属性中的正数确保拦截器在所有现有拦截器之后注入,而负数则确保拦截器在现有拦截器之前注入。这意味着,在前面的示例中,全局拦截器在本地配置的“wire-tap”拦截器之后注入(因为其 order 大于 0)。如果存在另一个具有匹配 pattern 的全局拦截器,其顺序将通过比较两个拦截器的 order 属性值来确定。若要在现有拦截器之前注入全局拦截器,请为 order 属性使用负值。
请注意,order 和 pattern 属性都是可选的。order 的默认值为 0,而 pattern 的默认值为 '*'(用于匹配所有通道)。
线路窃听
如前所述,Spring Integration 提供了一个简单的线缆分接拦截器。您可以在 <interceptors/> 元素内的任何通道上配置线缆分接。这在调试时特别有用,并且可以与 Spring Integration 的日志记录通道适配器结合使用,如下所示:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受一个 'expression' 属性,以便你可以针对 'payload' 和 'headers' 变量评估 SpEL 表达式。或者,要记录完整消息的 toString() 结果,可以为 'log-full-message' 属性提供 true 值。默认情况下,该值为 false,因此仅记录有效负载。将其设置为 true 可以记录除有效负载外的所有头部信息。'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name")。
关于 wire tap 及其他类似组件(消息发布配置)的一个常见误解是它们本质上是自动异步的。实际上,wire tap 组件默认并非以异步方式调用。相反,Spring Integration 专注于采用统一的方法来配置异步行为:消息通道。决定消息流中某些部分是同步还是异步的关键在于该流程中配置的消息通道类型。这正是消息通道抽象的主要优势之一。自框架诞生之初,我们就始终强调消息通道作为框架一等公民的必要性和价值。它不仅仅是 EIP 模式的一种内部隐式实现,而是完全作为可配置组件暴露给最终用户。因此,wire tap 组件仅负责执行以下任务:
-
通过接入一个通道(例如
channelA)来拦截消息流 -
捕获每条消息
-
将消息发送到另一个通道(例如
channelB)
它本质上是桥接模式的一种变体,但被封装在通道定义中(因此更容易启用和禁用,而不会中断流程)。此外,与桥接不同,它基本上会分叉出另一个消息流。这个流是同步的还是异步的?答案取决于 channelB 的消息通道类型。我们有以下选项:直接通道、可轮询通道和执行器通道。后两种会打破线程边界,使得通过这些通道的通信变为异步,因为从该通道向其订阅的处理程序分发消息时使用的线程,与用于向该通道发送消息的线程不同。这正是决定你的线缆分流流是同步还是异步的关键。这与框架内的其他组件(如消息发布器)保持一致,并通过让你无需提前担心(除了编写线程安全代码)特定代码段应实现为同步还是异步,增加了一致性和简洁性。通过消息通道连接两段代码(例如组件 A 和组件 B)的实际方式,决定了它们的协作是同步还是异步。你甚至可能在未来希望从同步更改为异步,而消息通道让你能够快速实现这一点,无需触及代码。
关于窃听器(wire tap)的最后一点是,尽管上述理由解释了为何默认不采用异步模式,但您需要记住,通常希望尽快传递消息。因此,使用异步通道选项作为窃听器的出站通道是非常常见的做法。然而,默认情况下并不强制要求异步行为。如果我们强制要求异步,许多用例将会失效,包括您可能不希望破坏事务边界的情况。例如,您可能出于审计目的使用窃听器模式,并且确实希望审计消息在原始事务内发送。举例来说,您可以将窃听器连接到 JMS 出站通道适配器。这样,您就能两全其美:1) JMS 消息的发送可以在事务内进行,同时 2) 它仍然是“发送即忘”的操作,从而防止主消息流出现明显的延迟。
从 4.0 版本开始,当拦截器(例如 WireTap 类)引用通道时,避免循环引用非常重要。你需要通过适当的模式或编程方式,将这些通道从当前拦截器要拦截的通道中排除。如果你有一个自定义的 ChannelInterceptor 引用了某个 channel,可以考虑实现 VetoCapableInterceptor。这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。你也可以在拦截器方法中添加运行时保护,以确保该通道不是拦截器引用的通道。WireTap 同时使用了这两种技术。
从版本 4.3 开始,WireTap 增加了额外的构造函数,这些构造函数接受 channelName 参数而非 MessageChannel 实例。这在 Java 配置和使用通道自动创建逻辑时非常方便。目标 MessageChannel bean 会在首次与拦截器交互时,根据提供的 channelName 进行解析。
通道解析需要一个 BeanFactory,因此 wire tap 实例必须是一个由 Spring 管理的 bean。
这种后期绑定的方法还简化了使用 Java DSL 配置的典型监听模式,如下例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件监听
通过使用 selector 或 selector-expression 属性,可以使 Wire Tap 变为条件触发。selector 引用一个 MessageSelector Bean,该 Bean 可以在运行时决定消息是否应发送到 Tap 通道。类似地,selector-expression 是一个布尔类型的 SpEL 表达式,用于实现相同的目的:如果表达式计算结果为 true,则消息将被发送到 Tap 通道。
全局线路窃听配置
可以配置全局的线缆监听作为全局通道拦截器配置的特殊情况。为此,需要配置一个顶层的 wire-tap 元素。现在,除了正常的 wire-tap 命名空间支持外,还支持 pattern 和 order 属性,其工作方式与 channel-interceptor 完全相同。以下示例展示了如何配置全局线缆监听:
- Java
- XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局线缆分接提供了一种便捷的方式,无需修改现有通道配置即可从外部配置单通道线缆分接。为此,将 pattern 属性设置为目标通道名称。例如,您可以使用此技术配置测试用例以验证通道上的消息。