跳到主要内容
版本:7.0.2

JMS 支持

DeepSeek V3 中英对照 JMS Support

Spring Integration 提供了用于接收和发送 JMS 消息的通道适配器。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>7.0.2</version>
</dependency>

jakarta.jms:jakarta.jms-api 必须通过某些 JMS 供应商特定的实现(例如 Apache ActiveMQ)显式添加。

实际上,有两种基于 JMS 的入站通道适配器。第一种使用 Spring 的 JmsTemplate 基于轮询周期接收消息。第二种是“消息驱动”的,依赖于 Spring 的 MessageListener 容器。出站通道适配器则使用 JmsTemplate 按需转换并发送 JMS 消息。

通过使用 JmsTemplateMessageListener 容器,Spring Integration 依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上暴露的大多数属性实际上是在配置底层的 JmsTemplateMessageListener 容器。有关 JmsTemplateMessageListener 容器的更多详细信息,请参阅 Spring JMS 文档

虽然 JMS 通道适配器设计用于单向消息传递(仅发送或仅接收),但 Spring Integration 也为请求和应答操作提供了入站和出站 JMS 网关。入站网关依赖 Spring 的 MessageListener 容器实现之一进行消息驱动式接收,并能够根据接收消息提供的 reply-to 目的地发送返回值。出站网关将 JMS 消息发送到 request-destination(或 request-destination-namerequest-destination-expression),然后接收应答消息。您可以显式配置 reply-destination 引用(或 reply-destination-namereply-destination-expression),否则出站网关将使用 JMS TemporaryQueue

在 Spring Integration 2.2 之前,如有必要,会为每个请求或回复创建(并移除)一个 TemporaryQueue。从 Spring Integration 2.2 开始,你可以将出站网关配置为使用 MessageListener 容器来接收回复,而不是直接使用新的(或缓存的)Consumer 来接收每个请求的回复。当这样配置且未提供显式回复目的地时,每个网关将使用一个单独的 TemporaryQueue,而不是为每个请求都创建一个。

从版本 6.0 开始,如果 replyPubSubDomain 选项设置为 true,出站网关将创建一个 TemporaryTopic 而不是 TemporaryQueue。某些 JMS 供应商对这些目的地的处理方式有所不同。

入站通道适配器

入站通道适配器需要引用一个 JmsTemplate 实例,或者同时引用 ConnectionFactoryDestination(您可以用 'destinationName' 代替 'destination' 引用)。以下示例定义了一个带有 Destination 引用的入站通道适配器:

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
提示

请注意,从上述配置中可以看出,inbound-channel-adapter 是一个轮询消费者。这意味着它在被触发时会调用 receive() 方法。仅当轮询操作相对不频繁且时效性不重要时,才应使用此适配器。对于所有其他情况(绝大多数基于 JMS 的使用场景),message-driven-channel-adapter稍后介绍)是更好的选择。

备注

默认情况下,所有需要引用 ConnectionFactory 的 JMS 适配器都会自动查找名为 jmsConnectionFactory 的 bean。这就是为什么你在许多示例中看不到 connection-factory 属性的原因。然而,如果你的 JMS ConnectionFactory 具有不同的 bean 名称,则需要提供该属性。

extract-payload 设置为 true(默认值),接收到的 JMS 消息将通过 MessageConverter 进行转换。当使用默认的 SimpleMessageConverter 时,这意味着生成的 Spring Integration 消息将以 JMS 消息体作为其有效载荷。JMS TextMessage 会生成基于字符串的有效载荷,JMS BytesMessage 会生成字节数组有效载荷,而 JMS ObjectMessage 的可序列化实例将成为 Spring Integration 消息的有效载荷。如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效载荷,请将 extractPayload 选项设置为 false

从 5.0.8 版本开始,org.springframework.jms.connection.CachingConnectionFactorycacheConsumersreceive-timeout 默认值为 -1(无等待),否则为 1 秒。JMS 入站通道适配器基于提供的 ConnectionFactory 和选项创建一个 DynamicJmsTemplate。如果需要外部 JmsTemplate(例如在 Spring Boot 环境中),或者 ConnectionFactory 未启用缓存,或者未启用 cacheConsumers,建议在期望非阻塞消费时设置 jmsTemplate.receiveTimeout(-1)

Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))

事务

从版本 4.0 开始,入站通道适配器支持 session-transacted 属性。在早期版本中,您必须注入一个将 sessionTransacted 设置为 trueJmsTemplate。(适配器确实允许您将 acknowledge 属性设置为 transacted,但这是不正确的,并且无法正常工作)。

但请注意,将 session-transacted 设置为 true 几乎没有实际价值,因为事务会在 receive() 操作完成后立即提交,而消息此时尚未发送到 channel

如果你希望整个流程都是事务性的(例如,如果存在下游出站通道适配器),你必须使用带有 JmsTransactionManagertransactional 轮询器。或者,考虑使用 acknowledge 设置为 transacted(默认值)的 jms-message-driven-channel-adapter

