跳到主要内容

配置消息通道

QWen Plus 中英对照 Configuring Message Channels

要创建一个消息通道实例,您可以使用 <channel/> 元素进行 xml 配置或使用 DirectChannel 实例进行 Java 配置,如下所示:

@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
java

当你使用 <channel/> 元素而不带任何子元素时,它会创建一个 DirectChannel 实例(一个 SubscribableChannel)。

要创建一个发布-订阅通道,请使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示:

@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
java

你也可以提供各种 <queue/> 子元素来创建任何可轮询的通道类型(如 消息通道实现 中所述)。以下各节展示了每种通道类型的示例。

DirectChannel 配置

如前所述,DirectChannel 是默认类型。以下列表显示了如何定义一个:

@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
java

默认通道具有轮询负载均衡器,并且启用了故障转移(详情参见 DirectChannel)。要禁用其中一个或两个功能,添加一个 <dispatcher/> 子元素(DirectChannelLoadBalancingStrategy 构造函数),并按如下方式配置属性:

@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}

@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
java

从 6.3 版本开始,所有基于 UnicastingDispatcherMessageChannel 实现都可以配置为使用 Predicate<Exception> failoverStrategy,而不是简单的 failover 选项。这个谓词根据当前 MessageHandler 抛出的异常来决定是否故障转移到下一个 MessageHandler。更复杂的错误分析应该使用 ErrorMessageExceptionTypeRouter 进行。

数据类型通道配置

有时,消费者只能处理特定类型的负载,这迫使你确保输入消息的负载类型。首先想到的可能是使用消息过滤器。但是,消息过滤器所能做的只是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器,将具有不符合数据类型的消 息路由到特定的转换器,以强制执行转换为所需的数据类型。这虽然可行,但有一种更简单的方法可以实现相同的效果,那就是应用 Datatype Channel 模式。你可以为每种特定的负载数据类型使用单独的数据类型通道。

要创建一个只接受包含特定有效负载类型消息的数据类型通道,在通道元素的 datatype 属性中提供数据类型的全限定类名,如下例所示:

@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
java

请注意,类型检查会通过任何可以分配给通道数据类型的类型。换句话说,前面示例中的 numberChannel 会接受有效负载为 java.lang.Integerjava.lang.Double 的消息。可以提供多个类型,以逗号分隔的列表形式,如下例所示:

@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
java

所以前面例子中的 'numberChannel' 只接受数据类型为 java.lang.Number 的消息。但如果消息的有效载荷不是所需类型会发生什么?这取决于你是否定义了一个名为 integrationConversionService 的 bean,它是 Spring 的 转换服务 的实例。如果没有,则会立即抛出一个 Exception。但是,如果你定义了一个 integrationConversionService bean,它将尝试将消息的有效载荷转换为可接受的类型。

你甚至可以注册自定义转换器。例如,假设你向我们上面配置的 'numberChannel' 发送了一个带有 String 负载的消息。你可能会如下处理该消息:

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
java

通常情况下,这是一个完全合法的操作。但是,由于我们使用了 Datatype Channel,此类操作的结果会生成一个类似的异常:

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]

none

异常发生是因为我们要求有效载荷类型为 Number,但我们发送了一个 String。因此,我们需要某种方法将 String 转换为 Number。为此,我们可以实现一个类似于以下示例的转换器:

public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
java

然后我们可以将其注册为集成转换服务的转换器,如下例所示:

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
java

或者在 StringToIntegerConverter 类上,当它被标记为带有 @Component 注解以进行自动扫描时。

当解析 'converter' 元素时,如果尚未定义,则会创建 integrationConversionService bean。有了这个转换器,send 操作现在将成功,因为数据类型通道使用该转换器将 String 负载转换为 Integer

有关有效载荷类型转换的更多信息,请参阅有效载荷类型转换

