通道适配器
通道适配器是消息端点,它允许将单个发送者或接收者连接到消息通道。Spring Integration 提供了许多适配器以支持各种传输方式,如 JMS、文件、HTTP、Web 服务、邮件等。本参考指南的后续章节将讨论每个适配器。但是,本章重点介绍简单而灵活的方法调用通道适配器支持。既有入站适配器也有出站适配器,每种适配器都可以使用核心命名空间中提供的 XML 元素进行配置。只要有一个方法可以作为源或目标被调用,这些都为扩展 Spring Integration 提供了一种简便的方式。
配置一个入站通道适配器
一个 inbound-channel-adapter
元素(在 Java 配置中是一个 SourcePollingChannelAdapter
)可以调用 Spring 管理对象上的任何方法,并在将方法的输出转换为 Message
后,将非空返回值发送到 MessageChannel
。当适配器的订阅被激活时,轮询器尝试从源接收消息。轮询器根据提供的配置与 TaskScheduler
进行调度。要为单个通道适配器配置轮询间隔或 cron 表达式,您可以提供一个带有调度属性(如 'fixed-rate' 或 'cron')的 'poller' 元素。以下示例定义了两个 inbound-channel-adapter
实例:
- Java DSL
- Java
- Kotlin DSL
- XML
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另请参阅 Channel Adapter 表达式和脚本。
如果未提供轮询器,则必须在上下文中注册一个默认轮询器。详情请参阅 Endpoint 命名空间支持。
轮询端点的默认触发器是一个 PeriodicTrigger
实例,具有 1 秒的固定延迟周期。
重要:轮询器配置
所有 inbound-channel-adapter
类型都由 SourcePollingChannelAdapter
支持,这意味着它们包含一个轮询器配置,该配置根据 Poller 中指定的配置轮询 MessageSource
(以调用生成将成为 Message
负载的值的自定义方法)。以下示例显示了两个轮询器的配置:
<int:poller max-messages-per-poll="1" fixed-rate="1000"/>
<int:poller max-messages-per-poll="10" fixed-rate="1000"/>
在第一个配置中,每次轮询会调用一次轮询任务,并且在每次任务(轮询)期间,基于 max-messages-per-poll
属性值的方法(其结果是生成消息)被调用一次。在第二个配置中,轮询任务会在每次轮询中调用 10 次或直到它返回 'null',因此每次轮询可能会产生十条消息,而每次轮询发生在一秒间隔内。然而,如果配置如下所示会发生什么:
<int:poller fixed-rate="1000"/>
请注意没有指定 max-messages-per-poll
。正如我们后面将要介绍的,在 PollingConsumer
(例如 service-activator
、filter
、router
等)中的相同轮询器配置将为 max-messages-per-poll
设置默认值 -1
,这意味着“除非轮询方法返回 null(可能是因为 QueueChannel
中没有更多消息),否则执行轮询任务不停止”,然后休眠一秒钟。
然而,在 SourcePollingChannelAdapter
中有一点不同。max-messages-per-poll
的默认值是 1
,除非您显式将其设置为负值(如 -1
)。这确保了轮询器可以响应生命周期事件(如启动和停止),并防止它由于 MessageSource
的自定义方法实现有可能永远不返回 null 并且恰好不可中断而导致无限循环。
但是,如果您确定您的方法可以返回 null,并且您需要在每次轮询时尽可能多地轮询源,则应显式将 max-messages-per-poll
设置为负值,如下所示:
<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>
从版本 5.5 开始,max-messages-per-poll
的 0
值具有特殊含义 - 完全跳过 MessageSource.receive()
调用,这可以被视为暂停此入站通道适配器,直到 maxMessagesPerPoll
在稍后通过控制总线等更改为非零值。
从版本 6.2 开始,fixed-delay
和 fixed-rate
可以配置为 ISO 8601 持续时间格式,例如 PT10S
,P1D
等。此外,底层 PeriodicTrigger
的 initial-interval
也以与 fixed-delay
和 fixed-rate
相同的值格式暴露。
另请参阅 全局默认轮询器 以获取更多信息。
配置一个 outbound 通道适配器
一个 outbound-channel-adapter
元素(在 Java 配置中是一个 @ServiceActivator
)也可以将 MessageChannel
连接到任何 POJO 消费方法,该方法应该使用发送到该通道的消息的有效负载进行调用。以下示例展示了如何定义一个输出通道适配器:
- Java DSL
- Java
- Kotlin DSL
- XML
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果要适配的通道是 PollableChannel
,则必须提供轮询器子元素(@ServiceActivator
上的 @Poller
子注解),如下例所示:
- Java
- XML
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
如果你的 POJO 消费者实现可以在其他 <outbound-channel-adapter>
定义中重用,那么你应该使用 ref
属性。但是,如果消费者实现仅被单个 <outbound-channel-adapter>
定义引用,你可以将其定义为内部 bean,如下例所示:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
在同一 <outbound-channel-adapter>
配置中使用 ref
属性和内部处理器定义是不允许的,因为它会创建一个模棱两可的条件。这种配置会导致抛出异常。
任何通道适配器都可以在没有 channel
引用的情况下创建,在这种情况下,它会隐式创建一个 DirectChannel
的实例。创建的通道名称与 <inbound-channel-adapter>
或 <outbound-channel-adapter>
元素的 id
属性匹配。因此,如果未提供 channel
,则需要提供 id
。
通道适配器表达式和脚本
像许多其他 Spring Integration 组件一样,<inbound-channel-adapter>
和 <outbound-channel-adapter>
也提供了对 SpEL 表达式求值的支持。要使用 SpEL,在 'expression' 属性中提供表达式字符串,而不是提供用于在 bean 上进行方法调用的 'ref' 和 'method' 属性。当表达式被求值时,它遵循与方法调用相同的原则:对于 <inbound-channel-adapter>
,只要求值结果是非空值,表达式就会生成一条消息;而对于 <outbound-channel-adapter>
,表达式必须相当于一个返回 void 的方法调用。
从 Spring Integration 3.0 开始,<int:inbound-channel-adapter/>
也可以配置一个 SpEL <expression/>
(或甚至是一个 <script/>
)子元素,以便在需要比简单的 'expression' 属性所能实现的更复杂的逻辑时使用。如果你通过使用 location
属性提供一个脚本作为 Resource
,你还可以设置 refresh-check-delay
,这允许资源定期刷新。如果你想在每次轮询时检查脚本,你需要将此设置与轮询器的触发器协调,如下例所示:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另请参阅在使用 <expression/>
子元素时 ReloadableResourceBundleExpressionSource
上的 cacheSeconds
属性。有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。对于脚本,请参阅 Groovy 支持 和 脚本支持。
<int:inbound-channel-adapter/>
(SourcePollingChannelAdapter
) 是一个通过定期触发轮询某些底层 MessageSource
来启动消息流的端点。由于在轮询时没有消息对象,表达式和脚本无法访问根 Message
,因此在大多数其他消息 SpEL 表达式中可用的有效负载或标题属性在此处不可用。脚本可以生成并返回一个带有标题和有效负载的完整 Message
对象,或者仅返回有效负载,该有效负载由框架添加到具有基本标题的消息中。