跳到主要内容
版本:7.0.2

消息端点

DeepSeek V3 中英对照 Message Endpoints

本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息传递组件的底层 API 的诸多细节。如果您希望真正理解幕后的运作机制,这些信息会很有帮助。不过,如果您想直接开始使用基于命名空间的简化配置来设置各种元素,现在可以跳过这部分,直接阅读端点命名空间支持

正如概述中所述,消息端点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍多种消费消息的不同组件。其中一些组件也能够发送回复消息。发送消息相当直接。如先前在消息通道中所示,您可以将消息发送到消息通道。然而,接收消息则稍显复杂。主要原因是存在两种类型的消费者:轮询消费者事件驱动消费者

在这两种消费者中,事件驱动型消费者要简单得多。它们本质上就是带有回调方法的监听器,无需管理和调度独立的轮询线程。当连接到 Spring Integration 的可订阅消息通道时,这种简单方案非常适用。然而,当连接到带缓冲、可轮询的消息通道时,就需要有组件来调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适配这两类消费者。因此,消费者本身只需实现回调接口。当需要轮询时,端点就充当消费者实例的容器。这样做的好处类似于使用容器托管消息驱动 Bean,但由于这些消费者是在 ApplicationContext 中运行的 Spring 托管对象,因此更接近于 Spring 自身的 MessageListener 容器。

消息处理器

Spring Integration 的 MessageHandler 接口由框架内的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler。尽管如此,消息消费者会使用它来实际处理已消费的消息,因此了解这个策略接口确实有助于理解消费者的整体角色。该接口定义如下:

public interface MessageHandler {

void handleMessage(Message<?> message);

}

尽管接口简单,但它为后续章节涵盖的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件各自对处理的消息执行截然不同的功能,但实际接收消息的需求是相同的,轮询与事件驱动行为之间的选择也是相同的。Spring Integration 提供了两种端点实现,用于承载这些基于回调的处理程序,并允许它们连接到消息通道。

事件驱动型消费者

由于事件驱动型消费者端点相对简单,我们先介绍它。你可能还记得 SubscribableChannel 接口提供了一个 subscribe() 方法,该方法接受一个 MessageHandler 参数(如 SubscribableChannel 所示)。以下清单展示了 subscribe 方法的定义:

subscribableChannel.subscribe(messageHandler);

由于订阅通道的处理器无需主动轮询该通道,因此这是一种事件驱动的消费者模式。Spring Integration 提供的实现接受一个 SubscribableChannel 和一个 MessageHandler,如下例所示:

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring Integration 还提供了一个 PollingConsumer,其实例化方式与上述相同,但通道必须实现 PollableChannel,如下例所示:

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
备注

有关轮询消费者的更多信息,请参阅通道适配器通道适配器

轮询消费者还有许多其他配置选项。以下示例展示了如何设置触发器:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用简单的间隔(Duration)来定义,但也支持 initialDelay 属性和布尔类型的 fixedRate 属性(默认值为 false,即不固定延迟)。以下示例设置了这两个属性:

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

上述示例中三种设置的结果是:触发器等待五秒后,每秒触发一次。

CronTrigger 需要一个有效的 cron 表达式。详情请参阅 Javadoc。以下示例设置了一个新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

根据前例定义的触发器,其结果是创建一个每周一至周五每十秒触发一次的触发器。

备注

默认的轮询端点触发器是一个 PeriodicTrigger 实例,其固定延迟周期为 1 秒。

除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。以下示例展示了如何设置这两个属性:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 属性用于指定在单个轮询操作中接收消息的最大数量。这意味着轮询器会持续调用 receive() 方法而不等待,直到返回 null 或达到最大值。例如,如果一个轮询器设置了十秒的间隔触发器,且 maxMessagesPerPoll 设置为 25,并且它正在轮询一个队列中有 100 条消息的通道,那么所有 100 条消息可以在 40 秒内被获取。它会先获取 25 条,等待十秒,再获取下 25 条,依此类推。如果 maxMessagesPerPoll 配置为负值,则会在单个轮询周期内持续调用 MessageSource.receive(),直到返回 null。从版本 5.5 开始,0 值具有特殊含义——完全跳过 MessageSource.receive() 调用,这可以被视为暂停此轮询端点,直到后续将 maxMessagesPerPoll 更改为非零值,例如通过控制总线进行修改。