从 4.0 版本开始,integrationConversionServiceDefaultDatatypeChannelMessageConverter 调用,它会在应用程序上下文中查找转换服务。要使用不同的转换技术,可以在通道上指定 message-converter 属性。这必须引用一个 MessageConverter 实现。仅使用 fromMessage 方法。它为转换器提供对消息头的访问(以防转换可能需要来自头的信息,例如 content-type)。该方法只能返回转换后的有效负载或完整的 Message 对象。如果是后者,则转换器必须小心将所有头信息从入站消息复制过来。

或者,你可以声明一个 IDdatatypeChannelMessageConverter<bean/>,其类型为 MessageConverter,并且该转换器会被所有具有 datatype 的通道使用。

QueueChannel 配置

要创建一个 QueueChannel,请使用 <queue/> 子元素。您可以如下指定通道的容量:

@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
java
备注

如果您不为这个 <queue/> 子元素的 'capacity' 属性提供值,则生成的队列将是无界的。为了避免诸如内存不足等问题,我们强烈建议您为有界队列设置一个明确的值。

持久化 QueueChannel 配置

由于 QueueChannel 提供了缓冲消息的能力,但默认情况下仅在内存中进行,因此在系统故障时也存在消息丢失的可能性。为了降低这种风险,QueueChannel 可以由 MessageGroupStore 策略接口的持久化实现来支持。有关 MessageGroupStoreMessageStore 的更多详细信息,请参阅 消息存储

important

当使用 message-store 属性时,不允许使用 capacity 属性。

QueueChannel 接收到一个 Message 时,它会将消息添加到消息存储中。当从 QueueChannel 轮询一个 Message 时,它将从消息存储中移除。

默认情况下,QueueChannel 将其消息存储在内存队列中,这可能导致前面提到的消息丢失情况。但是,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore

你可以通过添加 message-store 属性为任何 QueueChannel 配置一个消息存储,如下例所示:

<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
xml

(请参阅下面的示例,了解 Java/Kotlin 配置选项。)

Spring Integration JDBC 模块还为多个流行数据库提供了一个模式数据定义语言 (DDL)。这些模式位于该模块 (spring-integration-jdbc) 的 org.springframework.integration.jdbc.store.channel 包中。

important

一个重要特性是,对于任何事务性持久化存储(例如 JdbcChannelMessageStore),只要轮询器配置了事务,从存储中移除的消息只有在事务成功完成时才会被永久移除。否则,事务将回滚,Message 不会丢失。

还有许多其他的消息存储实现可供选择,随着越来越多与“NoSQL”数据存储相关的 Spring 项目不断提供对这些存储的底层支持。如果找不到满足您特定需求的实现,您还可以提供自己的 MessageGroupStore 接口实现。

从 4.0 版本开始,我们建议尽可能配置 QueueChannel 实例使用 ChannelMessageStore。这些存储一般针对这种用途进行了优化,而普通的消息存储则没有。如果 ChannelMessageStore 是一个 ChannelPriorityMessageStore,那么消息将按照优先级顺序以 FIFO 方式接收。优先级的概念由消息存储实现决定。例如,以下示例展示了 MongoDB Channel Message Store 的 Java 配置:

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
java
备注

注意 MessageGroupQueue 类。那是使用 MessageGroupStore 操作的一个 BlockingQueue 实现。

另一种自定义 QueueChannel 环境的选项是由 <int:queue> 子元素或其特定构造函数的 ref 属性提供的。此属性提供对任何 java.util.Queue 实现的引用。例如,可以如下配置 Hazelcast 分布式 IQueue

<int:queue ref="myHazelcastQueue"/>
xml

这里的 myHazelcastQueue 是一个 Hazelcast 分布式队列的 Bean 名称。

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
java

PublishSubscribeChannel 配置

要创建 PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。在使用此元素时,您还可以指定用于发布消息的 task-executor(如果未指定任何内容,则它会在发送者的线程中发布),如下所示:

@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
java

如果你在 PublishSubscribeChannel 下游提供了一个重新排序器或聚合器,你可以在通道上将 'apply-sequence' 属性设置为 true。这样做表示该通道应在传递消息之前设置 sequence-sizesequence-number 消息头以及相关 ID。例如,如果有五个订阅者,sequence-size 会被设置为 5 ,而消息的 sequence-number 头值将从 1 到 5 不等。

