跳到主要内容

消息

QWen Plus 中英对照 Message

Spring Integration Message 是一个通用的数据容器。任何对象都可以作为有效负载提供,每个 Message 实例都包含标题,其中含有用户可扩展的属性,以键值对的形式存在。

Message 接口

下面的列表显示了 Message 接口的定义:

public interface Message<T> {

T getPayload();

MessageHeaders getHeaders();

}
java

Message 接口是 API 的核心部分。通过将数据封装在一个通用的包装器中,消息系统可以在不了解数据类型的情况下传递它。当应用程序演进以支持新类型或当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,当消息系统中的某些组件确实需要访问有关 Message 的信息时,此类元数据通常可以存储到消息头的元数据中并从中检索。

消息头

就像 Spring Integration 允许任何 Object 用作 Message 的有效载荷一样,它也支持任何 Object 类型作为头值。事实上,MessageHeaders 类实现了 java.util.Map 接口,如下类定义所示:

public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
java
备注

虽然 MessageHeaders 类实现了 Map,但实际上它是一个只读实现。任何尝试向 Map 中 put 值的操作都会导致 UnsupportedOperationException。同样的规则也适用于 removeclear。由于消息可能会传递给多个消费者,因此 Map 的结构不能被修改。同样地,在初次创建后,消息的有效载荷 Object 也不能被 set。然而,头部值本身的可变性(或有效载荷对象)故意留给框架用户来决定。

作为 Map 的实现,可以通过调用带有头部名称的 get(..) 来获取头部信息。或者,你可以提供预期的 Class 作为附加参数。更好的是,在检索预定义值之一时,有便捷的 getter 方法可用。以下示例展示了这三种选项:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();
java

下表描述了预定义的消息头:

表 1. 预定义的消息标头

Header 名称Header 类型用法
MessageHeaders.IDjava.util.UUID此消息实例的标识符。每次消息被修改时都会改变。
MessageHeaders.
TIMESTAMP
java.lang.Long消息创建的时间。每次消息被修改时都会改变。
MessageHeaders.
REPLY_CHANNEL
java.lang.Object
(String 或
MessageChannel)
当没有配置显式输出通道且没有 ROUTING_SLIPROUTING_SLIP 已耗尽时,回复(如果有)发送到的通道。如果值是 String,它必须表示一个 bean 名称或由 ChannelRegistry 生成。
MessageHeaders.
ERROR_CHANNEL
java.lang.Object
(String 或
MessageChannel)
错误发送到的通道。如果值是 String,它必须表示一个 bean 名称或由 ChannelRegistry 生成。

许多入站和出站适配器实现也提供或期望某些标头,您可以配置额外的用户定义标头。这些标头的常量可以在存在此类标头的模块中找到——例如 AmqpHeadersJmsHeaders 等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已移动到 spring-messaging 模块,并引入了 MessageHeaderAccessor API 以提供对消息实现的额外抽象。所有(核心)Spring Integration 特定的消息头常量现在都在 IntegrationMessageHeaderAccessor 类中声明。下表描述了预定义的消息头:

表 2. 预定义的消息头

Header 名称Header 类型用法
IntegrationMessageHeaderAccessor.
CORRELATION_ID
java.lang.Object用于关联两条或多条消息。
IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
java.lang.Integer通常是一个消息组中的序列号,带有一个 SEQUENCE_SIZE,但也可以在 <resequencer/> 中使用,以重新排序一组无界的消息。
IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
java.lang.Integer一个相关消息组中的消息数量。
IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
java.lang.Long表示消息过期的时间。框架本身不直接使用它,但可以通过带有头富化器设置,并在一个配置了 UnexpiredMessageSelector<filter/> 中使用。
IntegrationMessageHeaderAccessor.
PRIORITY
java.lang.Integer消息优先级 — 例如,在 PriorityChannel 中。
IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
java.lang.Boolean如果一条消息被幂等接收拦截器检测为重复,则为 True。请参阅 幂等接收器企业集成模式
IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
java.io.Closeable如果消息与一个应在消息处理完成后关闭的 Closeable 关联,则此头存在。例如,通过 FTP、SFTP 等进行流式文件传输时关联的 Session
IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
java.lang.
AtomicInteger
如果消息驱动通道适配器支持 RetryTemplate 的配置,则此头包含当前的投递尝试次数。
IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
o.s.i.support.
Acknowledgment
Callback
如果入站端点支持,可以回调以接受、拒绝或重新排队消息。请参阅 延迟确认轮询消息源MQTT 手动确认

