跳到主要内容

消息端点

QWen Plus 中英对照 Message Endpoints

本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的底层 API 的很多内容。如果您想真正了解幕后发生了什么,这些信息会很有帮助。但是,如果您想快速上手使用各种元素的简化命名空间配置,可以先跳转到 Endpoint Namespace Support

如概述中所述,消息端点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的消费消息的组件。其中一些还能够发送回复消息。发送消息非常直接。正如我们在 Message Channels 中所展示的,你可以将消息发送到消息通道。然而,接收则要复杂一些。主要原因是有两种类型的消费者:轮询消费者事件驱动消费者

在这两者中,事件驱动的消费者要简单得多。由于不需要管理和调度单独的轮询线程,它们本质上是带有回调方法的监听器。当连接到 Spring Integration 的可订阅消息通道时,这种简单的选项效果很好。然而,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当消费者实例的容器。其好处类似于使用容器托管消息驱动的 bean,但由于这些消费者是运行在 ApplicationContext 内的 Spring 管理对象,它更类似于 Spring 自己的 MessageListener 容器。

消息处理器

Spring Integration 的 MessageHandler 接口由框架内的许多组件实现。换句话说,这并不是公共 API 的一部分,通常你不会直接实现 MessageHandler。然而,它被消息消费者用于实际处理消费的消息,因此了解这个策略接口有助于理解消费者的总体作用。该接口定义如下:

public interface MessageHandler {

void handleMessage(Message<?> message);

}
java

尽管其简单性,此接口为接下来章节中涵盖的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件各自执行与其处理的消息相关的非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。Spring Integration 提供了两种端点实现,它们托管这些基于回调的处理器,并让它们连接到消息通道。

事件驱动的消费者

因为它是两者中较简单的一个,所以我们首先介绍事件驱动的消费者端点。您可能还记得,SubscribableChannel 接口提供了一个 subscribe() 方法,该方法接受一个 MessageHandler 参数(如 SubscribableChannel 中所示)。以下列表显示了 subscribe 方法的定义:

subscribableChannel.subscribe(messageHandler);
java

由于订阅通道的处理器不需要主动轮询该通道,因此这是一个事件驱动的消费者。Spring Integration 提供的实现接受一个 SubscribableChannel 和一个 MessageHandler,如下例所示:

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
java

轮询消费者

Spring Integration 还提供了一个 PollingConsumer,它可以以相同的方式实例化,只是通道必须实现 PollableChannel,如下例所示:

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
java
备注

有关轮询消费者的更多信息,请参阅 Channel AdapterChannel Adapter

轮询消费者还有许多其他的配置选项。以下示例展示了如何设置触发器:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
java

PeriodicTrigger 通常使用简单的间隔(Duration)定义,但也支持 initialDelay 属性和布尔 fixedRate 属性(默认值为 false —— 即,没有固定延迟)。以下示例同时设置了这两个属性:

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
java

上述三个设置的结果是一个触发器,它等待 5 秒,然后每秒触发一次。

CronTrigger 需要一个有效的 cron 表达式。详情请参阅 Javadoc。以下示例设置了一个新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
java

上一个示例中定义的触发器的结果是一个每十秒触发一次的触发器,从周一到周五。

备注

轮询端点的默认触发器是一个 PeriodicTrigger 实例,具有 1 秒的固定延迟周期。

除了触发器之外,你还可以指定两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。以下示例展示了如何设置这两个属性:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
java

maxMessagesPerPoll 属性指定了在给定的轮询操作中接收的最大消息数量。这意味着轮询器会继续调用 receive() 而不等待,直到返回 null 或达到最大值。例如,如果轮询器有一个十秒的时间间隔触发器,并且 maxMessagesPerPoll 设置为 25,并且它正在轮询一个队列中有 100 条消息的通道,则可以在 40 秒内检索到所有 100 条消息。它先获取 25 条,等待十秒,再获取下 25 条,依此类推。如果 maxMessagesPerPoll 配置为负值,则在单个轮询周期内将继续调用 MessageSource.receive() 直到它返回 null。从 5.5 版开始,0 值具有特殊含义 - 完全跳过 MessageSource.receive() 调用,这可以被视为暂停此轮询端点,直到稍后通过控制总线等将 maxMessagesPerPoll 更改为非零值。