消息驱动型通道适配器

message-driven-channel-adapter 需要一个对 Spring MessageListener 容器实例(AbstractMessageListenerContainer 的任何子类)的引用,或者同时需要 ConnectionFactoryDestination 的引用(可以用 'destinationName' 代替 'destination' 引用)。以下示例定义了一个带有 Destination 引用的消息驱动通道适配器:

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
备注

消息驱动适配器也接受一些与 MessageListener 容器相关的属性。仅当您未提供 container 引用时,才会考虑这些值。在这种情况下,将基于这些属性创建并配置一个 DefaultMessageListenerContainer 实例。例如,您可以指定 transaction-manager 引用、concurrent-consumers 值以及其他一些属性引用和值。更多详细信息,请参阅 Javadoc 和 Spring Integration 的 JMS 模式 (spring-integration-jms.xsd)。

如果您有一个自定义的监听器容器实现(通常是 DefaultMessageListenerContainer 的子类),您可以通过 container 属性提供其实例的引用,或者通过 container-class 属性提供其完全限定类名。在这种情况下,适配器上的属性将转移到您的自定义容器实例中。

备注

你不能使用 Spring JMS 命名空间元素 <jms:listener-container/> 来为 <int-jms:message-driven-channel-adapter> 配置容器引用,因为该元素实际上并不引用一个容器。每个 <jms:listener/> 子元素都会获得其自己的 DefaultMessageListenerContainer(共享父元素 <jms:listener-container/> 上定义的属性)。你可以为每个监听器子元素指定一个 id,并使用该 id 注入到通道适配器中,然而,<jms:/> 命名空间需要一个真正的监听器。

建议为 DefaultMessageListenerContainer 配置一个常规的 <bean>,并在通道适配器中将其作为引用使用。

important

从 4.2 版本开始,默认的 acknowledge 模式为 transacted,除非你提供了外部容器。在这种情况下,你需要根据需要配置容器。我们建议在 DefaultMessageListenerContainer 中使用 transacted 模式以避免消息丢失。

'extract-payload' 属性具有相同的效果,其默认值为 'true'。poller 元素不适用于消息驱动通道适配器,因为它是主动调用的。在大多数场景中,消息驱动的方式更优,因为消息一旦从底层 JMS 消费者接收,就会立即传递到 MessageChannel

最后,<message-driven-channel-adapter> 元素也接受 'error-channel' 属性。这提供了与 Enter the GatewayProxyFactoryBean 中描述相同的基本功能。以下示例展示了如何在消息驱动通道适配器上设置错误通道:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>

将前面的示例与通用网关配置或稍后讨论的JMS 'inbound-gateway' 进行比较时,关键区别在于我们处于单向流中,因为这是一个 'channel-adapter' 而非网关。因此,从 'error-channel' 开始的下游流也应是单向的。例如,它可以发送到日志处理程序,或者连接到另一个 JMS <outbound-channel-adapter> 元素。

从主题消费时,将 pub-sub-domain 属性设置为 true。对于持久订阅,将 subscription-durable 设置为 true;对于共享订阅(需要 JMS 2.0 代理,自 4.2 版本起可用),则使用 subscription-shared。使用 subscription-name 为订阅命名。

从版本5.1开始,当端点停止而应用程序仍在运行时,底层监听器容器将被关闭,其共享连接和消费者也会随之关闭。在此之前,连接和消费者会保持打开状态。若要恢复之前的行为,请将 JmsMessageDrivenEndpoint 上的 shutdownContainerOnStop 属性设置为 false

从版本6.3开始,ChannelPublishingJmsMessageListener 现在可以配置 RetryTemplateRecoveryCallback<Message<?>>,用于在下游发送及发送-接收操作中进行重试。这些选项也通过 JmsMessageDrivenChannelAdapterSpec 在 Java DSL 中开放使用。

入站转换错误

从版本4.2开始,'error-channel' 也用于处理转换错误。在此之前,如果 JMS <message-driven-channel-adapter/><inbound-gateway/> 由于转换错误而无法传递消息,异常会被抛回容器。如果容器配置为使用事务,消息会被回滚并重复重投。转换过程发生在消息构建之前和期间,因此此类错误不会发送到 'error-channel'。现在,此类转换异常会导致一个 ErrorMessage 被发送到 'error-channel',异常作为 payload。如果您希望事务回滚,并且已定义 'error-channel',则 'error-channel' 上的集成流必须重新抛出异常(或另一个异常)。如果错误流未抛出异常,事务将被提交,消息会被移除。如果未定义 'error-channel',异常会像之前一样被抛回容器。

出站通道适配器

