跳到主要内容
版本:7.0.2

Delayer

DeepSeek V3 中英对照 Delayer

延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。当消息被延迟时,原始发送方不会阻塞。相反,延迟的消息会被调度给 org.springframework.scheduling.TaskScheduler 的一个实例,在延迟时间过后发送到输出通道。这种方法即使对于较长的延迟也具有可扩展性,因为它不会导致大量发送方线程被阻塞。相反,在典型情况下,会使用线程池来实际执行消息的释放。本节包含几个配置延迟器的示例。

配置延迟器

<delayer> 元素用于在两个消息通道之间延迟消息流。与其他端点类似,您可以提供 'input-channel' 和 'output-channel' 属性,但 delayer 还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。以下示例将所有消息延迟三秒:

<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>

如果需要确定每条消息的延迟时间,你也可以通过使用 expression 属性来提供 SpEL 表达式,如下所示:

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}

在前面的例子中,三秒延迟仅适用于入站消息的表达式求值结果为 null 的情况。如果只想对表达式求值结果有效的消息应用延迟,可以使用 0(默认值)作为 default-delay。对于延迟为 0(或更小)的任何消息,消息将在调用线程上立即发送。

备注

XML 解析器使用的消息组 ID 为 <beanName>.messageGroupId

提示

延迟处理器支持表达式求值结果为表示毫秒间隔(任何 Object,其 toString() 方法产生的值可解析为 Long)以及表示绝对时间的 java.util.Date 实例。第一种情况下,毫秒数从当前时间开始计算,例如,值为 5000 将使消息从被延迟器接收时起至少延迟五秒。对于 Date 实例,消息直到该 Date 对象表示的时间才会被释放。若值等价于非正延迟或过去的日期,则不会产生延迟,而是直接在原始发送者的线程上将消息发送到输出通道。如果表达式求值结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有的话——默认为 0)。

important

表达式求值可能因各种原因抛出求值异常,包括无效表达式或其他条件。默认情况下,此类异常会被忽略(尽管在 DEBUG 级别记录日志),并且延迟器会回退到默认延迟(如果有)。您可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true,延迟器的行为如前所述。但是,如果您希望不忽略表达式求值异常并将其抛出给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

提示

在前面的示例中,延迟表达式被指定为 headers['delay']。这是用于访问 Map 元素(MessageHeaders 实现了 Map)的 SpEL Indexer 语法。它调用:headers.get("delay")。对于简单的映射元素名称(不包含 '.'),你也可以使用 SpEL 的“点访问器”语法,前面显示的头部表达式可以指定为 headers.delay。然而,如果头部缺失,结果会有所不同。在第一种情况下,表达式求值为 null。第二种情况会导致类似以下的结果:

org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果存在头部可能被省略的情况,并且你希望回退到默认延迟,通常更高效(且推荐)使用索引器语法而不是点属性访问器语法,因为检测 null 比捕获异常更快。

延迟器委托给Spring的TaskScheduler抽象的一个实例。延迟器使用的默认调度器是Spring Integration启动时提供的ThreadPoolTaskScheduler实例。请参阅配置任务调度器。如果您希望委托给不同的调度器,可以通过延迟器元素的'scheduler'属性提供引用,如下例所示:

<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
提示

如果你配置了一个外部的 ThreadPoolTaskScheduler,你可以在此属性上设置 waitForTasksToCompleteOnShutdown = true。这允许在应用程序关闭时,成功完成那些已处于执行状态的“延迟”任务(释放消息)。在 Spring Integration 2.2 之前,此属性在 <delayer> 元素上可用,因为 DelayHandler 可以在后台创建自己的调度器。自 2.2 版本起,延迟器需要一个外部的调度器实例,因此 waitForTasksToCompleteOnShutdown 被移除。你应该使用调度器自身的配置。

提示