receiveTimeout 属性指定了轮询器在调用接收操作时如果没有可用消息应该等待的时间。例如,考虑两个表面上看似相似但实际上差异很大的选项:第一个选项的间隔触发时间为 5 秒,接收超时时间为 50 毫秒;而第二个选项的间隔触发时间为 50 毫秒,接收超时时间为 5 秒。第一个配置可能在通道上接收到消息的时间比实际晚 4950 毫秒(如果该消息是在其一次轮询调用返回后立即到达)。另一方面,第二种配置永远不会错过超过 50 毫秒的消息。区别在于第二种选项需要一个线程来等待。然而,因此它可以对到达的消息做出更快的响应。这种技术被称为“长轮询”,可以用来模拟轮询源上的事件驱动行为。

一个轮询消费者也可以委托给 Spring TaskExecutor,如下例所示:

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
java

此外,PollingConsumer 有一个名为 adviceChain 的属性。此属性允许你指定一个 AOP 建议的 List,用于处理额外的横切关注点,包括事务。这些建议应用于 doPoll() 方法周围。对于更深入的信息,请参阅 Endpoint Namespace Support 下的 AOP 建议链和事务支持部分。另请参阅 @Poller 注解 Javadoc 和相应的 消息注解支持部分。Java DSL 还提供了一个 .poller() 端点配置选项及其相应的 Pollers 工厂。

前面的例子展示了依赖查找。但是,请记住,这些消费者通常被配置为 Spring bean 定义。实际上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBeanFactoryBean,它根据通道的类型创建适当的消费者类型。此外,Spring Integration 完全支持 XML 命名空间,以进一步隐藏这些细节。基于命名空间的配置在本指南中每介绍一种组件类型时都会有所体现。

备注

许多 MessageHandler 实现可以生成回复消息。如前所述,发送消息与接收消息相比微不足道。然而,发送回复消息的时间和数量取决于处理器类型。例如,聚合器等待一定数量的消息到达,并且通常配置为拆分器的下游使用者,后者可以为每个处理的消息生成多个回复。当使用命名空间配置时,您不需要严格了解所有细节。但是,仍然值得知道这些组件中的许多共享一个共同的基类,即 AbstractReplyProducingMessageHandler,并且它提供了一个 setOutputChannel(..) 方法。

端点命名空间支持

在整个参考手册中,您可以找到针对端点元素(如 router、transformer、service-activator 等)的具体配置示例。这些大多数支持 input-channel 属性,许多还支持 output-channel 属性。解析后,这些端点元素会产生一个 PollingConsumerEventDrivenConsumer 的实例,具体取决于引用的 input-channel 类型:PollableChannelSubscribableChannel。当通道是可轮询时,轮询行为基于端点元素的 poller 子元素及其属性。

以下列出了 poller 的所有可用配置选项:

<int:poller cron=""                                  // <1>
default="false" // <2>
error-channel="" // <3>
fixed-delay="" // <4>
fixed-rate="" // <5>
initial-delay="" // <6>
id="" // <7>
max-messages-per-poll="" // <8>
receive-timeout="" // <9>
ref="" // <10>
task-executor="" // <11>
time-unit="MILLISECONDS" // <12>
trigger=""> // <13>
<int:advice-chain /> // <14>
<int:transactional /> // <15>
</int:poller>
xml
  • 提供了使用 Cron 表达式配置轮询器的能力。底层实现使用 org.springframework.scheduling.support.CronTrigger。如果设置了此属性,则以下属性不能被指定:fixed-delaytriggerfixed-rateref

  • 通过将此属性设置为 true,您可以定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会抛出异常。任何连接到 PollableChannel (PollingConsumer) 或任何没有显式配置轮询器的 SourcePollingChannelAdapter 的端点都将使用全局默认轮询器。默认值为 false。可选。

  • 标识如果在此轮询器调用中发生故障时发送错误消息的通道。要完全抑制异常,可以提供对 nullChannel 的引用。可选。

  • 固定延迟触发器使用 PeriodicTrigger 实现。数值以 time-unit 表示,或者可以是持续时间格式(从版本 6.2 开始),例如 PT10SP1D。如果设置了此属性,则以下属性不能被指定:fixed-ratetriggercronref

  • 固定速率触发器使用 PeriodicTrigger 实现。数值以 time-unit 表示,或者可以是持续时间格式(从版本 6.2 开始),例如 PT10SP1D。如果设置了此属性,则以下属性不能被指定:fixed-delaytriggercronref

  • PeriodicTrigger 的初始延迟(从版本 6.2 开始)。数值以 time-unit 表示,或者可以是持续时间格式,例如 PT10SP1D

  • 引用轮询器的基础 bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata。对于顶级轮询器元素,id 属性是必需的,除非它是默认轮询器 (default="true" )。

  • 有关更多信息,请参阅 配置入站通道适配器。如果没有指定,默认值取决于上下文。如果您使用 PollingConsumer,此属性默认为 -1。但是,如果您使用 SourcePollingChannelAdapter,则 max-messages-per-poll 属性默认为 1。可选。

  • 设置在基础类 PollerMetadata 上。如果没有指定,默认为 1000(毫秒)。可选。

  • 对另一个顶级轮询器的 Bean 引用。ref 属性不能出现在顶级 poller 元素上。但是,如果设置了此属性,则以下属性不能被指定:fixed-ratetriggercronfixed-delay

  • 提供引用自定义任务执行器的能力。有关更多信息,请参阅 任务执行器支持。可选。

  • 此属性指定了底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。因此,此属性只能与 fixed-delayfixed-rate 属性一起使用。如果与 crontrigger 引用属性结合使用,会导致失败。PeriodicTrigger 支持的最小粒度为毫秒。因此,可用选项只有毫秒和秒。如果没有提供此值,任何 fixed-delayfixed-rate 值都将被视为毫秒。基本上,此枚举为基于秒的时间间隔触发器值提供了便利。对于小时、天和月的设置,我们建议使用 cron 触发器。

  • 引用任何实现了 org.springframework.scheduling.Trigger 接口的 Spring 配置的 Bean。但是,如果设置了此属性,则以下属性不能被指定:fixed-delayfixed-ratecronref。可选。

  • 允许指定额外的 AOP 切面来处理其他横切关注点。有关更多信息,请参阅 事务。可选。

  • 轮询器可以进行事务化。有关更多信息,请参阅 AOP 切面链。可选。

