幂等接收器企业集成模式
从 4.1 版本开始,Spring Integration 提供了幂等接收器企业集成模式的实现。这是一种功能模式,整个幂等逻辑应在应用程序中实现。然而,为了简化决策过程,我们提供了 IdempotentReceiverInterceptor 组件。这是一个应用于 MessageHandler.handleMessage() 方法的 AOP Advice,它可以根据其配置过滤请求消息或将其标记为重复。
以前,您可以通过在 <filter/> 中使用自定义的 MessageSelector 来实现此模式(参见过滤器)。然而,由于此模式实际上定义的是端点行为而非端点本身,幂等接收器实现并不提供端点组件。相反,它被应用于应用程序中声明的端点。
IdempotentReceiverInterceptor 的逻辑基于提供的 MessageSelector,如果消息未被该选择器接受,则会为其添加 duplicateMessage 标头并设置为 true。目标 MessageHandler(或下游流)可以检查此标头以实现正确的幂等逻辑。如果 IdempotentReceiverInterceptor 配置了 discardChannel 或 throwExceptionOnRejection = true,重复消息将不会发送到目标 MessageHandler.handleMessage(),而是会被丢弃。若希望丢弃重复消息(不进行任何处理),应将 discardChannel 配置为 NullChannel,例如默认的 nullChannel bean。
为了在消息之间保持状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector。它接受一个 MessageProcessor 实现(该实现基于 Message 创建查找键)和一个可选的 ConcurrentMetadataStore(元数据存储)。更多信息请参阅 MetadataStoreSelector Javadoc。您还可以通过使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStore 的 value。默认情况下,MetadataStoreSelector 使用消息头中的 timestamp。
通常情况下,如果键不存在现有值,选择器会选择接受消息。在某些情况下,比较键的当前值和新值以确定是否应接受消息会很有用。从版本5.3开始,提供了 compareValues 属性,它引用一个 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受消息,并在 MetadataStore 中用新值替换旧值。这有助于减少键的数量;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。这样,在重启后,可以跳过已处理的行。有关示例,请参阅幂等下游处理拆分文件。
为了方便,MetadataStoreSelector 的选项可以直接在 <idempotent-receiver> 组件上进行配置。以下列表展示了所有可能的属性:
<idempotent-receiver
id="" // <1>
endpoint="" // <2>
selector="" // <3>
discard-channel="" // <4>
metadata-store="" // <5>
key-strategy="" // <6>
key-expression="" // <7>
value-strategy="" // <8>
value-expression="" // <9>
compare-values="" // <10>
throw-exception-on-rejection="" /> // <11>
IdempotentReceiverInterceptorBean 的 ID。可选。应用此拦截器的消费者端点名称或模式。使用逗号 (
,) 分隔名称(模式),例如endpoint="aaa, bbb*, **ccc, *ddd**, eee*fff"。匹配这些模式的端点 Bean 名称随后将用于检索目标端点的MessageHandlerBean(使用其.handler后缀),并将IdempotentReceiverInterceptor应用于这些 Bean。必需。MessageSelectorBean 引用。与metadata-store和key-strategy (key-expression)互斥。未提供selector时,需要提供key-strategy或key-strategy-expression其中之一。标识当
IdempotentReceiverInterceptor不接受消息时,将消息发送到的通道。省略时,重复消息将转发给带有duplicateMessage头的处理器。可选。ConcurrentMetadataStore引用。由底层的MetadataStoreSelector使用。与selector互斥。可选。默认的MetadataStoreSelector使用内部的SimpleMetadataStore,该存储不会在应用程序执行之间维护状态。MessageProcessor引用。由底层的MetadataStoreSelector使用。从请求消息中评估idempotentKey。与selector和key-expression互斥。未提供selector时,需要提供key-strategy或key-strategy-expression其中之一。用于填充
ExpressionEvaluatingMessageProcessor的 SpEL 表达式。由底层的MetadataStoreSelector使用。通过将请求消息作为评估上下文根对象来评估idempotentKey。与selector和key-strategy互斥。未提供selector时,需要提供key-strategy或key-strategy-expression其中之一。MessageProcessor引用。由底层的MetadataStoreSelector使用。从请求消息中为idempotentKey评估一个value。与selector和value-expression互斥。默认情况下,MetadataStoreSelector使用消息头timestamp作为元数据value。用于填充
ExpressionEvaluatingMessageProcessor的 SpEL 表达式。由底层的MetadataStoreSelector使用。通过将请求消息作为评估上下文根对象来为idempotentKey评估一个value。与selector和value-strategy互斥。默认情况下,MetadataStoreSelector使用消息头timestamp作为元数据value。对
BiPredicate<String, String>Bean 的引用,允许您通过比较键的旧值和新值来可选地选择消息;默认为null。当
IdempotentReceiverInterceptor拒绝消息时是否抛出异常。默认为false。无论是否提供了discard-channel,此设置均适用。
对于Java配置,Spring Integration提供了方法级别的@IdempotentReceiver注解。它用于标记带有消息注解(@ServiceActivator、@Router等)的method,以指定哪些IdempotentReceiverInterceptor对象应用于此端点。以下示例展示了如何使用@IdempotentReceiver注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
在使用 Java DSL 时,你可以将拦截器添加到端点的通知链中,如下例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。从版本 4.3.1 开始,它实现了 HandleMessageAdvice,并以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。更多信息请参阅处理消息通知。