receiveTimeout 属性指定了当轮询器调用接收操作时,如果没有可用消息,它应该等待的时间。例如,考虑两个表面上看起来相似但实际上截然不同的选项:第一个选项的间隔触发器为 5 秒,接收超时为 50 毫秒;而第二个选项的间隔触发器为 50 毫秒,接收超时为 5 秒。第一个选项接收消息的时间可能比它在通道上接受消息的时间晚最多 4950 毫秒(如果该消息在其轮询调用返回后立即到达)。另一方面,第二种配置永远不会错过超过 50 毫秒的消息。区别在于第二个选项需要一个线程来等待。然而,这样一来,它能够更快地响应到达的消息。这种被称为“长轮询”的技术可用于在轮询源上模拟事件驱动的行为。

轮询消费者也可以委托给 Spring 的 TaskExecutor,如下例所示:

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 具有一个名为 adviceChain 的属性。该属性允许您指定一个 AOP 通知的 List,用于处理包括事务在内的额外横切关注点。这些通知会围绕 doPoll() 方法应用。如需更深入的信息,请参阅 端点命名空间支持 下关于 AOP 通知链和事务支持的部分。另请参阅 @Poller 注解的 Javadocs 以及相应的 消息注解支持 部分。Java DSL 也提供了一个 .poller() 端点配置选项及其相应的 Pollers 工厂。

之前的示例展示了依赖查找。然而,请记住,这些消费者通常被配置为Spring bean定义。实际上,Spring Integration还提供了一个名为ConsumerEndpointFactoryBeanFactoryBean,它根据通道类型创建相应的消费者类型。此外,Spring Integration还提供了完整的XML命名空间支持,以进一步隐藏这些细节。在本指南中,随着每种组件类型的介绍,将重点展示基于命名空间的配置。

备注

许多 MessageHandler 实现能够生成回复消息。如前所述,与接收消息相比,发送消息相对简单。然而,回复消息的发送时机和数量取决于处理器的类型。例如,聚合器会等待一定数量的消息到达,并且通常被配置为拆分器的下游消费者,而拆分器可以为它处理的每条消息生成多个回复。在使用命名空间配置时,您并不严格需要了解所有细节。然而,了解这些组件中有几个共享一个共同的基类 AbstractReplyProducingMessageHandler,并且它提供了一个 setOutputChannel(..) 方法,可能仍然是有价值的。

端点命名空间支持

在本参考手册中,你可以找到端点元素的具体配置示例,例如 router、transformer、service-activator 等。这些元素大多支持 input-channel 属性,许多还支持 output-channel 属性。解析后,这些端点元素会根据所引用的 input-channel 类型(分别为 PollableChannelSubscribableChannel)生成 PollingConsumerEventDrivenConsumer 的实例。当通道为可轮询类型时,轮询行为基于端点元素的 poller 子元素及其属性。

以下是 poller 的所有可用配置选项:

