通道适配器
通道适配器是一种消息端点,它能够将单个发送方或接收方连接到消息通道。Spring Integration 提供了多种适配器以支持各种传输协议,例如 JMS、文件、HTTP、Web 服务、邮件等。本参考指南的后续章节将讨论每种适配器。然而,本章重点介绍简单而灵活的方法调用通道适配器支持。适配器分为入站和出站两种类型,每种都可以通过核心命名空间中提供的 XML 元素进行配置。只要您拥有可作为源或目标调用的方法,这些适配器就提供了一种扩展 Spring Integration 的简便方式。
配置入站通道适配器
inbound-channel-adapter 元素(在 Java 配置中对应 SourcePollingChannelAdapter)可以调用 Spring 管理对象上的任何方法,并在将方法输出转换为 Message 后,将非空返回值发送到 MessageChannel。当适配器的订阅被激活时,轮询器会尝试从源接收消息。轮询器根据提供的配置通过 TaskScheduler 进行调度。要为单个通道适配器配置轮询间隔或 cron 表达式,您可以提供一个带有调度属性(例如 fixed-rate 或 cron)的 poller 元素。以下示例定义了两个 inbound-channel-adapter 实例:
- Java DSL
- Java
- Kotlin DSL
- XML
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另请参阅通道适配器表达式与脚本。
如果没有提供轮询器,则必须在上下文中注册一个默认的轮询器。更多详情请参阅端点命名空间支持。
默认的轮询端点触发器是一个 PeriodicTrigger 实例,其固定延迟周期为 1 秒。
重要:轮询器配置
所有 inbound-channel-adapter 类型都由 SourcePollingChannelAdapter 支持,这意味着它们包含一个轮询器配置,该配置根据轮询器中指定的配置轮询 MessageSource(以调用产生成为 Message 负载值的自定义方法)。以下示例展示了两个轮询器的配置:
<int:poller max-messages-per-poll="1" fixed-rate="1000"/>
<int:poller max-messages-per-poll="10" fixed-rate="1000"/>
在第一个配置中,每次轮询调用一次轮询任务,并且在每个任务(轮询)期间,根据 max-messages-per-poll 属性值,方法(产生消息)被调用一次。在第二个配置中,每次轮询调用 10 次轮询任务,或者直到返回 'null',因此每次轮询可能产生十条消息,而每次轮询以一秒的间隔发生。然而,如果配置如下例所示,会发生什么:
<int:poller fixed-rate="1000"/>
注意,没有指定 max-messages-per-poll。正如我们后面将介绍的,在 PollingConsumer(例如 service-activator、filter、router 等)中相同的轮询器配置,max-messages-per-poll 的默认值为 -1,这意味着“除非轮询方法返回 null(可能是因为 QueueChannel 中没有更多消息),否则不间断地执行轮询任务”,然后休眠一秒。
然而,在 SourcePollingChannelAdapter 中,情况略有不同。max-messages-per-poll 的默认值为 1,除非您显式将其设置为负值(例如 -1)。这确保了轮询器能够响应生命周期事件(例如启动和停止),并且如果 MessageSource 的自定义方法实现可能永不返回 null 并且恰好是不可中断的,可以防止其可能陷入无限循环。
但是,如果您确定您的方法可以返回 null,并且需要在每次轮询中轮询尽可能多的可用源,您应该显式将 max-messages-per-poll 设置为负值,如下例所示:
<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>
从版本 5.5 开始,max-messages-per-poll 的 0 值具有特殊含义——完全跳过 MessageSource.receive() 调用,这可以被视为暂停此入站通道适配器,直到稍后将 maxMessagesPerPoll 更改为非零值,例如通过控制总线。
从版本 6.2 开始,fixed-delay 和 fixed-rate 可以配置为 ISO 8601 持续时间 格式,例如 PT10S、P1D 等。此外,底层 PeriodicTrigger 的 initial-interval 也以与 fixed-delay 和 fixed-rate 类似的值格式公开。
更多信息,请参阅全局默认轮询器。
配置出站通道适配器
outbound-channel-adapter 元素(在 Java 配置中对应 @ServiceActivator)也可以将 MessageChannel 连接到任何 POJO 消费者方法,该方法将使用发送到该通道的消息负载进行调用。以下示例展示了如何定义出站通道适配器:
- Java DSL
- Java
- Kotlin DSL
- XML
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果被适配的通道是一个 PollableChannel,你必须提供一个轮询器子元素(即 @ServiceActivator 注解中的 @Poller 子注解),如下例所示:
- Java
- XML
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
如果 POJO 消费者实现可以在其他 <outbound-channel-adapter> 定义中复用,则应使用 ref 属性。然而,如果消费者实现仅被单个 <outbound-channel-adapter> 定义引用,则可以将其定义为内部 bean,如下例所示:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一个 <outbound-channel-adapter> 配置中同时使用 ref 属性和内部处理器定义,因为这会造成歧义。此类配置将导致抛出异常。
任何通道适配器都可以在没有 channel 引用的情况下创建,此时它会隐式创建一个 DirectChannel 实例。创建的通道名称与 <inbound-channel-adapter> 或 <outbound-channel-adapter> 元素的 id 属性相匹配。因此,如果未提供 channel,则 id 是必需的。
通道适配器表达式与脚本
与许多其他 Spring Integration 组件类似,<inbound-channel-adapter> 和 <outbound-channel-adapter> 也支持 SpEL 表达式求值。要使用 SpEL,请在 'expression' 属性中提供表达式字符串,而不是提供用于在 bean 上进行方法调用的 'ref' 和 'method' 属性。表达式求值时遵循与方法调用相同的契约:当求值结果为非空值时,<inbound-channel-adapter> 的表达式会生成消息;而 <outbound-channel-adapter> 的表达式必须等效于返回 void 的方法调用。
从 Spring Integration 3.0 开始,当需要比简单的 'expression' 属性更复杂的配置时,<int:inbound-channel-adapter/> 也可以配置 SpEL <expression/>(甚至 <script/>)子元素。如果通过 location 属性将脚本作为 Resource 提供,还可以设置 refresh-check-delay 以允许定期刷新资源。若希望每次轮询时都检查脚本,则需要将此设置与轮询器的触发器协调配合,如下例所示:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另请参阅使用 <expression/> 子元素时 ReloadableResourceBundleExpressionSource 上的 cacheSeconds 属性。有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。关于脚本,请参阅 Groovy 支持 和 脚本支持。
<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,它通过定期触发轮询某个底层的 MessageSource 来启动消息流。由于在轮询时没有消息对象,表达式和脚本无法访问根 Message,因此大多数其他消息传递 SpEL 表达式中可用的 payload 或 headers 属性在此处不可用。脚本可以生成并返回一个包含 headers 和 payload 的完整 Message 对象,或者仅返回 payload,框架会将其添加到带有基本 headers 的消息中。