跳到主要内容
版本:7.0.2

消息

DeepSeek V3 中英对照 Message

Spring Integration 的 Message 是一个通用的数据容器。任何对象都可以作为有效负载提供,并且每个 Message 实例都包含以键值对形式存储的用户可扩展属性的头部信息。

Message 接口

下面的代码清单展示了 Message 接口的定义:

public interface Message<T> {

T getPayload();

MessageHeaders getHeaders();

}

Message 接口是 API 的核心组成部分。通过将数据封装在一个通用包装器中,消息传递系统可以在不了解数据类型的情况下传递数据。随着应用程序演进以支持新类型,或者当类型本身被修改或扩展时,消息传递系统不会受到影响。另一方面,当消息传递系统中的某些组件确实需要访问有关 Message 的信息时,此类元数据通常可以存储在消息头部的元数据中,并从中检索。

消息头

正如Spring Integration允许任何Object作为Message的有效负载一样,它也支持任何Object类型作为标头值。实际上,MessageHeaders类实现了java.util.Map接口,如下类定义所示:

public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
备注

尽管 MessageHeaders 类实现了 Map 接口,但它实际上是一个只读实现。任何尝试在 Map 中 put 值的操作都会导致 UnsupportedOperationExceptionremoveclear 操作同样如此。由于消息可能被传递给多个消费者,因此 Map 的结构不能被修改。同样地,消息的有效载荷 Object 在初始创建后也不能被 set。然而,框架用户需要自行决定是否允许修改头值本身(或有效载荷 Object)的可变性。

作为 Map 的实现,可以通过调用 get(..) 并传入标头名称来检索标头。或者,您也可以提供一个预期的 Class 作为额外参数。更便捷的是,当检索预定义值时,可以直接使用方便的 getter 方法。以下示例展示了这三种选项:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息头:

表 1. 预定义的消息头

消息头名称消息头类型用途说明
MessageHeaders.IDjava.util.UUID此消息实例的唯一标识符。每次消息发生变更时都会更新。
MessageHeaders.
TIMESTAMP
java.lang.Long消息创建的时间戳。每次消息发生变更时都会更新。
MessageHeaders.
REPLY_CHANNEL
java.lang.Object
(String 或
MessageChannel)
当未配置显式输出通道且不存在 ROUTING_SLIPROUTING_SLIP 已耗尽时,用于发送回复(如有)的通道。如果值为 String,则必须代表 Bean 名称或由 ChannelRegistry 生成。
MessageHeaders.
ERROR_CHANNEL
java.lang.Object
(String 或
MessageChannel)
用于发送错误信息的通道。如果值为 String,则必须代表 Bean 名称或由 ChannelRegistry 生成。

许多入站和出站适配器实现也提供或期望特定的头部信息,并且您可以配置额外的用户自定义头部。这些头部的常量可以在存在此类头部的模块中找到——例如 AmqpHeadersJmsHeaders 等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至 spring-messaging 模块,并引入了 MessageHeaderAccessor API,以提供对消息传递实现的额外抽象。所有(core)Spring Integration 特定的消息头常量现在都在 IntegrationMessageHeaderAccessor 类中声明。下表描述了预定义的消息头:

表 2. 预定义消息头

标头名称标头类型用途
IntegrationMessageHeaderAccessor.
CORRELATION_ID
java.lang.Object用于关联两个或多个消息。
IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
java.lang.Integer通常是一组具有 SEQUENCE_SIZE 的消息的序列号,但也可在 <resequencer/> 中用于对无界消息组进行重新排序。
IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
java.lang.Integer一组关联消息中的消息数量。
IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
java.lang.Long指示消息何时过期。框架不直接使用,但可通过标头丰富器设置,并在配置了 UnexpiredMessageSelector<filter/> 中使用。
IntegrationMessageHeaderAccessor.
PRIORITY
java.lang.Integer消息优先级 — 例如,在 PriorityChannel 中使用。
IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
java.lang.Boolean如果幂等接收器拦截器检测到消息为重复消息,则为 true。参见幂等接收器企业集成模式
IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
java.io.Closeable如果消息与应在消息处理完成时关闭的 Closeable 关联,则存在此标头。例如,使用 FTP、SFTP 等进行流式文件传输时关联的 Session
IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
java.lang.
AtomicInteger
如果消息驱动的通道适配器支持配置 RetryTemplate,则此标头包含当前传递尝试次数。
IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
o.s.i.support.
Acknowledgment
Callback
如果入站端点支持,则此回调用于接受、拒绝或重新排队消息。参见延迟确认可轮询消息源MQTT 手动确认