<int:poller cron=""                                  // <1>
default="false" // <2>
error-channel="" // <3>
fixed-delay="" // <4>
fixed-rate="" // <5>
initial-delay="" // <6>
id="" // <7>
max-messages-per-poll="" // <8>
receive-timeout="" // <9>
ref="" // <10>
task-executor="" // <11>
time-unit="MILLISECONDS" // <12>
trigger=""> // <13>
<int:advice-chain /> // <14>
<int:transactional /> // <15>
</int:poller>
  • 提供使用 Cron 表达式配置轮询器的能力。底层实现使用 org.springframework.scheduling.support.CronTrigger。如果设置了此属性,则不得指定以下任何属性:fixed-delaytriggerfixed-rateref

  • 通过将此属性设置为 true,可以精确地定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。任何连接到 PollableChannel (PollingConsumer) 的端点或任何未显式配置轮询器的 SourcePollingChannelAdapter 都将使用此全局默认轮询器。默认值为 false。可选。

  • 标识在此轮询器调用期间发生故障时,错误消息将发送到的通道。要完全抑制异常,可以提供对 nullChannel 的引用。可选。

  • 固定延迟触发器在底层使用 PeriodicTrigger。数值单位为 time-unit,或者可以采用持续时间格式(从版本 6.2 开始),例如 PT10SP1D。如果设置了此属性,则不得指定以下任何属性:fixed-ratetriggercronref

  • 固定速率触发器在底层使用 PeriodicTrigger。数值单位为 time-unit,或者可以采用持续时间格式(从版本 6.2 开始),例如 PT10SP1D。如果设置了此属性,则不得指定以下任何属性:fixed-delaytriggercronref

  • 底层 PeriodicTrigger 的初始延迟(从版本 6.2 开始)。数值单位为 time-unit,或者可以采用持续时间格式,例如 PT10SP1D

  • 引用轮询器底层 bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata。对于顶级轮询器元素,id 属性是必需的,除非它是默认轮询器 (default="true")。

  • 更多信息请参阅配置入站通道适配器。如果未指定,默认值取决于上下文。如果使用 PollingConsumer,此属性默认为 -1。但是,如果使用 SourcePollingChannelAdapter,则 max-messages-per-poll 属性默认为 1。可选。

  • 值设置在底层类 PollerMetadata 上。如果未指定,则默认为 1000(毫秒)。可选。

  • 对另一个顶级轮询器的 bean 引用。ref 属性不得出现在顶级 poller 元素上。但是,如果设置了此属性,则不得指定以下任何属性:fixed-ratetriggercronfixed-delay

  • 提供引用自定义任务执行器的能力。更多信息请参阅任务执行器支持。可选。

  • 此属性指定底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。因此,此属性只能与 fixed-delayfixed-rate 属性结合使用。如果与 crontrigger 引用属性结合使用,将导致失败。PeriodicTrigger 支持的最小粒度是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何 fixed-delayfixed-rate 值都将被解释为毫秒。基本上,此枚举为基于秒的间隔触发器值提供了便利。对于每小时、每天和每月的设置,我们建议使用 cron 触发器。

  • 引用任何实现 org.springframework.scheduling.Trigger 接口的 Spring 配置的 bean。但是,如果设置了此属性,则不得指定以下任何属性:fixed-delayfixed-ratecronref。可选。

  • 允许指定额外的 AOP 通知来处理额外的横切关注点。更多信息请参阅事务。可选。

  • 轮询器可以设置为事务性的。更多信息请参阅AOP 通知链。可选。

示例

一个简单的基于间隔的轮询器,其间隔为1秒,可以按如下方式配置:

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>

除了使用 fixed-rate 属性外,你也可以使用 fixed-delay 属性。

对于基于 Cron 表达式的轮询器,请改用 cron 属性,如下例所示:

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是 PollableChannel,则轮询器配置是必需的。具体来说,如前所述,triggerPollingConsumer 类的一个必需属性。因此,如果为轮询消费者端点的配置省略了 poller 子元素,可能会抛出异常。如果尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。

也可以创建顶级轮询器,在这种情况下只需要一个 ref 属性,如下例所示:

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
备注

ref 属性仅允许在内部轮询器定义上使用。在顶级轮询器上定义此属性会导致应用上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。在 XML DSL 中,单个顶级轮询器组件可以设置 default 属性为 true。对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLERPollerMetadata bean。这样,任何输入通道为 PollableChannel 且定义在同一 ApplicationContext 中的端点,如果没有显式配置 poller,就会使用该默认轮询器。以下示例展示了这样一个轮询器以及使用它的转换器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}

事务支持

Spring Integration 还为轮询器提供了事务支持,使得每个接收-转发操作都能作为原子工作单元执行。要为轮询器配置事务,请添加 <transactional/> 子元素。以下示例展示了可用的属性:

<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>

更多信息,请参阅 轮询器事务支持

AOP 通知链

由于Spring事务支持依赖于代理机制,其中TransactionInterceptor(AOP通知)处理由轮询器发起的消息流的事务行为,因此有时您需要提供额外的通知来处理与轮询器相关的其他横切行为。为此,poller定义了一个advice-chain元素,允许您在实现MethodInterceptor接口的类中添加更多通知。以下示例展示了如何为poller定义advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>

有关如何实现 MethodInterceptor 接口的更多信息,请参阅 Spring Framework 参考指南的 AOP 章节。建议链也可以应用于没有任何事务配置的轮询器,从而增强由轮询器启动的消息流的行为。

important

在使用建议链时,不能指定 <transactional/> 子元素。相反,需要声明一个 <tx:advice/> bean 并将其添加到 <advice-chain/> 中。完整的配置细节请参阅 轮询器事务支持

TaskExecutor 支持

轮询线程可以由Spring的TaskExecutor抽象的任何实例执行。这为端点或端点组启用了并发功能。自Spring 3.0起,核心Spring框架提供了task命名空间,其<executor/>元素支持创建简单的线程池执行器。该元素接受常见并发设置的属性,例如池大小和队列容量。配置线程池执行器可以显著影响端点在负载下的性能表现。这些设置适用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅通道的预期消息量)。要为使用XML命名空间支持配置的轮询端点启用并发功能,请在其<poller/>元素上提供task-executor引用,然后提供以下示例中所示的一个或多个属性:

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>

