幂等接收器 企业集成模式
从 4.1 版本开始,Spring Integration 提供了 幂等接收者企业集成模式的实现。它是一个功能模式,所有的幂等逻辑都应该在应用程序中实现。然而,为了简化决策,提供了 IdempotentReceiverInterceptor 组件。这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,并且可以根据其配置 过滤 请求消息或将其标记为 重复。
之前,你可以通过在 <filter/>(见 过滤器)中使用自定义的 MessageSelector 来实现这种模式。然而,由于这种模式实际上定义的是端点的行为而不是作为一个端点本身,幂等接收者实现并不提供端点组件。相反,它被应用到应用程序中声明的端点上。
IdempotentReceiverInterceptor 的逻辑基于提供的 MessageSelector,如果消息未被该选择器接受,则会用 duplicateMessage 头设置为 true 进行丰富。目标 MessageHandler(或下游流程)可以查询此头以实现正确的幂等性逻辑。如果 IdempotentReceiverInterceptor 配置了 discardChannel 或 throwExceptionOnRejection = true,重复的消息将不会发送到目标 MessageHandler.handleMessage()。相反,它会被丢弃。如果你想丢弃(不对)重复消息,应将 discardChannel 配置为 NullChannel,例如默认的 nullChannel bean。
为了在消息之间保持状态并提供比较消息以确保幂等性的能力,我们提供了 MetadataStoreSelector。它接受一个 MessageProcessor 实现(根据 Message 创建查找键)和一个可选的 ConcurrentMetadataStore (元数据存储)。有关更多信息,请参阅 MetadataStoreSelector Javadoc。您还可以通过使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStore 的 value。默认情况下,MetadataStoreSelector 使用 timestamp 消息头。
通常情况下,如果没有现有键值,选择器会选择一条消息进行接受。在某些情况下,比较键的当前值和新值以确定是否应该接受该消息是很有用的。从版本 5.3 开始,提供了 compareValues 属性,它引用了一个 BiPredicate<String, String>;第一个参数是旧值;返回 true 表示接受该消息,并用新值替换 MetadataStore 中的旧值。这有助于减少键的数量;例如,在处理文件中的行时,可以将文件名存储为键,将当前行号存储为值。然后,在重新启动后,您可以跳过已经处理过的行。有关示例,请参阅幂等下游处理拆分文件。
为了方便起见,MetadataStoreSelector 选项可以直接在 <idempotent-receiver> 组件上进行配置。以下列表显示了所有可能的属性:
<idempotent-receiver
        id=""  // <1>
        endpoint=""  // <2>
        selector=""  // <3>
        discard-channel=""  // <4>
        metadata-store=""  // <5>
        key-strategy=""  // <6>
        key-expression=""  // <7>
        value-strategy=""  // <8>
        value-expression=""  // <9>
        compare-values="" // <10>
        throw-exception-on-rejection="" />  // <11>
- IdempotentReceiverInterceptorbean 的 ID。可选。
- 应用此拦截器的消费者端点名称或模式。使用逗号( - ,)分隔名称(模式),例如- endpoint="aaa, bbb*, **ccc, *ddd**, eee*fff"。然后使用这些模式匹配的端点 bean 名称来检索目标端点的- MessageHandlerbean(使用其- .handler后缀),并将- IdempotentReceiverInterceptor应用于这些 bean。必填。
- 一个 - MessageSelectorbean 引用。与- metadata-store和- key-strategy (key-expression)互斥。当未提供- selector时,- key-strategy或- key-strategy-expression中的一个是必需的。
- 标识当 - IdempotentReceiverInterceptor不接受消息时发送消息的通道。如果省略,则重复的消息将带有- duplicateMessage头转发给处理器。可选。
- 一个 - ConcurrentMetadataStore引用。由底层的- MetadataStoreSelector使用。与- selector互斥。可选。默认的- MetadataStoreSelector使用内部的- SimpleMetadataStore,该存储不会在应用程序执行之间保持状态。
- 一个 - MessageProcessor引用。由底层的- MetadataStoreSelector使用。从请求消息中评估- idempotentKey。与- selector和- key-expression互斥。当未提供- selector时,- key-strategy或- key-strategy-expression中的一个是必需的。
- 一个 SpEL 表达式,用于填充 - ExpressionEvaluatingMessageProcessor。由底层的- MetadataStoreSelector使用。使用请求消息作为评估上下文根对象来评估- idempotentKey。与- selector和- key-strategy互斥。当未提供- selector时,- key-strategy或- key-strategy-expression中的一个是必需的。
- 一个 - MessageProcessor引用。由底层的- MetadataStoreSelector使用。从请求消息中评估- idempotentKey的- value。与- selector和- value-expression互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为元数据 'value'。
- 一个 SpEL 表达式,用于填充 - ExpressionEvaluatingMessageProcessor。由底层的- MetadataStoreSelector使用。使用请求消息作为评估上下文根对象来评估- idempotentKey的- value。与- selector和- value-strategy互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为元数据 'value'。
- 一个引用 - BiPredicate<String, String>bean 的引用,允许您通过比较键的新旧值来选择性地选择消息;默认为- null。
- 如果 - IdempotentReceiverInterceptor拒绝消息是否抛出异常。默认为- false。无论是否提供了- discard-channel,都会应用此设置。
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。它用于标记带有消息注解(@ServiceActivator、@Router 等)的 method,以指定哪些 IdempotentReceiverInterceptor 对象应用于此端点。以下示例展示了如何使用 @IdempotentReceiver 注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}
当你使用 Java DSL 时,你可以将拦截器添加到端点的建议链中,如下例所示:
@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 仅针对 MessageHandler.handleMessage(Message<?>) 方法设计。从 4.3.1 版本开始,它实现了 HandleMessageAdvice,以 AbstractHandleMessageAdvice 作为基类,以便更好地解耦。更多信息,请参阅 处理消息建议。