IntegrationMessageHeaderAccessor 类为其中一些头部提供了便捷的类型化获取器,如下例所示:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了同样出现在 IntegrationMessageHeaderAccessor 中的标头,但这些标头通常不由用户代码使用(即它们通常由 Spring Integration 的内部组件使用——此处列出是为了保持完整性):

表 3. 预定义消息头

Header NameHeader TypeUsage
IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
java.util.
List<List<Object>>
当需要嵌套关联时使用的关联数据栈(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)。
IntegrationMessageHeaderAccessor.
ROUTING_SLIP
java.util.
Map<List<Object>, Integer>
参见 路由条

消息ID生成

当消息在应用程序中传递时,每次发生变更(例如通过转换器处理),都会分配一个新的消息ID。消息ID是一个UUID。从Spring Integration 3.0开始,用于生成ID的默认策略比之前的java.util.UUID.randomUUID()实现更高效。它基于安全随机种子生成简单随机数,而不是每次都创建安全随机数。

可以通过在应用上下文中声明一个实现 org.springframework.util.IdGenerator 接口的 Bean 来选择不同的 UUID 生成策略。

important

在一个类加载器中只能使用一种 UUID 生成策略。这意味着,如果两个或更多应用上下文运行在同一个类加载器中,它们将共享相同的策略。如果其中一个上下文更改了策略,该策略将被所有上下文使用。如果同一个类加载器中的两个或更多上下文声明了类型为 org.springframework.util.IdGenerator 的 bean,它们必须是同一个类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同但带有参数,则使用第一个被初始化的上下文中的策略。

除了默认策略外,还提供了两种额外的 IdGeneratororg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 机制。当不需要真正的 UUID 且简单的递增值足够时,可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读标头

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的头部信息,无法被覆盖。

自版本 4.3.2 起,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用于自定义不应从上游 Message 复制的头部列表。默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。全局属性 spring.integration.readOnly.headers(参见全局属性)用于为框架组件自定义 DefaultMessageBuilderFactory。当您不希望填充某些开箱即用的头部(例如 ObjectToJsonTransformercontentType)时,这会很有用(参见JSON 转换器)。

当你尝试使用 MessageBuilder 构建新消息时,这类头部信息会被忽略,并且会在日志中记录一条特定的 INFO 级别消息。

自5.0版本起,当使用 DefaultMessageBuilderFactory 时,消息网关头部增强器内容增强器头部过滤器 不再允许配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 头部名称,否则会抛出 BeanInitializationException

标头传播

当消息由消息生成端点(例如服务激活器)处理(并修改)时,通常入站头部信息会传播到出站消息。对此的一个例外是转换器,当完整消息被返回给框架时。在这种情况下,用户代码负责整个出站消息。当转换器仅返回有效载荷时,入站头部信息会被传播。此外,仅当出站消息中尚不存在某个头部时,该头部才会被传播,这允许您根据需要更改头部值。

从 4.3.10 版本开始,你可以配置消息处理器(用于修改消息并生成输出)来抑制特定标头的传播。要配置你不想被复制的标头,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您还可以通过在 META-INF/spring.integration.properties 文件中设置 readOnlyHeaders 属性为逗号分隔的头部列表,来全局禁止特定消息头部的传播。

从 5.0 版本开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用了简单模式(xxx**xxx*xxx*xxx*yyy),以允许过滤具有共同后缀或前缀的标头。更多信息请参阅 PatternMatchUtils Javadoc。当其中一个模式为 *(星号)时,不会传播任何标头。所有其他模式将被忽略。在这种情况下,服务激活器的行为方式与转换器相同,并且必须在从服务方法返回的 Message 中提供任何必需的标头。notPropagatedHeaders() 选项在 Java DSL 的 ConsumerEndpointSpec 中可用,也可用于 <service-activator> 组件的 XML 配置,作为 not-propagated-headers 属性。

important

头部传播抑制不适用于那些不修改消息的端点,例如桥接器路由器

消息实现

Message 接口的基础实现是 GenericMessage<T>,它提供了两个构造函数,如下所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

当创建一个 Message 时,会生成一个随机的唯一ID。接受 Map 类型头信息的构造函数会将提供的头信息复制到新创建的 Message 中。

此外,Message 还提供了一个便捷的实现,用于传递错误条件。该实现以 Throwable 对象作为其有效载荷,如下例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类被参数化的事实。因此,如两个示例所示,在检索 Message 负载 Object 时无需进行类型转换。