如果你没有提供任务执行器(task-executor),消费者的处理器将在调用者的线程中被调用。请注意,调用者通常是默认的 TaskScheduler(参见配置任务调度器)。你还应该记住,通过指定 bean 名称,task-executor 属性可以引用 Spring TaskExecutor 接口的任何实现。前面展示的 executor 元素是为了方便而提供的。

正如之前在轮询消费者的背景部分所述,您也可以配置轮询消费者以模拟事件驱动行为。通过设置较长的接收超时时间和较短的触发器间隔,即使在轮询消息源上也能确保对到达消息的及时响应。请注意,这仅适用于具有阻塞等待调用且带超时的消息源。例如,文件轮询器不会阻塞。每次 receive() 调用会立即返回,可能包含新文件也可能不包含。因此,即使轮询器设置了较长的 receive-timeout,该值在此类场景中也不会被使用。另一方面,当使用 Spring Integration 自带的基于队列的通道时,超时值确实有机会发挥作用。以下示例展示了轮询消费者如何近乎即时地接收消息:

<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

采用这种方法并不会带来太多开销,因为其内部实现仅是一个定时等待的线程,所需的CPU资源远低于例如一个持续空转的无限while循环。

运行时更改轮询速率

当使用 fixed-delayfixed-rate 属性配置轮询器时,默认实现会使用一个 PeriodicTrigger 实例。PeriodicTrigger 是 Spring 框架核心的一部分。它仅接受间隔时间作为构造函数的参数。因此,无法在运行时更改该间隔时间。

然而,您可以自定义实现 org.springframework.scheduling.Trigger 接口。您甚至可以将 PeriodicTrigger 作为起点,然后为间隔(周期)添加一个 setter 方法,或者将您自己的节流逻辑嵌入到触发器本身中。period 属性会在每次调用 nextExecutionTime 时用于安排下一次轮询。要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并通过 trigger 属性将依赖项注入到轮询器配置中,该属性引用自定义触发器 bean 实例。现在,您可以获取触发器 bean 的引用,并在轮询之间更改轮询间隔。

例如,可参考 Spring Integration Samples 项目。该项目包含一个名为 dynamic-poller 的示例,该示例使用自定义触发器并展示了在运行时更改轮询间隔的能力。

该示例提供了一个自定义触发器,它实现了 org.springframework.scheduling.Trigger 接口。该示例的触发器基于 Spring 的 PeriodicTrigger 实现。然而,自定义触发器的字段并非 final 类型,并且属性具有显式的 getter 和 setter 方法,允许您在运行时动态更改轮询周期。

备注

需要注意的是,由于触发器方法为 nextExecutionTime(),因此对动态触发器的任何更改都将在下一次轮询时(基于现有配置)生效。无法强制触发器在其当前配置的下次执行时间之前触发。

载荷类型转换

在本参考手册中,您还可以看到各种端点的具体配置和实现示例,这些端点接受消息或任意 Object 作为输入参数。对于 Object 类型的参数,该参数会被映射到消息的有效载荷(payload)或其一部分,或者消息头(当使用 Spring 表达式语言时)。然而,端点方法的输入参数类型有时与有效载荷或其部分的类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种便捷的方式来注册类型转换器(通过使用 Spring 的 ConversionService),它在其自己的名为 integrationConversionService 的转换服务 bean 实例中完成。一旦通过 Spring Integration 基础设施定义了第一个转换器,该 bean 就会自动创建。要注册转换器,您可以实现 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 实现是最简单的,用于将单一类型转换为另一种类型。对于更复杂的场景,例如转换为类层次结构,您可以实现 GenericConverter 并可能结合 ConditionalConverter。这些接口让您能够完全访问 fromto 的类型描述符,从而实现复杂的转换。例如,如果您有一个名为 Something 的抽象类作为转换目标(参数类型、通道数据类型等),并且有两个具体实现 Thing1Thing2,您希望根据输入类型决定转换为其中一种,那么 GenericConverter 将是一个合适的选择。更多信息,请参阅这些接口的 Javadoc:

当你实现了转换器后,可以将其注册到便捷的命名空间下,如下例所示:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您也可以使用内部 bean,如下例所示:

<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration 4.0 开始,你可以使用注解来创建上述配置,如下例所示:

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