JmsSendingMessageHandler 实现了 MessageHandler 接口,能够将 Spring Integration 的 Message 转换为 JMS 消息并发送到 JMS 目的地。它需要引用 jmsTemplate,或者同时引用 jmsConnectionFactorydestination(也可用 destinationName 替代 destination)。与入站通道适配器类似,配置此适配器最简单的方法是使用命名空间支持。以下配置创建了一个适配器,该适配器从 exampleChannel 接收 Spring Integration 消息,将其转换为 JMS 消息,并发送到 bean 名称为 outQueue 的 JMS 目的地引用:

@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")))
.get();
}

与入站通道适配器类似,也存在一个extract-payload属性。然而,对于出站适配器,其含义是相反的。该布尔属性并非作用于 JMS 消息,而是作用于 Spring Integration 消息的有效载荷。换句话说,该属性决定是将 Spring Integration 消息本身作为 JMS 消息体传递,还是将 Spring Integration 消息的有效载荷作为 JMS 消息体传递。默认值为true。因此,如果传递一个有效载荷为String的 Spring Integration 消息,则会创建一个 JMS TextMessage。反之,若希望通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,则需将其设置为false

备注

无论有效载荷提取的布尔值如何,只要您依赖默认转换器或提供对 MessageConverter 另一个实例的引用,Spring Integration 的 MessageHeaders 就会映射到 JMS 属性。(对于“入站”适配器同样适用,区别在于在这些情况下,JMS 属性会映射到 Spring Integration 的 MessageHeaders)。

从版本 5.1 开始,<int-jms:outbound-channel-adapter>JmsSendingMessageHandler)可以通过配置 deliveryModeExpressiontimeToLiveExpression 属性,针对运行时请求的 Spring Message 评估出合适的 JMS 消息服务质量(QoS)值。DefaultJmsHeaderMapper 的新选项 setMapInboundDeliveryMode(true)setMapInboundExpiration(true) 可以方便地将消息头中的信息作为动态 deliveryModetimeToLive 的来源:

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

事务

从版本 4.0 开始,出站通道适配器支持 session-transacted 属性。在早期版本中,您必须注入一个将 sessionTransacted 设置为 trueJmsTemplate。现在,该属性会在内置的默认 JmsTemplate 上设置此属性。如果存在事务(可能来自上游的 message-driven-channel-adapter),则发送操作将在同一事务内执行。否则,将启动一个新事务。

入站网关

Spring Integration 的消息驱动型 JMS 入站网关委托给 MessageListener 容器,支持动态调整并发消费者,并且能够处理回复。入站网关需要引用 ConnectionFactory 和请求 Destination(或 'requestDestinationName')。以下示例定义了一个 JMS inbound-gateway,它从 bean ID 为 inQueue 引用的 JMS 队列接收消息,并发送到名为 exampleChannel 的 Spring Integration 通道:

<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>

由于网关提供的是请求-回复行为,而非单向发送或接收行为,它们还具备两个独立的"负载提取"属性(与先前讨论的通道适配器'extract-payload'设置类似)。对于入站网关,'extract-request-payload'属性决定了是否提取接收到的 JMS 消息体。若设为'false',则 JMS 消息本身将成为 Spring Integration 消息负载。默认值为'true'。

同样地,对于入站网关(inbound-gateway),'extract-reply-payload' 属性适用于将要转换为回复 JMS 消息的 Spring Integration 消息。如果您希望传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的正文),请将此值设置为 'false'。默认情况下,Spring Integration 消息负载会被转换为 JMS 消息(例如,String 负载会变为 JMS TextMessage),该属性默认值同样为 'true'。

与其他任何操作一样,网关调用也可能产生错误。默认情况下,生产者不会收到消费者端可能发生的错误通知,并在等待回复时超时。然而,有时您可能希望将错误情况传回给消费者(换句话说,您可能希望通过将异常映射到消息来将其视为有效回复)。为了实现这一点,JMS入站网关提供了对消息通道的支持,错误可以发送到该通道进行处理,最终可能生成符合某种约定的回复消息负载,该约定定义了调用方可能期望的“错误”回复。您可以使用error-channel属性来配置此类通道,如下例所示:

<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>

您可能会注意到,这个示例与进入 GatewayProxyFactoryBean 中包含的示例非常相似。同样的思路在这里也适用:exceptionTransformer 可以是一个创建错误响应对象的 POJO,您可以引用 nullChannel 来抑制错误,或者可以省略 'error-channel' 以让异常传播。

请参阅入站转换错误

从主题消费时,将 pub-sub-domain 属性设置为 true。对于持久订阅,将 subscription-durable 设置为 true;对于共享订阅,则设置为 subscription-shared(需要 JMS 2.0 代理,自 4.2 版本起可用)。使用 subscription-name 来命名订阅。

important

从 4.2 版本开始,默认的 acknowledge 模式为 transacted,除非提供了外部容器。在这种情况下,您应根据需要配置容器。我们建议您将 transactedDefaultMessageListenerContainer 结合使用,以避免消息丢失。