示例

一个简单的基于间隔的轮询器,每隔 1 秒钟轮询一次,可以如下配置:

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
xml

作为使用 fixed-rate 属性的替代方案,你也可以使用 fixed-delay 属性。

对于基于 Cron 表达式的轮询器,改用 cron 属性,如下例所示:

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
xml

如果输入通道是 PollableChannel,则需要配置轮询器。具体来说,如前所述,triggerPollingConsumer 类的必需属性。因此,如果您省略了轮询消费者端点配置中的 poller 子元素,则可能会抛出异常。如果尝试在连接到非轮询通道的元素上配置轮询器,也可能会抛出异常。

也可以创建顶级轮询器,在这种情况下,只需要一个 ref 属性,如下例所示:

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
xml
备注

ref 属性只允许在内部轮询器定义中使用。在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。在 XML DSL 中,单个顶级轮询器组件可以将 default 属性设置为 true。对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLERPollerMetadata bean。在这种情况下,任何具有 PollableChannel 作为其输入通道的端点,如果定义在同一个 ApplicationContext 内,并且没有显式配置的 poller,则会使用该默认值。以下示例展示了这样的轮询器和一个使用它的转换器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
java

事务支持

Spring Integration 还为轮询器提供了事务支持,以便每次接收和转发操作可以作为一个原子工作单元执行。要为轮询器配置事务,添加 <transactional/> 子元素。以下示例显示了可用的属性:

<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
xml

有关更多信息,请参阅轮询器事务支持

AOP 切面链

由于 Spring 事务支持依赖于使用 TransactionInterceptor(AOP 建议)处理由轮询器发起的消息流的事务行为的代理机制,有时您必须提供额外的建议来处理与轮询器相关的其他交叉行为。为此,poller 定义了一个 advice-chain 元素,允许你在实现 MethodInterceptor 接口的类中添加更多建议。以下示例展示了如何为 poller 定义 advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
xml

有关如何实现 MethodInterceptor 接口的更多信息,请参阅 Spring Framework 参考指南的 AOP 部分。也可以将建议链应用于没有事务配置的轮询器,使您可以增强由轮询器发起的消息流的行为。

important

在使用建议链时,不能指定 <transactional/> 子元素。相反,应声明一个 <tx:advice/> bean 并将其添加到 <advice-chain/>。有关完整的配置详细信息,请参阅 轮询器事务支持

TaskExecutor 支持

轮询线程可以由 Spring 的 TaskExecutor 抽象的任何实例执行。这为一个端点或一组端点启用了并发性。从 Spring 3.0 开始,核心 Spring 框架有一个 task 命名空间,其 <executor/> 元素支持创建一个简单的线程池执行器。该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。配置线程池执行器可以在负载下对端点的性能产生显著影响。这些设置对每个端点都可用,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。要为使用 XML 命名空间支持配置的轮询端点启用并发,请在 <poller/> 元素上提供 task-executor 引用,然后提供以下示例中所示的一个或多个属性:

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
xml

