消息端点
本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的底层 API 的很多内容。如果您想真正了解幕后发生了什么,这些信息会很有帮助。但是,如果您想快速上手使用各种元素的简化命名空间配置,可以先跳转到 Endpoint Namespace Support。
如概述中所述,消息端点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的消费消息的组件。其中一些还能够发送回复消息。发送消息非常直接。正如我们在 Message Channels 中所展示的,你可以将消息发送到消息通道。然而,接收则要复杂一些。主要原因是有两种类型的消费者:轮询消费者 和 事件驱动消费者。
在这两者中,事件驱动的消费者要简单得多。由于不需要管理和调度单独的轮询线程,它们本质上是带有回调方法的监听器。当连接到 Spring Integration 的可订阅消息通道时,这种简单的选项效果很好。然而,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当消费者实例的容器。其好处类似于使用容器托管消息驱动的 bean,但由于这些消费者是运行在 ApplicationContext
内的 Spring 管理对象,它更类似于 Spring 自己的 MessageListener
容器。
消息处理器
Spring Integration 的 MessageHandler
接口由框架内的许多组件实现。换句话说,这并不是公共 API 的一部分,通常你不会直接实现 MessageHandler
。然而,它被消息消费者用于实际处理消费的消息,因此了解这个策略接口有助于理解消费者的总体作用。该接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管其简单性,此接口为接下来章节中涵盖的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件各自执行与其处理的消息相关的非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。Spring Integration 提供了两种端点实现,它们托管这些基于回调的处理器,并让它们连接到消息通道。
事件驱动的消费者
因为它是两者中较简单的一个,所以我们首先介绍事件驱动的消费者端点。您可能还记得,SubscribableChannel
接口提供了一个 subscribe()
方法,该方法接受一个 MessageHandler
参数(如 SubscribableChannel 中所示)。以下列表显示了 subscribe
方法的定义:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理器不需要主动轮询该通道,因此这是一个事件驱动的消费者。Spring Integration 提供的实现接受一个 SubscribableChannel
和一个 MessageHandler
,如下例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了一个 PollingConsumer
,它可以以相同的方式实例化,只是通道必须实现 PollableChannel
,如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参阅 Channel Adapter 和 Channel Adapter。
轮询消费者还有许多其他的配置选项。以下示例展示了如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger
通常使用简单的间隔(Duration
)定义,但也支持 initialDelay
属性和布尔 fixedRate
属性(默认值为 false
—— 即,没有固定延迟)。以下示例同时设置了这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
上述三个设置的结果是一个触发器,它等待 5 秒,然后每秒触发一次。
CronTrigger
需要一个有效的 cron 表达式。详情请参阅 Javadoc。以下示例设置了一个新的 CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是一个每十秒触发一次的触发器,从周一到周五。
轮询端点的默认触发器是一个 PeriodicTrigger
实例,具有 1 秒的固定延迟周期。
除了触发器之外,你还可以指定两个与轮询相关的配置属性:maxMessagesPerPoll
和 receiveTimeout
。以下示例展示了如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
属性指定了在给定的轮询操作中接收的最大消息数量。这意味着轮询器会继续调用 receive()
而不等待,直到返回 null
或达到最大值。例如,如果轮询器有一个十秒的时间间隔触发器,并且 maxMessagesPerPoll
设置为 25
,并且它正在轮询一个队列中有 100 条消息的通道,则可以在 40 秒内检索到所有 100 条消息。它先获取 25 条,等待十秒,再获取下 25 条,依此类推。如果 maxMessagesPerPoll
配置为负值,则在单个轮询周期内将继续调用 MessageSource.receive()
直到它返回 null
。从 5.5 版开始,0
值具有特殊含义 - 完全跳过 MessageSource.receive()
调用,这可以被视为暂停此轮询端点,直到稍后通过控制总线等将 maxMessagesPerPoll
更改为非零值。
receiveTimeout
属性指定了轮询器在调用接收操作时如果没有可用消息应该等待的时间。例如,考虑两个表面上看似相似但实际上差异很大的选项:第一个选项的间隔触发时间为 5 秒,接收超时时间为 50 毫秒;而第二个选项的间隔触发时间为 50 毫秒,接收超时时间为 5 秒。第一个配置可能在通道上接收到消息的时间比实际晚 4950 毫秒(如果该消息是在其一次轮询调用返回后立即到达)。另一方面,第二种配置永远不会错过超过 50 毫秒的消息。区别在于第二种选项需要一个线程来等待。然而,因此它可以对到达的消息做出更快的响应。这种技术被称为“长轮询”,可以用来模拟轮询源上的事件驱动行为。
一个轮询消费者也可以委托给 Spring TaskExecutor
,如下例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
有一个名为 adviceChain
的属性。此属性允许你指定一个 AOP 建议的 List
,用于处理额外的横切关注点,包括事务。这些建议应用于 doPoll()
方法周围。对于更深入的信息,请参阅 Endpoint Namespace Support 下的 AOP 建议链和事务支持部分。另请参阅 @Poller
注解 Javadoc 和相应的 消息注解支持部分。Java DSL 还提供了一个 .poller() 端点配置选项及其相应的 Pollers
工厂。
前面的例子展示了依赖查找。但是,请记住,这些消费者通常被配置为 Spring bean 定义。实际上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBean
的 FactoryBean
,它根据通道的类型创建适当的消费者类型。此外,Spring Integration 完全支持 XML 命名空间,以进一步隐藏这些细节。基于命名空间的配置在本指南中每介绍一种组件类型时都会有所体现。
许多 MessageHandler
实现可以生成回复消息。如前所述,发送消息与接收消息相比微不足道。然而,发送回复消息的时间和数量取决于处理器类型。例如,聚合器等待一定数量的消息到达,并且通常配置为拆分器的下游使用者,后者可以为每个处理的消息生成多个回复。当使用命名空间配置时,您不需要严格了解所有细节。但是,仍然值得知道这些组件中的许多共享一个共同的基类,即 AbstractReplyProducingMessageHandler
,并且它提供了一个 setOutputChannel(..)
方法。
端点命名空间支持
在整个参考手册中,您可以找到针对端点元素(如 router、transformer、service-activator 等)的具体配置示例。这些大多数支持 input-channel
属性,许多还支持 output-channel
属性。解析后,这些端点元素会产生一个 PollingConsumer
或 EventDrivenConsumer
的实例,具体取决于引用的 input-channel
类型:PollableChannel
或 SubscribableChannel
。当通道是可轮询时,轮询行为基于端点元素的 poller
子元素及其属性。
以下列出了 poller
的所有可用配置选项:
<int:poller cron="" // <1>
default="false" // <2>
error-channel="" // <3>
fixed-delay="" // <4>
fixed-rate="" // <5>
initial-delay="" // <6>
id="" // <7>
max-messages-per-poll="" // <8>
receive-timeout="" // <9>
ref="" // <10>
task-executor="" // <11>
time-unit="MILLISECONDS" // <12>
trigger=""> // <13>
<int:advice-chain /> // <14>
<int:transactional /> // <15>
</int:poller>
提供了使用 Cron 表达式配置轮询器的能力。底层实现使用
org.springframework.scheduling.support.CronTrigger
。如果设置了此属性,则以下属性不能被指定:fixed-delay
、trigger
、fixed-rate
和ref
。通过将此属性设置为
true
,您可以定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会抛出异常。任何连接到PollableChannel
(PollingConsumer
) 或任何没有显式配置轮询器的SourcePollingChannelAdapter
的端点都将使用全局默认轮询器。默认值为false
。可选。标识如果在此轮询器调用中发生故障时发送错误消息的通道。要完全抑制异常,可以提供对
nullChannel
的引用。可选。固定延迟触发器使用
PeriodicTrigger
实现。数值以time-unit
表示,或者可以是持续时间格式(从版本 6.2 开始),例如PT10S
,P1D
。如果设置了此属性,则以下属性不能被指定:fixed-rate
、trigger
、cron
和ref
。固定速率触发器使用
PeriodicTrigger
实现。数值以time-unit
表示,或者可以是持续时间格式(从版本 6.2 开始),例如PT10S
,P1D
。如果设置了此属性,则以下属性不能被指定:fixed-delay
、trigger
、cron
和ref
。PeriodicTrigger
的初始延迟(从版本 6.2 开始)。数值以time-unit
表示,或者可以是持续时间格式,例如PT10S
,P1D
。引用轮询器的基础 bean 定义的 ID,其类型为
org.springframework.integration.scheduling.PollerMetadata
。对于顶级轮询器元素,id
属性是必需的,除非它是默认轮询器 (default="true"
)。有关更多信息,请参阅 配置入站通道适配器。如果没有指定,默认值取决于上下文。如果您使用
PollingConsumer
,此属性默认为-1
。但是,如果您使用SourcePollingChannelAdapter
,则max-messages-per-poll
属性默认为1
。可选。设置在基础类
PollerMetadata
上。如果没有指定,默认为 1000(毫秒)。可选。对另一个顶级轮询器的 Bean 引用。
ref
属性不能出现在顶级poller
元素上。但是,如果设置了此属性,则以下属性不能被指定:fixed-rate
、trigger
、cron
和fixed-delay
。提供引用自定义任务执行器的能力。有关更多信息,请参阅 任务执行器支持。可选。
此属性指定了底层
org.springframework.scheduling.support.PeriodicTrigger
上的java.util.concurrent.TimeUnit
枚举值。因此,此属性只能与fixed-delay
或fixed-rate
属性一起使用。如果与cron
或trigger
引用属性结合使用,会导致失败。PeriodicTrigger
支持的最小粒度为毫秒。因此,可用选项只有毫秒和秒。如果没有提供此值,任何fixed-delay
或fixed-rate
值都将被视为毫秒。基本上,此枚举为基于秒的时间间隔触发器值提供了便利。对于小时、天和月的设置,我们建议使用cron
触发器。引用任何实现了
org.springframework.scheduling.Trigger
接口的 Spring 配置的 Bean。但是,如果设置了此属性,则以下属性不能被指定:fixed-delay
、fixed-rate
、cron
和ref
。可选。允许指定额外的 AOP 切面来处理其他横切关注点。有关更多信息,请参阅 事务。可选。
轮询器可以进行事务化。有关更多信息,请参阅 AOP 切面链。可选。
示例
一个简单的基于间隔的轮询器,每隔 1 秒钟轮询一次,可以如下配置:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用 fixed-rate
属性的替代方案,你也可以使用 fixed-delay
属性。
对于基于 Cron 表达式的轮询器,改用 cron
属性,如下例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 PollableChannel
,则需要配置轮询器。具体来说,如前所述,trigger
是 PollingConsumer
类的必需属性。因此,如果您省略了轮询消费者端点配置中的 poller
子元素,则可能会抛出异常。如果尝试在连接到非轮询通道的元素上配置轮询器,也可能会抛出异常。
也可以创建顶级轮询器,在这种情况下,只需要一个 ref
属性,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
ref
属性只允许在内部轮询器定义中使用。在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。
全局默认轮询器
为了进一步简化配置,您可以定义一个全局默认轮询器。在 XML DSL 中,单个顶级轮询器组件可以将 default
属性设置为 true
。对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLER
的 PollerMetadata
bean。在这种情况下,任何具有 PollableChannel
作为其输入通道的端点,如果定义在同一个 ApplicationContext
内,并且没有显式配置的 poller
,则会使用该默认值。以下示例展示了这样的轮询器和一个使用它的转换器:
- Java DSL
- Java
- Kotlin DSL
- XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring Integration 还为轮询器提供了事务支持,以便每次接收和转发操作可以作为一个原子工作单元执行。要为轮询器配置事务,添加 <transactional/>
子元素。以下示例显示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅轮询器事务支持。
AOP 切面链
由于 Spring 事务支持依赖于使用 TransactionInterceptor
(AOP 建议)处理由轮询器发起的消息流的事务行为的代理机制,有时您必须提供额外的建议来处理与轮询器相关的其他交叉行为。为此,poller
定义了一个 advice-chain
元素,允许你在实现 MethodInterceptor
接口的类中添加更多建议。以下示例展示了如何为 poller
定义 advice-chain
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现 MethodInterceptor
接口的更多信息,请参阅 Spring Framework 参考指南的 AOP 部分。也可以将建议链应用于没有事务配置的轮询器,使您可以增强由轮询器发起的消息流的行为。
在使用建议链时,不能指定 <transactional/>
子元素。相反,应声明一个 <tx:advice/>
bean 并将其添加到 <advice-chain/>
。有关完整的配置详细信息,请参阅 轮询器事务支持。
TaskExecutor 支持
轮询线程可以由 Spring 的 TaskExecutor
抽象的任何实例执行。这为一个端点或一组端点启用了并发性。从 Spring 3.0 开始,核心 Spring 框架有一个 task
命名空间,其 <executor/>
元素支持创建一个简单的线程池执行器。该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。配置线程池执行器可以在负载下对端点的性能产生显著影响。这些设置对每个端点都可用,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。要为使用 XML 命名空间支持配置的轮询端点启用并发,请在 <poller/>
元素上提供 task-executor
引用,然后提供以下示例中所示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果你不提供任务执行器(task-executor),消费者的处理器将在调用者的线程中被调用。请注意,调用者通常是默认的 TaskScheduler
(参见 配置任务调度器)。你还应该记住,task-executor
属性可以通过指定bean名称来提供对Spring的 TaskExecutor
接口任何实现的引用。前面所示的 executor
元素是为了方便而提供的。
如前所述的轮询消费者背景部分,你也可以配置一个轮询消费者以模拟事件驱动行为。通过在触发器中设置较长的接收超时时间和较短的时间间隔,你可以确保即使在轮询的消息源上也能及时响应到达的消息。请注意,这仅适用于具有带超时的阻塞等待调用的源。例如,文件轮询器不会阻塞。每个 receive()
调用都会立即返回,并且要么包含新文件,要么不包含。因此,即使轮询器包含较长的 receive-timeout
,该值在这种情况下也永远不会被使用。另一方面,在使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与其中。以下示例展示了轮询消费者如何几乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多的开销,因为在其内部,它只是一个定时等待的线程,这比(例如)一个忙等的无限循环所需的 CPU 资源要少得多。
在运行时更改轮询率
在使用 fixed-delay
或 fixed-rate
属性配置轮询器时,默认实现使用 PeriodicTrigger
实例。PeriodicTrigger
是 Spring 框架的核心部分。它仅接受构造函数参数的时间间隔,因此无法在运行时更改。
但是,您可以定义自己的 org.springframework.scheduling.Trigger
接口实现。您甚至可以使用 PeriodicTrigger
作为起点。然后,您可以添加一个设置间隔(周期)的 setter,或者您可以在触发器本身内嵌入自己的节流逻辑。period
属性在每次调用 nextExecutionTime
时用于安排下一次轮询。要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并通过使用 trigger
属性将依赖项注入到轮询器配置中,该属性引用自定义触发器 bean 实例。现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询间隔。
例如,可以参见 Spring Integration Samples 项目。它包含一个名为 dynamic-poller
的示例,该示例使用自定义触发器并展示了在运行时更改轮询间隔的能力。
示例提供了一个自定义触发器,该触发器实现了 org.springframework.scheduling.Trigger 接口。示例中的触发器基于 Spring 的 PeriodicTrigger 实现。但是,自定义触发器的字段不是 final
的,并且属性具有显式的 getter 和 setter,允许你在运行时动态更改轮询周期。
需要注意的是,由于触发器方法是 nextExecutionTime()
,对动态触发器的任何更改都不会生效,直到根据现有配置进行下一次轮询。无法强制触发器在当前配置的下次执行时间之前触发。
Payload 类型转换
在整个参考手册中,您还可以看到各种端点的具体配置和实现示例,这些端点接受消息或任何任意的 Object
作为输入参数。在 Object
的情况下,此类参数会被映射到消息的有效负载或有效负载的一部分或头(当使用 Spring 表达式语言时)。然而,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种方便的方法来注册类型转换器(通过使用 Spring ConversionService
),在其自己的名为 integrationConversionService
的转换服务 bean 实例内。该 bean 在使用 Spring Integration 基础设施定义第一个转换器时自动创建。要注册一个转换器,您可以实现 org.springframework.core.convert.converter.Converter
、org.springframework.core.convert.converter.GenericConverter
或 org.springframework.core.convert.converter.ConverterFactory
。
Converter
的实现是最简单的,它将一种类型转换为另一种类型。对于更复杂的情况,例如转换到类层次结构,你可以实现 GenericConverter
以及可能的 ConditionalConverter
。这些接口可以让你完全访问 from
和 to
类型描述符,从而实现复杂的转换。例如,如果你有一个名为 Something
的抽象类作为转换的目标(参数类型、通道数据类型等),并且你有两个具体的实现类 Thing1
和 Thing
,你希望根据输入类型转换为其中一个,那么 GenericConverter
将是一个很好的选择。有关更多信息,请参阅这些接口的 Javadoc:
当你实现你的转换器后,你可以很方便地注册它并支持命名空间,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,你可以使用一个内部bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,你可以使用注解来创建前面的配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,你可以使用 @Configuration
注解,如下例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring 框架允许你添加一个 conversionService
bean(参见 配置 ConversionService 章节)。当需要时,此服务用于在创建和配置 bean 期间执行适当的转换。
相比之下,integrationConversionService
用于运行时转换。这些用途是相当不同的。旨在用于连接 bean 构造函数参数和属性的转换器如果在运行时用于 Spring Integration 表达式评估(针对数据类型通道中的消息、有效负载类型转换器等)可能会产生意外结果。
但是,如果你想使用 Spring 的 conversionService
作为 Spring Integration 的 integrationConversionService
,你可以在应用程序上下文中配置一个别名,如下例所示:
<alias name="conversionService" alias="integrationConversionService"/>
在这种情况下,由 conversionService
提供的转换器可用于 Spring Integration 运行时转换。
内容类型转换
从 5.0 版本开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础设施。它的 HandlerMethodArgumentResolver
实现(如 PayloadArgumentResolver
和 MessageMethodArgumentResolver
)可以使用 MessageConverter
抽象将传入的 payload
转换为目标方法参数类型。转换可以基于 contentType
消息头。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter
,它委托给一组注册的转换器列表,直到其中一个返回非空结果为止。默认情况下,此转换器提供(严格按顺序):
请参阅 Javadoc (见上述列表中的链接),以了解更多关于其目的和适当的 contentType
值的信息。使用 ConfigurableCompositeMessageConverter
是因为它可以提供任何其他 MessageConverter
实现,包括或不包括前面提到的默认转换器。它还可以作为适当的功能在应用程序上下文中注册,覆盖默认转换器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新的转换器在默认转换器之前注册到组合中。你也可以不使用 ConfigurableCompositeMessageConverter
,而是通过注册一个名为 integrationArgumentResolverMessageConverter
的 bean 来提供自己的 MessageConverter
(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
属性)。
基于 MessageConverter
(包括 contentType
标头)的转换在使用 SpEL 方法调用时不可用。在这种情况下,只能使用上述 有效负载类型转换 中提到的常规类到类的转换。
异步轮询
如果你想让轮询异步进行,轮询器可以可选地指定一个 task-executor
属性,该属性指向任何现有的 TaskExecutor
bean 实例(Spring 3.0 通过 task
命名空间提供了一种方便的命名空间配置)。然而,在使用 TaskExecutor
配置轮询器时,有某些事情你必须了解。
问题是存在两个配置,一个是轮询器(poller),另一个是 TaskExecutor
。这两个必须协调一致,否则你可能会造成人为的内存泄漏。
考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置示例展示了一个不协调的配置。
默认情况下,任务执行器有一个无界的任务队列。轮询器会不断调度新任务,即使所有线程都被阻塞,等待新消息到达或超时到期。鉴于有 20 个线程以五秒的超时时间执行任务,它们以每秒 4 个的速度执行。然而,新任务以每秒 20 个的速度被调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程空闲时),所以我们有一个内存泄漏。
处理此问题的一种方法是设置任务执行器的 queue-capacity
属性。即使是 0 也是一个合理的值。您还可以通过设置 Task Executor 的 rejection-policy
属性来指定如何处理无法排队的消息来进行管理(例如,设置为 DISCARD
)。换句话说,在配置 TaskExecutor
时,必须了解一些细节。有关该主题的更多详细信息,请参阅 Spring 参考手册中的 “任务执行和调度”。
端点内部 Bean
许多端点是复合的 bean。这包括所有消费者和所有轮询的入站通道适配器。消费者(轮询或事件驱动)委托给一个 MessageHandler
。轮询适配器通过委托给一个 MessageSource
来获取消息。通常,获取对委托 bean 的引用是有用的,可能是在运行时更改配置或用于测试。这些 bean 可以从 ApplicationContext
中使用众所周知的名称获取。MessageHandler
实例会以类似于 someConsumer.handler
的 bean ID 注册到应用程序上下文中(其中 'consumer' 是端点的 id
属性值)。MessageSource
实例会以类似于 somePolledAdapter.source
的 bean ID 注册,其中 'somePolledAdapter' 是适配器的 ID。
前述内容仅适用于框架组件本身。相反,你可以使用内部的bean定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 被视为任何声明的内部 bean,并且不会在应用程序上下文中注册。如果您希望以其他方式访问此 bean,请在顶级声明它并带有 id
,并使用 ref
属性。更多信息请参见 Spring 文档。