上述提到的 Message 类实现是不可变的。在某些情况下,当可变性不是关注点且应用程序逻辑设计良好以避免并发修改时,可以使用 MutableMessage

MessageBuilder 辅助类

您可能会注意到,Message 接口定义了获取其有效载荷和头部信息的方法,但没有提供设置器。这是因为 Message 在初始创建后无法修改。因此,当一个 Message 实例被发送给多个消费者时(例如,通过发布-订阅通道),如果其中一个消费者需要使用不同的有效载荷类型发送回复,它必须创建一个新的 Message。这样一来,其他消费者就不会受到这些更改的影响。请记住,多个消费者可能会访问同一个有效载荷实例或头部值,而该实例本身是否不可变则由您来决定。换句话说,Message 实例的契约类似于不可修改的 Collection,而 MessageHeaders 映射进一步体现了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但任何尝试在 MessageHeaders 实例上调用 put 操作(或 removeclear)都会导致 UnsupportedOperationException

相较于要求创建并填充一个 Map 来传递给 GenericMessage 构造函数,Spring Integration 确实提供了一种更为便捷的方式来构建消息:MessageBuilderMessageBuilder 提供了两个工厂方法,用于从现有 Message 或负载 Object 创建 Message 实例。当从现有 Message 构建时,该 Message 的头部和负载会被复制到新的 Message 中,如下例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果你需要创建一个带有新负载的 Message,但仍想复制现有 Message 的头部信息,可以使用其中一个 'copy' 方法,如下例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();

Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent 方法不会覆盖已存在的值。此外,在前面的示例中,您可以看到如何使用 setHeader 设置任何用户自定义的头部信息。最后,对于预定义的头部信息,也有相应的 set 方法可用,同时还有一个非破坏性的方法来设置任何头部信息(MessageHeaders 也为预定义的头部名称定义了常量)。

你也可以使用 MessageBuilder 来设置消息的优先级,如下例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority 标头仅在配合使用 PriorityChannel 时生效(详见下一章)。其类型定义为 java.lang.Integer

MutableMessageBuilder 用于处理 MutableMessage 实例。该类的逻辑是创建一个 MutableMessage 或保持原样,并通过构建器方法修改其内容。这样,当消息交换不涉及不可变性时,运行中的应用程序可以获得轻微的性能提升。

备注

从版本 6.4 开始,从 MessageBuilder 中提取了一个 BaseMessageBuilder 类,以简化默认消息构建逻辑的扩展。例如,结合自定义的 MessageBuilderFactory,可以在应用程序上下文中全局使用自定义的 BaseMessageBuilder 实现,以提供自定义的 Message 实例。特别是,可以重写 GenericMessage.toString() 方法,以便在记录此类消息时隐藏有效负载和标头中的敏感信息。

MessageBuilderFactory 抽象概念

带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean 被全局注册到应用程序上下文中,并在框架各处用于创建 Message 实例。默认情况下,它是 DefaultMessageBuilderFactory 的一个实例。开箱即用,框架还提供了一个 MutableMessageBuilderFactory,用于在框架组件中创建 MutableMessage 实例。要自定义 Message 实例的创建,必须在目标应用程序上下文中提供一个带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean 来覆盖默认的。例如,可以注册一个自定义的 MessageBuilderFactory 用于 BaseMessageBuilder 的实现,其中我们希望提供一个 GenericMessage 扩展,并重写 toString() 方法,以便在记录此类消息时隐藏有效负载和标头中的敏感信息。

以下是一些快速实现这些类以演示个人可识别信息缓解的示例:

class PiiMessageBuilderFactory implements MessageBuilderFactory {

@Override
public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
return new PiiMessageBuilder<>(message.getPayload(), message);
}

@Override
public <T> PiiMessageBuilder<T> withPayload(T payload) {
return new PiiMessageBuilder<>(payload, null);
}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
super(payload, originalMessage);
}

@Override
public Message<P> build() {
return new PiiMessage<>(getPayload(), getHeaders());
}

}

class PiiMessage<P> extends GenericMessage<P> {

@Serial
private static final long serialVersionUID = -354503673433669578L;

public PiiMessage(P payload, Map<String, Object> headers) {
super(payload, headers);
}

@Override
public String toString() {
return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
}

private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
return headers.entrySet()
.stream()
.map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

}

然后,这个 PiiMessageBuilderFactory 可以被注册为一个 bean,每当框架记录消息时(例如在 errorChannel 的情况下),password 头部信息就会被屏蔽。