延迟器
一个延迟器是一个简单的端点,允许消息流被延迟一定的间隔。当消息被延迟时,原始发送者不会阻塞。相反,延迟的消息会通过 org.springframework.scheduling.TaskScheduler
的实例进行调度,在延迟时间过后再发送到输出通道。这种方法即使对于较长的延迟也是可扩展的,因为它不会导致大量发送线程被阻塞。相反,在典型情况下,会使用线程池来实际执行释放消息的操作。本节包含多个配置延迟器的示例。
配置 Delayer
<delayer>
元素用于延迟两个消息通道之间的消息流。与其他端点一样,你可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每个消息应被延迟的毫秒数。以下示例将所有消息延迟三秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果你想确定每条消息的延迟,你也可以通过使用 'expression' 属性提供 SpEL 表达式,如下所示的表达式所示:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的例子中,三秒的延迟仅适用于给定的入站消息表达式求值为 null 时。如果你想只对表达式求值有有效结果的消息应用延迟,你可以使用 0
的 'default-delay'(默认值)。对于任何延迟为 0
(或更少)的消息,消息会立即被发送,在调用线程上。
XML 解析器使用消息组 ID 为 <beanName>.messageGroupId
。
延迟处理器支持表达式求值结果,该结果表示以毫秒为单位的时间间隔(任何 Object
的 toString()
方法生成的值可以解析为一个 Long
),以及表示绝对时间的 java.util.Date
实例。在第一种情况下,毫秒数从当前时间开始计算(例如,值为 5000
会将消息延迟至少五秒,从延迟器接收到消息的时间开始)。对于 Date
实例,消息不会被释放,直到该 Date
对象所表示的时间。如果计算出的值等于非正延迟或过去的日期,则不会产生延迟。相反,它会在原始发送者的线程上直接发送到输出通道。如果表达式求值结果不是 Date
并且不能解析为 Long
,则应用默认延迟(如果有——默认是 0
)。
表达式评估可能会因为各种原因抛出评估异常,包括无效的表达式或其他条件。默认情况下,这些异常会被忽略(尽管会在 DEBUG 级别记录),并且延迟器会回退到默认延迟(如果有的话)。您可以通过设置 ignore-expression-failures
属性来修改此行为。默认情况下,此属性设置为 true
,延迟器的行为如前所述。但是,如果您不希望忽略表达式评估异常并将其抛给延迟器的调用者,则将 ignore-expression-failures
属性设置为 false
。
在前面的例子中,延迟表达式被指定为 headers['delay']
。这是 SpEL Indexer
语法,用于访问 Map
元素(MessageHeaders
实现了 Map
)。它调用:headers.get("delay")
。对于简单的映射元素名称(不包含 '.'),你也可以使用 SpEL 的“点访问器”语法,其中前面显示的头表达式可以指定为 headers.delay
。但是,如果缺少头信息,则会得到不同的结果。在第一种情况下,表达式求值为 null
。第二种情况会导致类似以下的结果:
org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'
因此,如果有可能省略头信息,并且你希望回退到默认延迟,通常更有效(也推荐)使用索引器语法而不是点属性访问器语法,因为检测 null
比捕获异常更快。
延迟器委托给 Spring 的 TaskScheduler
抽象的一个实例。延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler
实例。有关详细信息,请参阅配置任务调度器。如果您想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供一个引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果你配置了一个外部的 ThreadPoolTaskScheduler
,你可以在该属性上设置 waitForTasksToCompleteOnShutdown = true
。它允许在应用程序关闭时,已经处于执行状态(释放消息)的“延迟”任务成功完成。在 Spring Integration 2.2 之前,此属性在 <delayer>
元素上可用,因为 DelayHandler
可以在后台创建自己的调度器。自 2.2 版本以来,延迟处理器需要一个外部调度器实例,并且删除了 waitForTasksToCompleteOnShutdown
。你应该使用调度器自身的配置。
ThreadPoolTaskScheduler
有一个属性 errorHandler
,可以注入 org.springframework.util.ErrorHandler
的某些实现。此处理器允许处理来自调度任务线程发送延迟消息时的 Exception
。默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler
,因此您可以在日志中看到堆栈跟踪。您可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler
,它会将 ErrorMessage
发送到失败消息头中的 error-channel
或默认的 error-channel
。此错误处理在事务回滚后(如果存在)执行。请参阅释放失败。
延迟器和消息存储
DelayHandler
将延迟消息持久化到提供的 MessageStore
中的消息组中。('groupId' 是基于 <delayer>
元素所需的 'id' 属性。另请参阅 DelayHandler.setMessageGroupId(String)
。)在 DelayHandler
将消息发送到 output-channel
之前,计划任务会立即将延迟消息从 MessageStore
中移除。如果提供的 MessageStore
是持久化的(例如 JdbcMessageStore
),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,DelayHandler
会从 MessageStore
中的消息组读取消息,并根据消息的原始到达时间重新安排延迟(如果延迟是数字)。对于延迟头是 Date
的消息,在重新调度时使用该 Date
。如果延迟消息在 MessageStore
中停留的时间超过了其 'delay',那么在启动后它将立即被发送。messageGroupId
是必需的,不能依赖可以生成的 DelayHandler
bean 名称。这样,在应用程序重新启动后,DelayHandler
可能会获得一个新的生成的 bean 名称。因此,由于它们的组不再由应用程序管理,延迟消息可能会在重新调度时丢失。
<delayer>
可以用两个互斥元素中的一个来丰富:<transactional>
和 <advice-chain>
。这些 AOP 建议的 List
会应用到代理的内部 DelayHandler.ReleaseMessageHandler
,其职责是在延迟之后,在调度任务的 Thread
上释放消息。例如,当下游消息流抛出异常且 ReleaseMessageHandler
的事务被回滚时,可能会使用它。在这种情况下,延迟的消息会保留在持久的 MessageStore
中。你可以在 <advice-chain>
内使用任何自定义的 org.aopalliance.aop.Advice
实现。<transactional>
元素定义了一个只有事务性建议的简单建议链。以下示例显示了 <delayer>
内的一个 advice-chain
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
可以导出为 JMX MBean
,带有管理操作 (getDelayedMessageCount
和 reschedulePersistedMessages
),这允许在运行时重新调度延迟的持久消息 —— 例如,如果 TaskScheduler
之前已被停止。可以通过 Control Bus
命令调用这些操作,如下例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理。
从版本 5.3.7 开始,如果在消息存储到 MessageStore
时有一个活跃的事务,则释放任务会在 TransactionSynchronization.afterCommit()
回调中调度。这是必要的,以防止竞争条件,在这种情况下,计划的释放可能会在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟后或事务提交后发布,以较晚的时间为准。
发布失败
从版本 5.0.8 开始,delayer 上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
当消息被释放时,如果下游流程失败,释放将在 retryDelay
后重试。如果达到了 maxAttempts
,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再安排释放,直到应用程序重新启动,或者调用了 reschedulePersistedMessages()
方法,如上所述)。
此外,你可以配置一个 delayedMessageErrorChannel
;当发布失败时,会将一个 ErrorMessage
发送到该通道,其有效载荷为异常,并具有 originalMessage
属性。ErrorMessage
包含一个标题 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
,其中包含当前的尝试次数。
如果错误流消耗了错误消息并正常退出,则不会采取进一步的行动;如果发布是事务性的,事务将提交,并且消息将从存储中删除。如果错误流抛出异常,则发布将被重试最多 maxAttempts
次,如上所述。