除了 Executor,你还可以配置一个 ErrorHandler。默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误发送到来自 errorChannel 头的 MessageChannel 或全局 errorChannel 实例。如果未配置 Executor,则忽略 ErrorHandler,异常会直接抛给调用者的线程。

如果你在 PublishSubscribeChannel 下游提供了一个 ResequencerAggregator,你可以在通道上将 'apply-sequence' 属性设置为 true。这样做表示该通道应该在传递消息之前设置 sequence-size 和 sequence-number 消息头以及 correlation ID。例如,如果有五个订阅者,sequence-size 会被设置为 5,而消息的 sequence-number 头值将从 15 不等。

以下示例展示了如何将 apply-sequence 标头设置为 true

@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
java
备注

apply-sequence 的值默认为 false,以便发布-订阅通道可以将完全相同的消息实例发送到多个输出通道。由于 Spring Integration 强制有效载荷和头部引用的不可变性,当该标志设置为 true 时,通道会创建具有相同有效载荷引用但头部值不同的新 Message 实例。

从 5.4.3 版本开始,PublishSubscribeChannel 还可以使用其 BroadcastingDispatcherrequireSubscribers 选项进行配置,以指示当此通道没有订阅者时不会默默忽略消息。如果没有订阅者并且此选项设置为 true,则会抛出带有 Dispatcher has no subscribers 消息的 MessageDispatchingException

ExecutorChannel

要创建一个 ExecutorChannel,添加带有 task-executor 属性的 <dispatcher> 子元素。该属性的值可以引用上下文中的任何 TaskExecutor。例如,这样做可以配置一个线程池来分发消息给已订阅的处理器。如前所述,这样做会打破发送者和接收者之间的单线程执行上下文,因此任何活动的事务上下文不会被处理器的调用所共享(也就是说,处理器可能会抛出一个 Exception,但 send 调用已经成功返回)。以下示例展示了如何使用 dispatcher 元素并在 task-executor 属性中指定一个执行器:

<dispatcher task-executor="myTaskExecutor" />
xml

在这个例子中,myTaskExecutor 是你定义的一个任务执行器,用于处理消息分发。

@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
java
备注

load-balancerfailover 选项同样都可以在 <dispatcher/> 子元素中使用,如前面在 DirectChannel 配置 中所述。相同的默认设置适用。因此,除非为这两个属性中的一个或两个提供了显式配置,否则通道将具有带有故障转移功能的轮询负载均衡策略,如下例所示:

<int:channel id="executorChannelWithoutFailover">
<int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
xml

PriorityChannel 配置

要创建 PriorityChannel,请使用 <priority-queue/> 子元素,如下例所示:

@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
java

默认情况下,通道会查询消息的 priority 头。但是,你可以提供一个自定义的 Comparator 引用。另外,请注意 PriorityChannel(和其他类型一样)确实支持 datatype 属性。与 QueueChannel 一样,它也支持 capacity 属性。以下示例演示了所有这些:

@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
java

自从 4.0 版本以来,priority-channel 子元素支持 message-store 选项(在这种情况下不允许使用 comparatorcapacity)。消息存储必须是 PriorityCapableChannelMessageStore。目前为 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的实现。更多信息请参阅 QueueChannel 配置消息存储。您可以在 支持的消息通道中找到示例配置。

RendezvousChannel 配置

当队列子元素是 <rendezvous-queue> 时,创建一个 RendezvousChannel。它没有提供任何额外的配置选项,这些选项已在前面描述过,而且它的队列不接受任何容量值,因为它是一个零容量直接交接队列。以下示例展示了如何声明一个 RendezvousChannel

@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
java

作用域通道配置

任何通道都可以配置一个 scope 属性,如下例所示:

<int:channel id="threadLocalChannel" scope="thread"/>
xml

Channel 拦截器配置