public Number convert(Boolean source) {
return source ? 1 : 0;
}

}

或者,你也可以使用 @Configuration 注解,如下例所示:

@Configuration
@EnableIntegration
public class ContextConfiguration {

@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}

}
important

在配置应用上下文时,Spring Framework 允许你添加一个 conversionService bean(参见 配置 ConversionService 章节)。该服务在需要时用于在 bean 创建和配置期间执行适当的转换。

相比之下,integrationConversionService 用于运行时转换。这两种用途截然不同。用于连接 bean 构造函数参数和属性的转换器,如果在运行时用于针对数据类型的通道、负载类型转换器等中的消息进行 Spring Integration 表达式求值,可能会产生意外的结果。

然而,如果你确实希望将 Spring 的 conversionService 用作 Spring Integration 的 integrationConversionService,你可以在应用上下文中配置一个别名,如下例所示:

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,conversionService 提供的转换器可用于 Spring Integration 的运行时转换。

内容类型转换

从版本 5.0 开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。其 HandlerMethodArgumentResolver 实现(例如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。转换可以基于 contentType 消息头进行。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委托给一系列已注册的转换器依次调用,直到其中一个返回非空结果。默认情况下,此转换器提供以下转换器(按严格顺序):

有关其用途及转换时适用的 contentType 值详情,请参阅 Javadoc(链接见前文列表)。之所以使用 ConfigurableCompositeMessageConverter,是因为它可以接收任何其他 MessageConverter 实现,既可包含也可排除之前提到的默认转换器。它还可以在应用上下文中注册为合适的 bean 以覆盖默认转换器,如下例所示:

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}

这两个新的转换器在默认转换器之前被注册到组合中。你也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名为 integrationArgumentResolverMessageConverter 的 bean(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)来提供你自己的 MessageConverter

备注

当使用 SpEL 方法调用时,基于 MessageConverter(包括 contentType 头部)的转换不可用。在这种情况下,只有上文 Payload Type Conversion 中提到的常规类到类的转换可用。

异步轮询

如果你希望轮询是异步的,轮询器可以选择性地指定一个 task-executor 属性,该属性指向任何 TaskExecutor bean 的现有实例(Spring 3.0 通过 task 命名空间提供了便捷的命名空间配置)。然而,在使用 TaskExecutor 配置轮询器时,必须理解某些事项。

问题在于,当前存在两个配置项:轮询器(poller)和 TaskExecutor。它们必须相互协调,否则可能会导致人为的内存泄漏。

考虑以下配置:

<int:channel id="publishChannel">
<int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

前面的配置展示了一个不协调的配置。

默认情况下,任务执行器拥有一个无界的任务队列。即使所有线程都被阻塞,轮询器仍会持续调度新任务,这些线程要么在等待新消息到达,要么在等待超时。假设有20个线程执行任务,超时时间为5秒,那么任务执行速率为每秒4个。然而,新任务的调度速率是每秒20个,因此任务执行器内部队列以每秒16个的速度增长(在进程空闲时),这就导致了内存泄漏。

处理此问题的方法之一是设置任务执行器的 queue-capacity 属性。即使设置为 0 也是一个合理的值。您还可以通过设置任务执行器的 rejection-policy 属性来指定如何处理无法排队的消息,例如将其设置为 DISCARD。换句话说,在配置 TaskExecutor 时,您必须了解某些细节。有关此主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行与调度”

端点内部 Bean

许多端点都是复合型Bean。这包括所有消费者(Consumer)和所有轮询入站通道适配器(Polled Inbound Channel Adapter)。消费者(无论是轮询型还是事件驱动型)都会委托给一个 MessageHandler。轮询适配器则通过委托给 MessageSource 来获取消息。通常,获取委托Bean的引用非常有用,例如在运行时更改配置或进行测试。这些Bean可以通过已知的名称从 ApplicationContext 中获取。MessageHandler 实例在应用上下文中的注册Bean ID 类似于 someConsumer.handler(其中 'consumer' 是端点 id 属性的值)。MessageSource 实例的注册Bean ID 则类似于 somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的ID。

上述内容仅适用于框架组件本身。您也可以使用内部 bean 定义,如下例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 bean 的处理方式与任何声明的内部 bean 相同,并且不会在应用程序上下文中注册。如果您希望以其他方式访问此 bean,请在顶层使用 id 声明它,并使用 ref 属性进行引用。更多信息请参阅 Spring 文档