从版本5.1开始,当端点停止而应用程序仍在运行时,底层监听器容器将被关闭,其共享连接和消费者也会随之关闭。在此之前,连接和消费者会保持开启状态。若要恢复之前的行为,请将 JmsInboundGateway 上的 shutdownContainerOnStop 属性设置为 false

默认情况下,JmsInboundGateway 会在接收到的消息中查找 jakarta.jms.Message.getJMSReplyTo() 属性,以确定回复的发送位置。否则,可以配置一个静态的 defaultReplyDestination,或 defaultReplyQueueNamedefaultReplyTopicName。此外,从版本 6.1 开始,可以在提供的 ChannelPublishingJmsMessageListener 上配置 replyToExpression,以便在请求中的标准 JMSReplyTo 属性为 null 时动态确定回复目的地。接收到的 jakarta.jms.Message 被用作根评估上下文对象。以下示例演示了如何使用 Java DSL API 配置一个入站 JMS 网关,该网关使用从请求消息解析出的自定义回复目的地:

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}

从版本6.3开始,Jms.inboundGateway() API 提供了 retryTemplate()recoveryCallback() 选项,用于重试内部的发送-接收操作。

出站网关

出站网关从Spring Integration消息创建JMS消息,并将其发送到 request-destination。随后,它通过两种方式处理JMS回复消息:若配置了 reply-destination,则使用选择器从该目标接收;若未提供 reply-destination,则通过创建JMS TemporaryQueue(若 replyPubSubDomain=true 则创建 TemporaryTopic)实例来处理。

警告

reply-destination(或 reply-destination-name)与 CachingConnectionFactory 结合使用时,若该工厂的 cacheConsumers 设置为 true,可能导致内存不足的情况。这是因为每个请求都会获得一个带有新选择器的新消费者(根据 correlation-key 值进行选择,或者在没有 correlation-key 时,根据发送的 JMSMessageID 进行选择)。由于这些选择器是唯一的,它们会在当前请求完成后保留在缓存中(未被使用)。

如果指定了回复目的地,建议不要使用缓存的消费者。或者,考虑使用 下文所述<reply-listener/>

以下示例展示了如何配置出站网关:

<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>

'outbound-gateway' 的负载提取属性与 'inbound-gateway' 的属性呈反向对应关系(参见先前讨论)。这意味着 'extract-request-payload' 属性值适用于将 Spring Integration 消息转换为要作为请求发送的 JMS 消息的场景。而 'extract-reply-payload' 属性值则适用于将接收到的 JMS 回复消息转换为 Spring Integration 消息,随后发送至 'reply-channel' 的场景,如前述配置示例所示。

使用 <reply-listener/>

Spring Integration 2.2 引入了一种处理回复的替代技术。如果你在网关中添加一个 <reply-listener/> 子元素,而不是为每个回复创建一个消费者,系统将使用一个 MessageListener 容器来接收回复,并将它们交给请求线程处理。这带来了许多性能优势,同时也缓解了先前警告中描述的缓存消费者内存使用问题。

当使用带有无reply-destination的出站网关的<reply-listener/>时,不会为每个请求创建TemporaryQueue,而是使用单个TemporaryQueue。(如果与代理的连接丢失并恢复,网关会根据需要创建一个额外的TemporaryQueue)。如果replyPubSubDomain设置为true,从版本6.0开始,将创建TemporaryTopic

在使用 correlation-key 时,多个网关可以共享相同的回复目的地,因为监听器容器使用了每个网关独有的选择器。

警告

如果你指定了回复监听器并指定了回复目标(或回复目标名称),但未提供关联键,网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下无法配置选择器,从而无法避免回复被发送到可能配置了相同回复目标的另一个网关。

请注意,在这种情况下,每个请求都会使用一个新的消费者,并且消费者可能会在内存中累积,如上文警告所述;因此,在这种情况下不应使用缓存的消费者。

以下示例展示了一个具有默认属性的回复监听器:

<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>

监听器非常轻量级,我们预计在大多数情况下,您只需要一个消费者。但是,您可以添加诸如 concurrent-consumersmax-concurrent-consumers 等属性。有关支持的完整属性列表及其含义,请参阅模式文档以及 Spring JMS 文档

空闲回复监听器

从版本 4.2 开始,你可以根据需要启动应答监听器(并在空闲一段时间后停止它),而不是在网关的整个生命周期内持续运行。如果应用程序上下文中有许多网关且它们大多处于空闲状态,这个功能会很有用。一种典型场景是:在包含许多(非活跃的)分区 Spring Batch 作业的上下文中,使用 Spring Integration 和 JMS 进行分区分发。如果所有应答监听器都处于活跃状态,JMS 代理会为每个网关维护一个活跃的消费者。通过启用空闲超时,每个消费者仅在其对应的批处理作业运行时存在(并在作业结束后短暂保持)。

请参阅 属性参考 中的 idle-reply-listener-timeout