如果你不提供任务执行器(task-executor),消费者的处理器将在调用者的线程中被调用。请注意,调用者通常是默认的 TaskScheduler (参见 配置任务调度器)。你还应该记住,task-executor 属性可以通过指定bean名称来提供对Spring的 TaskExecutor 接口任何实现的引用。前面所示的 executor 元素是为了方便而提供的。

如前所述的轮询消费者背景部分,你也可以配置一个轮询消费者以模拟事件驱动行为。通过在触发器中设置较长的接收超时时间和较短的时间间隔,你可以确保即使在轮询的消息源上也能及时响应到达的消息。请注意,这仅适用于具有带超时的阻塞等待调用的源。例如,文件轮询器不会阻塞。每个 receive() 调用都会立即返回,并且要么包含新文件,要么不包含。因此,即使轮询器包含较长的 receive-timeout,该值在这种情况下也永远不会被使用。另一方面,在使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与其中。以下示例展示了轮询消费者如何几乎即时地接收消息:

<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>
xml

使用这种方法不会带来太多的开销,因为在其内部,它只是一个定时等待的线程,这比(例如)一个忙等的无限循环所需的 CPU 资源要少得多。

在运行时更改轮询率

在使用 fixed-delayfixed-rate 属性配置轮询器时,默认实现使用 PeriodicTrigger 实例。PeriodicTrigger 是 Spring 框架的核心部分。它仅接受构造函数参数的时间间隔,因此无法在运行时更改。

但是,您可以定义自己的 org.springframework.scheduling.Trigger 接口实现。您甚至可以使用 PeriodicTrigger 作为起点。然后,您可以添加一个设置间隔(周期)的 setter,或者您可以在触发器本身内嵌入自己的节流逻辑。period 属性在每次调用 nextExecutionTime 时用于安排下一次轮询。要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并通过使用 trigger 属性将依赖项注入到轮询器配置中,该属性引用自定义触发器 bean 实例。现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询间隔。

例如,可以参见 Spring Integration Samples 项目。它包含一个名为 dynamic-poller 的示例,该示例使用自定义触发器并展示了在运行时更改轮询间隔的能力。

示例提供了一个自定义触发器,该触发器实现了 org.springframework.scheduling.Trigger 接口。示例中的触发器基于 Spring 的 PeriodicTrigger 实现。但是,自定义触发器的字段不是 final 的,并且属性具有显式的 getter 和 setter,允许你在运行时动态更改轮询周期。

备注

需要注意的是,由于触发器方法是 nextExecutionTime(),对动态触发器的任何更改都不会生效,直到根据现有配置进行下一次轮询。无法强制触发器在当前配置的下次执行时间之前触发。

Payload 类型转换

在整个参考手册中,您还可以看到各种端点的具体配置和实现示例,这些端点接受消息或任何任意的 Object 作为输入参数。在 Object 的情况下,此类参数会被映射到消息的有效负载或有效负载的一部分或头(当使用 Spring 表达式语言时)。然而,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种方便的方法来注册类型转换器(通过使用 Spring ConversionService),在其自己的名为 integrationConversionService 的转换服务 bean 实例内。该 bean 在使用 Spring Integration 基础设施定义第一个转换器时自动创建。要注册一个转换器,您可以实现 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 的实现是最简单的,它将一种类型转换为另一种类型。对于更复杂的情况,例如转换到类层次结构,你可以实现 GenericConverter 以及可能的 ConditionalConverter。这些接口可以让你完全访问 fromto 类型描述符,从而实现复杂的转换。例如,如果你有一个名为 Something 的抽象类作为转换的目标(参数类型、通道数据类型等),并且你有两个具体的实现类 Thing1Thing,你希望根据输入类型转换为其中一个,那么 GenericConverter 将是一个很好的选择。有关更多信息,请参阅这些接口的 Javadoc:

当你实现你的转换器后,你可以很方便地注册它并支持命名空间,如下例所示:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>
xml

或者,你可以使用一个内部bean,如下例所示:

<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
xml

从 Spring Integration 4.0 开始,你可以使用注解来创建前面的配置,如下例所示:

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

public Number convert(Boolean source) {
return source ? 1 : 0;
}

}
java