ThreadPoolTaskScheduler 有一个属性 errorHandler,可以注入 org.springframework.util.ErrorHandler 的某些实现。该处理器允许处理来自发送延迟消息的调度任务线程中的 Exception。默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日志中看到堆栈跟踪。你可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它会将 ErrorMessage 发送到 error-channel,该通道可以来自失败消息的头部,也可以是默认的 error-channel。此错误处理在事务回滚(如果存在)之后执行。请参阅释放失败

延迟器与消息存储

DelayHandler 将延迟消息持久化存储到指定 MessageStore 的消息组中。('groupId' 基于 <delayer> 元素必需的 'id' 属性生成。另请参阅 DelayHandler.setMessageGroupId(String)。)在 DelayHandler 将消息发送到 output-channel 之前,调度任务会立即从 MessageStore 中移除该延迟消息。如果提供的 MessageStore 是持久化的(例如 JdbcMessageStore),则能确保应用关闭时不会丢失消息。应用启动后,DelayHandler 会从其 MessageStore 的消息组中读取消息,并根据消息原始到达时间重新调度延迟(若延迟值为数值类型)。对于延迟头信息为 Date 类型的消息,重新调度时将使用该 Date。若延迟消息在 MessageStore 中的停留时间已超过其设定的 'delay',则会在启动后立即发送。messageGroupId 是必需参数,不能依赖可能自动生成的 DelayHandler bean 名称。因为应用重启后,DelayHandler 可能会获得新的生成 bean 名称。若如此,延迟消息可能因所属消息组不再受应用管理而在重新调度过程中丢失。

<delayer> 可以通过两个互斥元素之一进行增强:<transactional><advice-chain>。这些 AOP 通知的 List 被应用于代理的内部 DelayHandler.ReleaseMessageHandler,该处理器负责在延迟后,在调度任务的 Thread 上释放消息。例如,当下游消息流抛出异常且 ReleaseMessageHandler 的事务回滚时,可能会用到它。在这种情况下,延迟的消息将保留在持久化的 MessageStore 中。您可以在 <advice-chain> 中使用任何自定义的 org.aopalliance.aop.Advice 实现。<transactional> 元素定义了一个仅包含事务通知的简单通知链。以下示例展示了 <delayer> 中的 advice-chain

<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>

DelayHandler 可以作为 JMX MBean 导出,并包含受管操作(getDelayedMessageCountreschedulePersistedMessages),这允许在运行时重新调度延迟的持久化消息——例如,如果 TaskScheduler 之前已停止。这些操作可以通过 Control Bus 命令调用,如下例所示:

Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
备注

有关消息存储、JMX 和控制总线的更多信息,请参阅系统管理

从版本5.3.7开始,当消息存储到 MessageStore 时若存在活跃事务,释放任务会在 TransactionSynchronization.afterCommit() 回调中调度。这是为了避免竞态条件——即预定的释放操作可能在事务提交前执行,导致消息无法被找到。在这种情况下,消息将在延迟时间后或事务提交后释放(以较晚发生者为准)。

发布失败

从 5.0.8 版本开始,延迟器新增了两个属性:

  • maxAttempts(默认 5 次)

  • retryDelay(默认 1 秒)

当消息被释放时,如果下游流程失败,系统将在 retryDelay 后尝试重新释放。如果达到 maxAttempts 次数,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再被安排释放,直到应用程序重新启动,或调用 reschedulePersistedMessages() 方法,如上所述)。

此外,您可以配置一个 delayedMessageErrorChannel;当发布失败时,一个 ErrorMessage 会被发送到该通道,其有效负载为异常,并包含 originalMessage 属性。该 ErrorMessage 包含一个标头 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中记录了当前的尝试次数。

如果错误流消费了错误消息并正常退出,则不会采取进一步操作;如果发布是事务性的,事务将被提交,消息将从存储中删除。如果错误流抛出异常,发布将重试最多 maxAttempts 次,如上所述。