轮询器
本节描述了轮询在 Spring Integration 中的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:
实际的实现取决于这些端点连接的通道类型。连接到实现了 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成一个 EventDrivenConsumer
实例。另一方面,连接到实现了 org.springframework.messaging.PollableChannel 接口的通道(例如 QueueChannel
)的通道适配器会生成一个 PollingConsumer
实例。
轮询消费者让 Spring Integration 组件能够主动轮询消息,而不是以事件驱动的方式处理消息。
它们在许多消息传递场景中代表一个关键的横切关注点。在 Spring Integration 中,轮询消费者是基于同名模式的,该模式在 Gregor Hohpe 和 Bobby Woolf 撰写的《企业集成模式》一书中有所描述。你可以在书籍的网站上找到该模式的描述。
有关轮询消费者配置的更多信息,请参阅 消息端点。
可轮询的消息源
Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常被 SourcePollingChannelAdapter
包装。例如,在从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器会配置一个轮询器以定期检索消息。因此,当组件配置有轮询器时,生成的实例是以下类型之一:
这意味着轮询器在入站和出站消息场景中都使用。以下是一些使用轮询器的用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部 (可轮询的) 消息通道
-
轮询内部服务 (例如反复执行 Java 类上的方法)
AOP 建议类可以应用于轮询器,在 advice-chain
中,例如事务建议以开始一个事务。从 4.1 版本开始,提供了一个 PollSkipAdvice
。轮询器使用触发器来确定下一次轮询的时间。PollSkipAdvice
可用于抑制(跳过)轮询,可能是因为存在某些下游条件会阻止消息被处理。要使用此建议,您必须为其提供一个 PollSkipStrategy
的实现。从 4.2.5 版本开始,提供了一个 SimplePollSkipStrategy
。要使用它,您可以将其实例作为 bean 添加到应用程序上下文中,注入到 PollSkipAdvice
中,并将其添加到轮询器的建议链中。要跳过轮询,调用 skipPolls()
。要恢复轮询,调用 reset()
。4.2 版本在此方面增加了更多的灵活性。请参阅 Conditional Pollers。
延迟确认轮询消息源
从 5.0.1 版本开始,某些模块提供了 MessageSource
实现,支持将确认推迟到下游流完成(或把消息传递给另一个线程)。这目前仅限于 AmqpMessageSource
和 KafkaMessageSource
。
对于这些消息源,会向消息中添加 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
头(参见 MessageHeaderAccessor API)。当与可轮询的消息源一起使用时,该头的值是一个 AcknowledgmentCallback
实例,如下例所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,KafkaMessageSource
)都支持 REJECT
状态。它被视为与 ACCEPT
相同。
应用程序可以在任何时间确认一条消息,如下例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource
被连接到 SourcePollingChannelAdapter
,当轮询线程在下游流完成后返回到适配器时,适配器会检查确认是否已经被确认,如果没有,则将其状态设置为 ACCEPT
(或者如果流抛出异常则设置为 REJECT
)。状态值是在 AcknowledgmentCallback.Status 枚举 中定义的。
Spring Integration 提供了 MessageSourcePollingTemplate
来执行对 MessageSource
的临时轮询。它也负责在 MessageHandler
回调返回(或抛出异常时)在 AcknowledgmentCallback
上设置 ACCEPT
或 REJECT
。以下示例展示了如何使用 MessageSourcePollingTemplate
进行轮询:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
),您可以通过在回调中调用 noAutoAck()
来禁用自动确认/否定确认。如果您将消息传递给另一个线程并希望稍后确认,可能会这样做。并不是所有实现都支持这一点(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
Advice
对象,在轮询器上的 advice-chain
中,为整个轮询任务(包括消息检索和处理)提供建议。这些 “around advice” 方法无法访问任何关于轮询的上下文 —— 只有轮询本身。这对于使任务事务化或由于某些外部条件而跳过轮询等需求来说是可以接受的,如前所述。如果我们希望根据轮询的 receive
部分的结果采取某些行动,或者我们想根据条件调整轮询器怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
版本 5.3 引入了 ReceiveMessageAdvice
接口。在 advice-chain
中实现此接口的任何 Advice
对象仅应用于 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它们只能应用于 SourcePollingChannelAdapter
或 PollingConsumer
。此类实现了以下方法:
-
beforeReceive(Object source)
在调用Object.receive()
方法之前会调用此方法。它允许你检查和重新配置源。返回false
将取消此次轮询(类似于前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)
在receive()
方法之后会调用此方法。同样,你可以重新配置源或采取任何行动(可能取决于结果,如果没有消息由源创建,则结果可以是null
)。你甚至可以返回一个不同的消息。
线程安全性
如果 Advice
修改了源,你不应该用 TaskExecutor
配置轮询器。如果 Advice
修改了源,这些修改不是线程安全的,可能会导致意外的结果,特别是在高频率轮询器的情况下。如果你需要并行处理轮询结果,请考虑使用下游的 ExecutorChannel
,而不是向轮询器添加执行器。
建议链顺序
你应该了解建议链在初始化期间是如何处理的。没有实现 ReceiveMessageAdvice
接口的 Advice
对象会应用于整个轮询过程,并且会在任何 ReceiveMessageAdvice
之前按顺序调用。然后,ReceiveMessageAdvice
对象会围绕源 receive()
方法按顺序调用。例如,如果你有 Advice
对象 a, b, c, d
,其中 b
和 d
是 ReceiveMessageAdvice
,这些对象将按照以下顺序应用:a, c, b, d
。另外,如果源已经是一个 Proxy
,则 ReceiveMessageAdvice
会在任何现有的 Advice
对象之后调用。如果你想改变顺序,你必须自己设置代理。
SimpleActiveIdleReceiveMessageAdvice
此建议是 ReceiveMessageAdvice
的一个简单实现。当与 DynamicPeriodicTrigger
一起使用时,它会根据上一次轮询是否产生了消息来调整轮询频率。轮询器还必须引用相同的 DynamicPeriodicTrigger
。
重要:异步交接
SimpleActiveIdleReceiveMessageAdvice
根据 receive()
结果修改触发器。这只有在建议在轮询线程上调用时才有效。如果轮询器有一个 task-executor
,则无效。如果你想在轮询结果之后使用异步操作,可以稍后进行异步交接,例如通过使用 ExecutorChannel
。
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger
的轮询器。CronTrigger
实例是不可变的,因此一旦构造后就不能被修改。考虑这样一个用例:我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到消息,则每分钟轮询一次,并且当检索到消息时,恢复使用 cron 表达式。
建议(和轮询器)为此使用 CompoundTrigger
。触发器的 primary
触发器可以是 CronTrigger
。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger
中。当调用 CompoundTrigger
实例的 nextExecutionTime
方法时,如果存在次要触发器,则委托给次要触发器。否则,委托给主要触发器。
轮询器也必须有一个对同一 CompoundTrigger
的引用。
以下示例显示了每小时 cron 表达式的配置,如果失败则回退到每一分钟:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:异步交接
CompoundTriggerAdvice
根据 receive()
结果修改触发器。这只有在建议在轮询线程上调用时才有效。如果轮询器有一个 task-executor
,它将无法工作。如果你想在此建议之后使用异步操作,可以在轮询结果之后稍后进行异步交接,例如通过使用 ExecutorChannel
。
仅限 MessageSource 的建议
一些建议可能只适用于 MessageSource.receive()
,对于 PollableChannel
来说没有意义。为此,仍然存在一个 MessageSourceMutator
接口(ReceiveMessageAdvice
的扩展)。更多信息,请参阅 Inbound Channel Adapters: Polling Multiple Servers and Directories。