消息通道也可以有拦截器,如 Channel Interceptors 中所述。可以将 <interceptors/> 子元素添加到 <channel/>(或更具体的元素类型)。你可以提供 ref 属性来引用任何实现了 ChannelInterceptor 接口的 Spring 管理对象,如下例所示:

<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
xml

通常,我们建议将拦截器实现定义在单独的位置,因为它们通常提供可以在多个通道中重用的通用行为。

全局通道拦截器配置

Channel 拦截器提供了一种干净简洁的方式,为每个单独的通道应用横切行为。如果相同的 behavior 需要在多个通道上应用,则为每个通道配置相同的一组拦截器并不是最有效的方法。为了避免重复配置,同时允许拦截器应用于多个通道,Spring Integration 提供了全局拦截器。请考虑以下两个示例:

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
xml
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
xml

每个 <channel-interceptor/> 元素允许你定义一个全局拦截器,该拦截器应用于匹配由 pattern 属性定义的任何模式的所有通道。在前面的例子中,全局拦截器被应用到 'thing1' 通道以及所有以 'thing2' 或 'input' 开头的其他通道,但不应用于以 'thing3' 开头的通道(自 5.0 版本起)。

注意

在此模式中添加此语法会导致一个可能的问题(尽管可能不太可能出现)。如果你有一个名为 !thing1 的 bean,并且你在通道拦截器的 pattern 模式中包含了一个 !thing1 模式的匹配,它将不再匹配。现在该模式匹配所有不名为 thing1 的 bean。在这种情况下,你可以用 \ 转义模式中的 !。模式 \!thing1 匹配名为 !thing1 的 bean。

order 属性让你管理在给定通道上有多个拦截器时,此拦截器的注入位置。例如,通道 'inputChannel' 可以在当地配置单独的拦截器(见下文),如下例所示:

<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
xml

一个合理的问题是“全局拦截器如何相对于其他本地配置或通过其他全局拦截器定义配置的拦截器进行注入?”当前实现提供了一种简单的机制来定义拦截器执行顺序。order 属性中的正数确保拦截器在任何现有拦截器之后注入,而负数则确保拦截器在现有拦截器之前注入。这意味着,在前面的例子中,全局拦截器是在 'wire-tap' 拦截器(其 order 大于 0)之后注入的。如果有另一个具有匹配 pattern 的全局拦截器,它的顺序将通过比较两个拦截器的 order 属性值来确定。要在一个现有的拦截器之前注入全局拦截器,请为 order 属性使用负值。

备注

请注意,orderpattern 属性都是可选的。order 的默认值为 0,pattern 的默认值是 '*'(用于匹配所有通道)。

线路窃听

如前所述,Spring Integration 提供了一个简单的线程拦截器。你可以在 <interceptors/> 元素内的任何通道上配置一个线程tap。这样做对于调试特别有用,并且可以与 Spring Integration 的日志通道适配器结合使用,如下所示:

<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
xml
提示

'logging-channel-adapter' 还接受一个 'expression' 属性,以便您可以针对 'payload' 和 'headers' 变量评估 SpEL 表达式。或者,要记录完整消息的 toString() 结果,可以为 'log-full-message' 属性提供 true 值。默认情况下,它是 false,因此只会记录有效负载。将其设置为 true 可以启用记录所有标头以及有效负载。'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name")。