IntegrationMessageHeaderAccessor 类为其中一些标头提供了方便的类型化获取方法,如下例所示:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
java

下表描述了在 IntegrationMessageHeaderAccessor 中也会出现的标题,但通常不由用户代码使用(也就是说,它们通常由 Spring Integration 的内部部分使用——将它们包含在这里是为了完整性):

表 3. 预定义的消息标头

Header 名称Header 类型用法
IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
java.util.
List<List<Object>>
一个相关数据堆栈,在需要嵌套相关时使用(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)。
IntegrationMessageHeaderAccessor.
ROUTING_SLIP
java.util.
Map<List<Object>, Integer>
请参阅 Routing Slip

消息 ID 生成

当消息在应用程序中传输时,每次它被修改(例如,通过转换器)都会分配一个新的消息 ID。消息 ID 是一个 UUID。从 Spring Integration 3.0 开始,默认使用的 IS 生成策略比之前的 java.util.UUID.randomUUID() 实现更高效。它使用基于安全随机种子的简单随机数,而不是每次创建一个安全随机数。

可以通过在应用程序上下文中声明一个实现了 org.springframework.util.IdGenerator 的 bean 来选择不同的 UUID 生成策略。

important

在一个类加载器中只能使用一种 UUID 生成策略。这意味着,如果两个或多个应用程序上下文在同一个类加载器中运行,它们共享相同的策略。如果其中一个上下文更改了策略,则所有上下文都将使用该策略。如果在同一个类加载器中的两个或多个上下文中声明了类型为 org.springframework.util.IdGenerator 的 bean,它们必须都是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同但已参数化,则使用第一个初始化的上下文中的策略。

除了默认策略外,还提供了两个额外的 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 机制。当不需要 UUID 而一个简单的递增值就足够时,你可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读标题

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读标题,无法被覆盖。

自从 4.3.2 版本以来,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API 来自定义不应从上游 Message 复制的一组头信息。默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。全局 spring.integration.readOnly.headers 属性(见 Global Properties)用于自定义框架组件的 DefaultMessageBuilderFactory。当您不希望填充某些现成的头信息时,例如 ObjectToJsonTransformercontentType(见 JSON Transformers),这可能会很有用。

当你尝试使用 MessageBuilder 构建新消息时,这种标题会被忽略,并会发出特定的 INFO 消息到日志。

从 5.0 版本开始,Messaging GatewayHeader EnricherContent EnricherHeader Filter 在使用 DefaultMessageBuilderFactory 时,不允许配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 头名称,并且会抛出 BeanInitializationException

Header 传播

当消息被消息生成端点(如 服务激活器)处理(和修改)时,通常情况下,入站消息头会被传播到出站消息。一个例外是 转换器,当一个完整的消息返回给框架时。在这种情况下,用户代码负责整个出站消息。当转换器只返回消息体时,入站消息头会被传播。此外,只有当出站消息中不存在该消息头时,才会传播该消息头,这使得你可以根据需要更改消息头的值。

从 4.3.10 版本开始,您可以配置消息处理程序(修改消息并生成输出)以抑制特定头信息的传播。要配置您不希望被复制的头信息,可以在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您也可以通过在 META-INF/spring.integration.properties 中将 readOnlyHeaders 属性设置为以逗号分隔的头列表,来全局抑制特定消息头的传播。

从 5.0 版本开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用简单模式 (xxx*, *xxx, *xxx*, 或 xxx*yyy) 来允许过滤具有共同后缀或前缀的头信息。更多信息请参阅 PatternMatchUtils Javadoc。当其中一个模式为 *(星号)时,没有头信息会被传播。所有其他模式将被忽略。在这种情况下,服务激活器的行为与转换器相同,任何必要的头信息必须在从服务方法返回的 Message 中提供。notPropagatedHeaders() 选项在 Java DSL 的 ConsumerEndpointSpec 中可用。它也作为 not-propagated-headers 属性在 <service-activator> 组件的 XML 配置中可用。

important

头部传播抑制不适用于那些不修改消息的端点,例如 桥梁路由器

消息实现

Message 接口的基本实现是 GenericMessage<T>,它提供了两个构造函数,如下所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)
java

当创建一个 Message 时,会生成一个随机的唯一 ID。接受一个头部 Map 的构造函数会将提供的头部复制到新创建的 Message 中。

还提供了一种方便的 Message 实现,用于传达错误情况。这种实现以 Throwable 对象作为其有效载荷,如下例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();
java

请注意,此实现利用了 GenericMessage 基类是参数化的这一事实。因此,如两个示例所示,在检索 Message 负载 Object 时不需要进行类型转换。