或者,你可以使用 @Configuration 注解,如下例所示:

@Configuration
@EnableIntegration
public class ContextConfiguration {

@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}

}
java
important

在配置应用程序上下文时,Spring 框架允许你添加一个 conversionService bean(参见 配置 ConversionService 章节)。当需要时,此服务用于在创建和配置 bean 期间执行适当的转换。

相比之下,integrationConversionService 用于运行时转换。这些用途是相当不同的。旨在用于连接 bean 构造函数参数和属性的转换器如果在运行时用于 Spring Integration 表达式评估(针对数据类型通道中的消息、有效负载类型转换器等)可能会产生意外结果。

但是,如果你想使用 Spring 的 conversionService 作为 Spring Integration 的 integrationConversionService,你可以在应用程序上下文中配置一个别名,如下例所示:

<alias name="conversionService" alias="integrationConversionService"/>
xml

在这种情况下,由 conversionService 提供的转换器可用于 Spring Integration 运行时转换。

内容类型转换

从 5.0 版本开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。它的 HandlerMethodArgumentResolver 实现(如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。转换可以基于 contentType 消息头。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委托给一组注册的转换器列表,直到其中一个返回非空结果为止。默认情况下,此转换器提供(严格按顺序):

请参阅 Javadoc (见上述列表中的链接),以了解更多关于其目的和适当的 contentType 值的信息。使用 ConfigurableCompositeMessageConverter 是因为它可以提供任何其他 MessageConverter 实现,包括或不包括前面提到的默认转换器。它还可以作为适当的功能在应用程序上下文中注册,覆盖默认转换器,如下例所示:

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
java

这两个新的转换器在默认转换器之前注册到组合中。你也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名为 integrationArgumentResolverMessageConverter 的 bean 来提供自己的 MessageConverter(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)。

备注

基于 MessageConverter(包括 contentType 标头)的转换在使用 SpEL 方法调用时不可用。在这种情况下,只能使用上述 有效负载类型转换 中提到的常规类到类的转换。

异步轮询

如果你想让轮询异步进行,轮询器可以可选地指定一个 task-executor 属性,该属性指向任何现有的 TaskExecutor bean 实例(Spring 3.0 通过 task 命名空间提供了一种方便的命名空间配置)。然而,在使用 TaskExecutor 配置轮询器时,有某些事情你必须了解。

问题是存在两个配置,一个是轮询器(poller),另一个是 TaskExecutor。这两个必须协调一致,否则你可能会造成人为的内存泄漏。

考虑以下配置:

<int:channel id="publishChannel">
<int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />
xml

前面的配置示例展示了一个不协调的配置。

默认情况下,任务执行器有一个无界的任务队列。轮询器会不断调度新任务,即使所有线程都被阻塞,等待新消息到达或超时到期。鉴于有 20 个线程以五秒的超时时间执行任务,它们以每秒 4 个的速度执行。然而,新任务以每秒 20 个的速度被调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程空闲时),所以我们有一个内存泄漏。

处理此问题的一种方法是设置任务执行器的 queue-capacity 属性。即使是 0 也是一个合理的值。您还可以通过设置 Task Executor 的 rejection-policy 属性来指定如何处理无法排队的消息来进行管理(例如,设置为 DISCARD)。换句话说,在配置 TaskExecutor 时,必须了解一些细节。有关该主题的更多详细信息,请参阅 Spring 参考手册中的 “任务执行和调度”

端点内部 Bean

许多端点是复合的 bean。这包括所有消费者和所有轮询的入站通道适配器。消费者(轮询或事件驱动)委托给一个 MessageHandler。轮询适配器通过委托给一个 MessageSource 来获取消息。通常,获取对委托 bean 的引用是有用的,可能是在运行时更改配置或用于测试。这些 bean 可以从 ApplicationContext 中使用众所周知的名称获取。MessageHandler 实例会以类似于 someConsumer.handler 的 bean ID 注册到应用程序上下文中(其中 'consumer' 是端点的 id 属性值)。MessageSource 实例会以类似于 somePolledAdapter.source 的 bean ID 注册,其中 'somePolledAdapter' 是适配器的 ID。

前述内容仅适用于框架组件本身。相反,你可以使用内部的bean定义,如下例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
xml

该 bean 被视为任何声明的内部 bean,并且不会在应用程序上下文中注册。如果您希望以其他方式访问此 bean,请在顶级声明它并带有 id,并使用 ref 属性。更多信息请参见 Spring 文档