关于线程窃听器(wire tap)和其他类似组件(消息发布配置)的一个常见误解是它们本质上是自动异步的。默认情况下,作为组件的线程窃听器并不是以异步方式调用的。相反,Spring Integration 专注于统一的方式来配置异步行为:消息通道(message channel)。决定消息流中某些部分是同步还是异步的因素是在该流中配置的消息通道类型。这是消息通道抽象的主要优势之一。从框架诞生之初,我们就一直强调消息通道作为框架一级公民的需求和价值。它不仅仅是一个内部、隐式的 EIP 模式实现。它完全作为可配置组件暴露给最终用户。因此,线程窃听器组件仅负责执行以下任务:

  • 通过接入一个通道(例如,channelA)来拦截消息流

  • 获取每条消息

  • 将消息发送到另一个通道(例如,channelB

本质上它是桥接模式的一种变体,但它封装在通道定义内(因此可以更容易地启用和禁用而不中断流程)。此外,与桥接不同的是,它基本上是分叉了另一个消息流。那么这个流是同步的还是异步的呢?答案取决于 'channelB' 是什么类型的消息通道。我们有以下选项:直接通道、可轮询通道和执行器通道。最后两种打破了线程边界,使得通过这些通道进行通信变为异步,因为从该通道向其订阅处理程序发送消息的调度发生在与用于将消息发送到该通道的线程不同的线程上。这将决定你的线程插入点流是同步还是异步。它与其他框架内的组件(如消息发布者)保持一致,并通过让你不必提前担心(除了编写线程安全代码外)某段代码应该实现为同步还是异步,增加了一致性和简单性。通过消息通道连接两段代码(例如,组件 A 和组件 B)的实际布线决定了它们的合作是同步还是异步。你甚至可能希望将来从同步更改为异步,而消息通道使你可以在不触及代码的情况下快速完成更改。

关于线程窃听器的最后一点是,尽管上面给出了不默认异步的理由,但您应该记住,通常尽早传递消息是比较理想的。因此,在实践中,使用异步通道选项作为线程窃听器的输出通道是很常见的。然而,默认情况下并不会强制异步行为。有多个用例会因为我们这样做而中断,包括您可能不希望破坏事务边界的情况。也许您使用线程窃听模式进行审计,并且希望审计消息在原始事务中发送。例如,您可以将线程窃听器连接到 JMS 输出通道适配器。这样,您就可以两全其美:1) 发送 JMS 消息可以在事务内发生,同时 2) 它仍然是一个“发送即忘”的操作,从而防止主消息流中出现任何明显的延迟。

important

从 4.0 版本开始,当拦截器(例如 WireTap 类)引用一个通道时,避免循环引用非常重要。你需要将这些通道排除在当前拦截器所拦截的通道之外。这可以通过适当的模式或编程方式来实现。如果你有一个自定义的 ChannelInterceptor 引用了 channel,请考虑实现 VetoCapableInterceptor。这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。你还可以在拦截器方法中添加运行时保护,以确保该通道不是由拦截器引用的通道之一。WireTap 同时使用了这两种技术。

从 4.3 版本开始,WireTap 增加了新的构造函数,接受一个 channelName 而不是一个 MessageChannel 实例。这在使用 Java 配置和通道自动创建逻辑时非常方便。目标 MessageChannel bean 将在与拦截器的第一次交互时,根据提供的 channelName 进行解析。

important

通道解析需要一个 BeanFactory,因此线程tap实例必须是Spring管理的bean。

这种后期绑定方法还允许简化典型的 Java DSL 配置中的窃听模式,如下例所示:

@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}

@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
java

条件性线路窃听

Wire taps 可以通过使用 selectorselector-expression 属性来设置条件。selector 引用一个 MessageSelector bean,它可以在运行时确定消息是否应该发送到 tap 通道。同样,selector-expression 是一个布尔类型的 SpEL 表达式,具有相同的作用:如果表达式的值为 true,则消息将被发送到 tap 通道。

全局线缆窃听配置

可以配置一个全局线程监听器作为 Global Channel Interceptor Configuration 的特例。为此,配置一个顶级 wire-tap 元素。现在,除了正常的 wire-tap 命名空间支持外,还支持 patternorder 属性,并且它们的工作方式与 channel-interceptor 中的方式完全相同。以下示例展示了如何配置一个全局线程监听器:

<wire-tap pattern="*" order="1">
<!-- configuration details -->
</wire-tap>
xml
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
java
提示

全局线控监听提供了一种便捷的方法,可以在不修改现有通道配置的情况下,在外部配置单通道线控监听。为此,将 pattern 属性设置为目标通道名称。例如,你可以使用此技术配置测试用例以验证通道上的消息。