网关回复关联

本节描述了根据网关配置,用于实现回复关联(确保发起网关仅接收其自身请求的回复)的机制。有关此处讨论属性的完整描述,请参阅属性参考

以下列表描述了各种场景(数字仅用于标识,顺序无关紧要):

  1. 没有 reply-destination* 属性且没有 <reply-listener>

    为每个请求创建一个 TemporaryQueue,并在请求完成(无论成功与否)时删除。correlation-key 无关紧要。

  2. 提供了 reply-destination* 属性,但既没有提供 <reply-listener/> 也没有提供 correlation-key

    使用与发出消息相等的 JMSCorrelationID 作为消费者的消息选择器:

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    响应系统应将在入站消息中的 JMSMessageID 返回到回复消息的 JMSCorrelationID 中。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 用于消息驱动 POJO 的 MessageListenerAdapter 实现。

    备注

    当您使用此配置时,不应将主题用于回复。回复可能会丢失。

  3. 提供了 reply-destination* 属性,没有提供 <reply-listener/>,并且 correlation-key="JMSCorrelationID"

    网关生成一个唯一的关联 ID 并将其插入到 JMSCorrelationID 头中。消息选择器为:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    响应系统应将在入站消息中的 JMSCorrelationID 返回到回复消息的 JMSCorrelationID 中。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 用于消息驱动 POJO 的 MessageListenerAdapter 实现。

  4. 提供了 reply-destination* 属性,没有提供 <reply-listener/>,并且 correlation-key="myCorrelationHeader"

    网关生成一个唯一的关联 ID 并将其插入到 myCorrelationHeader 消息属性中。correlation-key 可以是任何用户定义的值。消息选择器为:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    响应系统应将在入站消息中的 myCorrelationHeader 返回到回复消息的 myCorrelationHeader 中。

  5. 提供了 reply-destination* 属性,没有提供 <reply-listener/>,并且 correlation-key="JMSCorrelationID*"(注意关联键中的 *

    网关使用请求消息中 jms_correlationId 头(如果存在)的值,并将其插入到 JMSCorrelationID 头中。消息选择器为:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    用户必须确保此值是唯一的。

    如果该头不存在,则网关的行为如 3 所述。

    响应系统应将在入站消息中的 JMSCorrelationID 返回到回复消息的 JMSCorrelationID 中。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 用于消息驱动 POJO 的 MessageListenerAdapter 实现。

  6. 没有提供 reply-destination* 属性,但提供了 <reply-listener>

    创建一个临时队列,并用于此网关实例的所有回复。消息中不需要关联数据,但网关内部使用发出的 JMSMessageID 将回复定向到正确的请求线程。

  7. 提供了 reply-destination* 属性,提供了 <reply-listener>,但没有提供 correlation-key

    不允许。

    <reply-listener/> 配置被忽略,网关的行为如 2 所述。会记录一条警告日志消息来指示此情况。

  8. 提供了 reply-destination* 属性,提供了 <reply-listener>,并且 correlation-key="JMSCorrelationID"

    网关具有唯一的关联 ID,并将其与一个递增值一起插入到 JMSCorrelationID 头中(gatewayId + "_" + ++seq)。消息选择器为:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    响应系统应将在入站消息中的 JMSCorrelationID 返回到回复消息的 JMSCorrelationID 中。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 用于消息驱动 POJO 的 MessageListenerAdapter 实现。由于每个网关都有唯一的 ID,每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  9. 提供了 reply-destination* 属性,提供了 <reply-listener/>,并且 correlation-key="myCorrelationHeader"

    网关具有唯一的关联 ID,并将其与一个递增值一起插入到 myCorrelationHeader 属性中(gatewayId + "_" + ++seq)。correlation-key 可以是任何用户定义的值。消息选择器为:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    响应系统应将在入站消息中的 myCorrelationHeader 返回到回复消息的 myCorrelationHeader 中。由于每个网关都有唯一的 ID,每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  10. 提供了 reply-destination* 属性,提供了 <reply-listener/>,并且 correlation-key="JMSCorrelationID*"

(注意关联键中的 *

不允许。

用户提供的关联ID不允许与回复监听器一同使用。网关不会使用此配置进行初始化。

异步网关

从 4.3 版本开始,您现在可以在配置出站网关时指定 async="true"(或在 Java 中使用 setAsync(true))。

默认情况下,当请求发送到网关时,请求线程会被挂起,直到收到回复。然后流程在该线程上继续执行。如果 async 设置为 true,则在 send() 完成后立即释放请求线程,而回复将在监听器容器线程上返回(且流程继续执行)。当网关在轮询器线程上被调用时,这非常有用。线程被释放后,可用于框架内的其他任务。

async 需要一个 <reply-listener/>(或在使用 Java 配置时设置 setUseReplyContainer(true))。同时,它还需要指定一个 correlationKey(通常为 JMSCorrelationID)。如果这两个条件中的任何一个未满足,async 将被忽略。

属性参考

以下列表展示了 outbound-gateway 的所有可用属性:

<int-jms:outbound-gateway
connection-factory="connectionFactory" // <1>
correlation-key="" // <2>
delivery-persistent="" // <3>
destination-resolver="" // <4>
explicit-qos-enabled="" // <5>
extract-reply-payload="true" // <6>
extract-request-payload="true" // <7>
header-mapper="" // <8>
message-converter="" // <9>
priority="" // <10>
receive-timeout="" // <11>
reply-channel="" // <12>
reply-destination="" // <13>
reply-destination-expression="" // <14>
reply-destination-name="" // <15>
reply-pub-sub-domain="" // <16>
reply-timeout="" // <17>
request-channel="" // <18>
request-destination="" // <19>
request-destination-expression="" // <20>
request-destination-name="" // <21>
request-pub-sub-domain="" // <22>
time-to-live="" // <23>
requires-reply="" // <24>
idle-reply-listener-timeout="" // <25>
async=""> // <26>
<int-jms:reply-listener /> // <27>
</int-jms:outbound-gateway>
  • 引用一个 jakarta.jms.ConnectionFactory。默认的 jmsConnectionFactory

  • 包含用于关联响应与回复的相关数据的属性名称。如果省略,网关期望响应系统在 JMSCorrelationID 头部中返回出站 JMSMessageID 头部的值。如果指定,网关会生成一个关联 ID 并用其填充指定的属性。响应系统必须在同一属性中回传该值。可以将其设置为 JMSCorrelationID,此时将使用标准头部而非 String 属性来保存关联数据。当使用 <reply-container/> 时,如果提供了显式的 reply-destination,则必须指定 correlation-key。从版本 4.0.1 开始,此属性也支持值 JMSCorrelationID*,这意味着如果出站消息已具有 JMSCorrelationID(从 jms_correlationId 映射而来)头部,则使用该头部而不是生成新的。注意,当使用 <reply-container/> 时,不允许使用 JMSCorrelationID* 键,因为容器需要在初始化期间设置消息选择器。

important

您应当理解,网关无法确保唯一性,如果提供的关联ID不唯一,可能会产生意外的副作用。

  • [#] 您应当理解,网关无法确保唯一性,如果提供的关联 ID 不唯一,可能会发生意外的副作用。
  • 一个布尔值,指示投递模式应为 DeliveryMode.PERSISTENT (true) 还是 DeliveryMode.NON_PERSISTENT (false)。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 一个 DestinationResolver。默认为 SimpleDestinationResolver,它将目标名称映射到同名的队列或主题,并缓存目标。

  • 当设置为 true 时,启用服务质量属性的使用:prioritydelivery-modetime-to-live

  • 当设置为 true(默认值)时,Spring Integration 回复消息的有效负载是从 JMS 回复消息的正文创建的(通过使用 MessageConverter)。当设置为 false 时,整个 JMS 消息成为 Spring Integration 消息的有效负载。

  • 当设置为 true(默认值)时,Spring Integration 消息的有效负载被转换为 JMSMessage(通过使用 MessageConverter)。当设置为 false 时,整个 Spring Integration 消息被转换为 JMSMessage。在这两种情况下,Spring Integration 消息头都通过 HeaderMapper 映射到 JMS 头和属性。

  • 一个 HeaderMapper,用于在 Spring Integration 消息头与 JMS 消息头和属性之间进行映射。

  • MessageConverter 的引用,用于在 JMS 消息和 Spring Integration 消息有效负载(或消息,如果 extract-request-payloadfalse)之间进行转换。默认为 SimpleMessageConverter

  • 请求消息的默认优先级。如果存在消息优先级头,则会被其覆盖。其范围为 09。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 等待回复的时间(以毫秒为单位)。默认为 5000(五秒)。

  • 回复消息发送到的通道。

  • Destination 的引用,它被设置为 JMSReplyTo 头。最多只允许使用 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果未提供任何一项,则使用 TemporaryQueue 来接收此网关的回复。

  • 一个评估为 Destination 的 SpEL 表达式,它将被设置为 JMSReplyTo 头。表达式的结果可以是 Destination 对象或 String。它由 DestinationResolver 用于解析实际的 Destination。最多只允许使用 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果未提供任何一项,则使用 TemporaryQueue 来接收此网关的回复。

  • 被设置为 JMSReplyTo 头的目标名称。它由 DestinationResolver 用于解析实际的 Destination。最多只允许使用 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果未提供任何一项,则使用 TemporaryQueue 来接收此网关的回复。

  • 当设置为 true 时,表示由 DestinationResolver 解析的任何回复 Destination 都应该是 Topic 而不是 Queue

  • 网关将回复消息发送到 reply-channel 时的等待时间。这仅在 reply-channel 可能阻塞时才有效——例如,具有容量限制且当前已满的 QueueChannel。默认为无限期。

  • 此网关接收请求消息的通道。

  • 对发送请求消息的 Destination 的引用。reply-destinationreply-destination-expressionreply-destination-name 中必须指定一项。您只能使用这三个属性中的一个。

  • 一个评估为发送请求消息的 Destination 的 SpEL 表达式。表达式的结果可以是 Destination 对象或 String。它由 DestinationResolver 用于解析实际的 Destinationreply-destinationreply-destination-expressionreply-destination-name 中必须指定一项。您只能使用这三个属性中的一个。

  • 发送请求消息的目标名称。它由 DestinationResolver 用于解析实际的 Destinationreply-destinationreply-destination-expressionreply-destination-name 中必须指定一项。您只能使用这三个属性中的一个。

  • 当设置为 true 时,表示由 DestinationResolver 解析的任何请求 Destination 都应该是 Topic 而不是 Queue

  • 指定消息的生存时间。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 指定此出站网关是否必须返回非空值。默认情况下,此值为 true,当底层服务在 receive-timeout 后未返回值时,会抛出 MessageTimeoutException。请注意,如果预期服务永远不会返回回复,最好使用 <int-jms:outbound-channel-adapter/> 而不是将 requires-reply="false"<int-jms:outbound-gateway/>。对于后者,发送线程会被阻塞,等待 receive-timeout 期间的回复。

  • 当您使用 <reply-listener /> 时,默认情况下其生命周期(启动和停止)与网关的生命周期匹配。当此值大于 0 时,容器按需启动(当发送请求时)。容器继续运行,直到至少经过此时间且未收到任何请求(并且直到没有未完成的回复)。容器在下一个请求时再次启动。停止时间是最小值,实际上可能高达此值的 1.5 倍。

  • 请参阅 异步网关

  • 当包含此元素时,回复由异步的 MessageListenerContainer 接收,而不是为每个回复创建消费者。在许多情况下,这可能更高效。

映射消息头与JMS消息之间的转换

JMS消息可以包含元信息,例如JMS API头部和简单属性。您可以通过使用JmsHeaderMapper将这些信息映射到Spring Integration消息头部,或从Spring Integration消息头部映射回来。JMS API头部会传递给相应的setter方法(例如setJMSReplyTo),而其他头部则被复制到JMS消息的通用属性中。JMS出站网关默认使用JmsHeaderMapper的默认实现进行引导,该实现会映射标准的JMS API头部以及原始类型或String类型的消息头部。您还可以通过入站和出站网关的header-mapper属性提供自定义的头部映射器。

important

许多 JMS 供应商特定的客户端不允许直接在已创建的 JMS 消息上设置 deliveryModeprioritytimeToLive 属性。这些属性被视为 QoS 属性,因此必须传递到目标 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不会将相应的 Spring Integration 头部(或表达式结果)映射到上述 JMS 消息属性中。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 将请求消息中的头部值传递到 MessageProducer.send() API。要启用此功能,您必须为出站端点配置一个 DynamicJmsTemplate,并将其 explicitQosEnabled 属性设置为 true。Spring Integration Java DSL 默认配置了 DynamicJmsTemplate,但您仍需设置 explicitQosEnabled 属性。

important

自版本 4.0 起,对于入站消息,JMSPriority 头部已映射到标准的 priority 头部。在此之前,priority 头部仅用于出站消息。若要恢复之前的行为(即不映射入站优先级),请将 DefaultJmsHeaderMappermapInboundPriority 属性设置为 false

important

自版本 4.3 起,DefaultJmsHeaderMapper 通过调用其 toString() 方法将标准的 correlationId 头部映射为消息属性(correlationId 通常是一个 UUID,而 JMS 不支持 UUID)。在入站侧,它被映射为一个 String。这与 jms_correlationId 头部无关,后者映射到 JMSCorrelationID 头部或从该头部映射而来。JMSCorrelationID 通常用于关联请求和回复,而 correlationId 则常用于将相关消息组合成一个组(例如与聚合器或重排序器一起使用)。

从版本 5.1 开始,DefaultJmsHeaderMapper 可以配置为映射入站的 JMSDeliveryModeJMSExpiration 属性:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}

这些JMS属性分别映射到Spring消息头中的JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION

消息转换、编组与解组

如果你需要转换消息,所有的JMS适配器和网关都允许你通过设置message-converter属性来提供一个MessageConverter。为此,请提供一个在同一ApplicationContext中可用的MessageConverter实例的bean名称。此外,为了与编组器和解组器接口保持一致,Spring提供了MarshallingMessageConverter,你可以使用自定义的编组器和解组器来配置它。以下示例展示了如何操作

<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
备注

当你提供自定义的 MessageConverter 实例时,它仍会被包装在 HeaderMappingMessageConverter 中。这意味着 'extract-request-payload' 和 'extract-reply-payload' 属性可能会影响传递给你的转换器的实际对象。HeaderMappingMessageConverter 本身会委托给目标 MessageConverter,同时将 Spring Integration 的 MessageHeaders 映射到 JMS 消息属性,并再次映射回来。

基于 JMS 的消息通道

之前介绍的通道适配器和网关都旨在用于与其他外部系统集成的应用程序。入站选项假设其他系统正在向JMS目的地发送JMS消息,而出站选项假设其他系统正在从该目的地接收消息。其他系统可能是也可能不是Spring Integration应用程序。当然,当将Spring Integration消息实例作为JMS消息体本身发送时(extract-payload 值设置为 false),则假定其他系统基于Spring Integration。然而,这绝不是必需条件。这种灵活性正是使用基于消息的集成选项并借助“通道”(在JMS中称为目的地)抽象的优势之一。

有时,对于给定的 JMS 目标,生产者和消费者都旨在成为同一应用程序的一部分,并在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现这一点。这种方法的问题在于,即使从概念上讲,目标是拥有一个单一的消息通道,您仍然需要两个适配器。从 Spring Integration 2.0 版本开始,支持一个更好的选项。现在,在使用 JMS 命名空间时,可以定义一个单一的“通道”,如下例所示:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

前面示例中的通道行为非常类似于Spring Integration主命名空间中的普通<channel/>元素。它可以被任何端点的input-channeloutput-channel属性引用。不同之处在于,该通道由名为exampleQueue的JMS队列实例支持。这意味着生产端点和消费端点之间可以进行异步消息传递。然而,与通过在非JMS的<channel/>元素内添加<queue/>元素创建的更简单的异步消息通道不同,消息并非存储在内存队列中。相反,这些消息在JMS消息体内传递,并且底层JMS提供程序的全部功能都可用于该通道。使用此替代方案的最常见理由可能是利用JMS消息传递的存储转发方法所提供的持久性优势。

如果配置得当,基于JMS的消息通道也支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,那么它实际上不会写入到基于JMS的事务性通道。同样,如果消息的接收是回滚事务的一部分,消费者也不会从通道中物理移除JMS消息。请注意,在这种情况下,生产者事务和消费者事务是分开的。这与通过没有 <queue/> 子元素的简单同步 <channel/> 元素传播事务上下文的情况有显著不同。

由于上述示例引用了 JMS Queue 实例,因此它充当了点对点通道。另一方面,如果您需要发布-订阅行为,可以使用单独的元素并引用 JMS Topic。以下示例展示了如何实现:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于任一类型的 JMS 支持通道,都可以提供目标名称而非引用,如下例所示:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前面的示例中,目的地名称由Spring默认的SimpleDestinationResolver实现解析,但您也可以提供DestinationResolver接口的任何实现。此外,JMS ConnectionFactory是通道的必需属性,但默认情况下,预期的bean名称应为jmsConnectionFactory。以下示例提供了一个用于解析JMS目的地名称的自定义实例,以及一个不同的ConnectionFactory名称:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>

对于 <publish-subscribe-channel />,将 durable 属性设置为 true 表示持久订阅,设置为 subscription-shared 表示共享订阅(需要 JMS 2.0 代理,自 4.2 版本起可用)。使用 subscription 来命名订阅。

使用 JMS 消息选择器

通过JMS消息选择器,你可以基于JMS消息头以及JMS属性来过滤消息。例如,如果你想监听自定义JMS头属性 myHeaderProperty 等于 something 的消息,可以指定以下表达式:

myHeaderProperty = 'something'

消息选择器表达式是 SQL-92 条件表达式语法的子集,并作为 Java 消息服务 规范的一部分定义。您可以通过为以下 Spring Integration JMS 组件使用 XML 命名空间配置来指定 JMS 消息的 selector 属性:

  • JMS 通道

  • JMS 发布订阅通道

  • JMS 入站通道适配器

  • JMS 入站网关

  • JMS 消息驱动通道适配器

important

您无法通过 JMS 消息选择器引用消息体中的值。

JMS 示例

要试验这些 JMS 适配器,请查看 Spring Integration Samples Git 仓库中提供的 JMS 示例,地址为 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms

该仓库包含两个示例。其中一个提供了入站和出站通道适配器,另一个则提供了入站和出站网关。这些示例配置为在嵌入式 ActiveMQ 进程中运行,但您可以修改每个示例的 common.xml Spring 应用上下文文件,以支持其他 JMS 提供程序或独立的 ActiveMQ 进程。

换句话说,你可以将配置拆分,使入站和出站适配器运行在独立的JVM中。如果你已安装ActiveMQ,请修改common.xml文件中的brokerURL属性,将其设置为tcp://localhost:61616(而非vm://localhost)。两个示例均接受来自标准输入的内容,并将回显内容输出至标准输出。查看配置即可了解这些消息如何通过JMS进行路由。