提到的 Message 类实现是不可变的。在某些情况下,当可变性不是问题,并且应用程序的逻辑设计得可以避免并发修改时,可以使用 MutableMessage

MessageBuilder 辅助类

你可能注意到 Message 接口定义了用于获取其有效负载和头部的方法,但没有提供设置方法。这样做的原因是 Message 在初次创建后不能被修改。因此,当一个 Message 实例被发送到多个消费者(例如,通过发布-订阅通道),如果其中一个消费者需要发送具有不同有效负载类型的回复,则必须创建一个新的 Message。因此,其他消费者不会受到这些更改的影响。请记住,多个消费者可能会访问相同的有效负载实例或头部值,并且该实例本身是否不可变由你自己决定。换句话说,Message 实例的契约类似于不可修改的 Collection,而 MessageHeaders 映射进一步体现了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但是对 MessageHeaders 实例调用 put 操作(或 'remove' 或 'clear')的任何尝试都会导致 UnsupportedOperationException

与其要求创建和填充一个 Map 以传递给 GenericMessage 构造函数,Spring Integration 提供了一种更方便的方式来构建消息:MessageBuilderMessageBuilder 提供了两种工厂方法来创建 Message 实例,一种是从现有的 Message 创建,另一种是使用负载 Object 创建。当从现有 Message 构建时,该 Message 的标题和负载将被复制到新的 Message 中,如下例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
java

如果你需要创建一个带有新有效负载的 Message,但仍然希望复制现有 Message 的标题,你可以使用其中一个 'copy' 方法,如下例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();

Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
java

请注意,copyHeadersIfAbsent 方法不会覆盖现有值。此外,在前面的例子中,你可以看到如何使用 setHeader 设置任何用户定义的头信息。最后,为预定义的头信息提供了 set 方法,以及一个非破坏性的设置任何头信息的方法(MessageHeaders 也定义了预定义头信息名称的常量)。

你也可以使用 MessageBuilder 来设置消息的优先级,如下例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());
java

priority 标头仅在使用 PriorityChannel 时才会被考虑(如下一章所述)。它被定义为 java.lang.Integer

MutableMessageBuilder 用于处理 MutableMessage 实例。此类的逻辑是创建一个 MutableMessage 或保持不变,并通过构建器方法来变异其内容。这样,在消息交换不需要不可变性时,运行中的应用程序可以获得一定的性能提升。

备注

从 6.4 版本开始,BaseMessageBuilder 类从 MessageBuilder 中抽取出来,以简化对默认消息构建逻辑的扩展。例如,与自定义 MessageBuilderFactory 一起,可以在应用程序上下文中全局使用自定义的 BaseMessageBuilder 实现来提供自定义的 Message 实例。特别是,可以重写 GenericMessage.toString() 方法,以在记录此类消息时隐藏有效负载和标题中的敏感信息。

MessageBuilderFactory 抽象

MessageBuilderFactory bean 与 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 全局注册到应用程序上下文中,并在框架的各处用于创建 Message 实例。默认情况下,它是一个 DefaultMessageBuilderFactory 实例。开箱即用,框架还提供了一个 MutableMessageBuilderFactory,以在框架组件中创建 MutableMessage 实例。为了自定义 Message 实例的创建,在目标应用程序上下文中必须提供带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean 以覆盖默认实例。例如,可以为 BaseMessageBuilder 的实现注册一个自定义的 MessageBuilderFactory,我们希望提供一个 GenericMessage 扩展,并重写 toString() 方法,以在记录此类消息时隐藏有效负载和标题中的敏感信息。

这些类的一些快速实现以演示个人可识别信息缓解可以像这样:

class PiiMessageBuilderFactory implements MessageBuilderFactory {

@Override
public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
return new PiiMessageBuilder<>(message.getPayload(), message);
}

@Override
public <T> PiiMessageBuilder<T> withPayload(T payload) {
return new PiiMessageBuilder<>(payload, null);
}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
super(payload, originalMessage);
}

@Override
public Message<P> build() {
return new PiiMessage<>(getPayload(), getHeaders());
}

}

class PiiMessage<P> extends GenericMessage<P> {

@Serial
private static final long serialVersionUID = -354503673433669578L;

public PiiMessage(P payload, Map<String, Object> headers) {
super(payload, headers);
}

@Override
public String toString() {
return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
}

private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
return headers.entrySet()
.stream()
.map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

}
java

这个 PiiMessageBuilderFactory 可以注册为一个 bean,每当框架记录消息时(例如在 errorChannel 情况下),password 头信息将会被屏蔽。