Poller
本节描述了轮询机制在 Spring Integration 中的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:
实际实现取决于这些端点所连接通道的类型。连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成 EventDrivenConsumer 实例。另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口的通道(例如 QueueChannel)的通道适配器会生成 PollingConsumer 实例。
轮询消费者允许Spring Integration组件主动轮询消息,而非以事件驱动的方式处理消息。
它们代表了众多消息传递场景中的一个关键横切关注点。在Spring Integration中,轮询消费者基于同名模式实现,该模式在Gregor Hohpe和Bobby Woolf所著的《企业集成模式》一书中有所描述。您可以在该书的网站上找到关于此模式的详细说明。
有关轮询消费者配置的更多信息,请参阅消息端点。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常由 SourcePollingChannelAdapter 包装。例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器会配置一个轮询器来定期检索消息。因此,当组件配置了轮询器时,生成的实例属于以下类型之一:
这意味着轮询器在入站和出站消息传递场景中都有应用。以下是使用轮询器的一些用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如重复执行 Java 类上的方法)
AOP 通知类可以应用于轮询器,例如在 advice-chain 中,比如用于启动事务的事务通知。从版本 4.1 开始,提供了 PollSkipAdvice。轮询器使用触发器来确定下一次轮询的时间。PollSkipAdvice 可用于抑制(跳过)轮询,可能是因为存在某些下游条件会阻止消息被处理。要使用此通知,您需要为其提供一个 PollSkipStrategy 的实现。从版本 4.2.5 开始,提供了 SimplePollSkipStrategy。要使用它,您可以将其实例作为 bean 添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,并将其添加到轮询器的通知链中。要跳过轮询,请调用 skipPolls()。要恢复轮询,请调用 reset()。版本 4.2 在此领域增加了更多灵活性。请参阅条件轮询器。
可轮询消息源的延迟确认
从 5.0.1 版本开始,部分模块提供了支持延迟确认的 MessageSource 实现,直到下游流程完成(或将消息传递给另一个线程)。目前该功能仅限于 AmqpMessageSource 和 KafkaMessageSource。
通过这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头信息(参见 MessageHeaderAccessor API)会被添加到消息中。当与可轮询消息源一起使用时,该头信息的值是 AcknowledgmentCallback 的一个实例,如下例所示:
@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {
void acknowledge(Status status);
@Override
default void acknowledge() {
acknowledge(Status.ACCEPT);
}
default boolean isAcknowledged() {
return false;
}
default void noAutoAck() {
throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
}
default boolean isAutoAck() {
return true;
}
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如 KafkaMessageSource)都支持 REJECT 状态。该状态的处理方式与 ACCEPT 相同。
应用程序可以在任何时候确认消息,如下例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果将 MessageSource 连接到 SourcePollingChannelAdapter,当轮询器线程在下游流程完成后返回到适配器时,适配器会检查确认是否已被确认,如果尚未确认,则将其状态设置为 ACCEPT(如果流程抛出异常,则设置为 REJECT)。状态值定义在 AcknowledgmentCallback.Status 枚举 中。
Spring Integration 提供了 MessageSourcePollingTemplate 来执行对 MessageSource 的临时轮询。当 MessageHandler 回调返回(或抛出异常)时,它同样负责在 AcknowledgmentCallback 上设置 ACCEPT 或 REJECT。以下示例展示了如何使用 MessageSourcePollingTemplate 进行轮询:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter 和 MessageSourcePollingTemplate),您都可以通过回调函数调用 noAutoAck() 来禁用自动确认/拒绝。如果您将消息传递给另一个线程并希望稍后确认,则可能需要这样做。但并非所有实现都支持此功能,例如 Apache Kafka 就不支持,因为偏移量提交必须在同一线程上执行。
消息源的条件轮询器
本节将介绍如何使用条件轮询器。
背景
Advice对象,在轮询器的advice-chain中,作用于整个轮询任务(包括消息检索和处理)。这些“环绕通知”方法无法访问轮询的任何上下文——仅能访问轮询本身。这对于诸如使任务具有事务性或根据某些外部条件跳过轮询等需求来说是可以接受的,正如之前所讨论的。如果我们希望根据轮询的receive部分的结果采取某些操作,或者想要根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
版本 5.3 引入了 ReceiveMessageAdvice 接口。advice-chain 中任何实现此接口的 Advice 对象仅应用于 receive() 操作——即 MessageSource.receive() 和 PollableChannel.receive(timeout)。因此,它们仅适用于 SourcePollingChannelAdapter 或 PollingConsumer。此类类实现以下方法:
-
beforeReceive(Object source)该方法在Object.receive()方法之前被调用。它允许你检查并重新配置消息源。返回false将取消本次轮询(类似于前面提到的PollSkipAdvice)。 -
Message<?> afterReceive(Message<?> result, Object source)该方法在receive()方法之后被调用。同样,你可以重新配置消息源或执行任何操作(可能取决于result参数,如果消息源未创建消息,该参数可能为null)。你甚至可以返回一个不同的消息。
线程安全性
如果某个 Advice 会修改源数据,则不应使用 TaskExecutor 配置轮询器。如果 Advice 修改了源数据,此类修改操作不是线程安全的,可能导致意外结果,尤其是在高频轮询器中。如果需要并发处理轮询结果,请考虑使用下游的 ExecutorChannel,而不是为轮询器添加执行器。
建议链顺序
您需要理解在初始化过程中建议链是如何被处理的。未实现 ReceiveMessageAdvice 的 Advice 对象将应用于整个轮询过程,并且会在任何 ReceiveMessageAdvice 之前按顺序首先全部调用。然后,ReceiveMessageAdvice 对象会按顺序围绕源 receive() 方法调用。例如,如果您有 Advice 对象 a, b, c, d,其中 b 和 d 是 ReceiveMessageAdvice,则这些对象将按以下顺序应用:a, c, b, d。此外,如果源已经是一个 Proxy,则 ReceiveMessageAdvice 将在任何现有的 Advice 对象之后调用。如果您希望更改顺序,必须自行配置代理。
SimpleActiveIdleReceiveMessageAdvice
此建议是 ReceiveMessageAdvice 的一个简单实现。当与 DynamicPeriodicTrigger 结合使用时,它会根据前一次轮询是否获取到消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger。
重要提示:异步移交
SimpleActiveIdleReceiveMessageAdvice 会根据 receive() 的结果来修改触发器。这仅在建议在轮询器线程上调用时才有效。如果轮询器配置了 task-executor,则此功能无效。若希望在轮询结果后使用异步操作,请稍后进行异步移交,例如通过使用 ExecutorChannel。
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器中的一个。考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此一旦构建就无法更改。考虑一个用例:我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到消息,则每分钟轮询一次,并在检索到消息时恢复使用 cron 表达式。
建议(以及轮询器)为此使用了 CompoundTrigger。该触发器的 primary 触发器可以是 CronTrigger。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger 中。当调用 CompoundTrigger 实例的 nextExecutionTime 方法时,如果存在次要触发器,则委托给次要触发器;否则,委托给主要触发器。
轮询器也必须引用同一个 CompoundTrigger。
以下示例展示了每小时定时任务表达式的配置,并设置了每分钟作为备用方案:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要提示:异步移交
CompoundTriggerAdvice 会根据 receive() 的结果来修改触发器。这仅在建议在轮询器线程上调用时才有效。如果轮询器配置了 task-executor,则此功能无效。若希望在轮询结果后执行异步操作并使用此建议,应在后续阶段进行异步移交,例如通过使用 ExecutorChannel 来实现。
仅限 MessageSource 的建议
某些建议可能仅适用于 MessageSource.receive(),而对于 PollableChannel 则没有意义。为此,MessageSourceMutator 接口(ReceiveMessageAdvice 的扩展)仍然存在。更多信息请参阅入站通道适配器:轮询多个服务器和目录。