跳到主要内容

JMS 支持

QWen Plus 中英对照 JMS Support

Spring Integration 提供了通道适配器来接收和发送 JMS 消息。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.4.2</version>
</dependency>
xml

必须通过某些 JMS 厂商特定的实现显式添加 jakarta.jms:jakarta.jms-api,例如 Apache ActiveMQ。

实际上有两种基于 JMS 的入站 Channel Adapter。第一种使用 Spring 的 JmsTemplate 根据轮询周期接收消息。第二种是“消息驱动”的,依赖于 Spring 的 MessageListener 容器。出站通道适配器使用 JmsTemplate 按需转换并发送 JMS 消息。

通过使用 JmsTemplateMessageListener 容器,Spring Integration 依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层的 JmsTemplateMessageListener 容器。有关 JmsTemplateMessageListener 容器的更多详细信息,请参阅 Spring JMS 文档

虽然 JMS 通道适配器旨在用于单向消息传递(仅发送或仅接收),但 Spring Integration 还提供了入站和出站 JMS 网关以进行请求和回复操作。入站网关依赖于 Spring 的一个 MessageListener 容器实现来进行基于消息驱动的接收。它还能够将返回值发送到由接收到的消息提供的 reply-to 目标。出站网关将 JMS 消息发送到 request-destination(或 request-destination-namerequest-destination-expression),然后接收回复消息。您可以显式配置 reply-destination 引用(或 reply-destination-namereply-destination-expression)。否则,出站网关使用 JMS TemporaryQueue

在 Spring Integration 2.2 之前,如果有必要,会为每个请求或回复创建(和移除)一个 TemporaryQueue。从 Spring Integration 2.2 开始,您可以配置出站网关以使用 MessageListener 容器来接收回复,而不是直接使用新的(或缓存的)Consumer 来接收每个请求的回复。当如此配置时,并且没有提供明确的回复目标,则为每个网关使用单个 TemporaryQueue,而不是为每个请求使用一个。

从 6.0 版本开始,如果 replyPubSubDomain 选项设置为 true,出站网关将创建一个 TemporaryTopic 而不是 TemporaryQueue。一些 JMS 厂商对这些目的地的处理方式不同。

入站通道适配器

入站通道适配器需要引用单个 JmsTemplate 实例或同时引用 ConnectionFactoryDestination(你可以提供一个 'destinationName' 来替代 'destination' 引用)。以下示例定义了一个带有 Destination 引用的入站通道适配器:

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
java
提示

从上述配置中可以看出,inbound-channel-adapter 是一个轮询消费者。这意味着它在被触发时调用 receive()。你应该只在轮询相对不频繁且及时性不重要的情况下使用它。对于所有其他情况(绝大多数基于 JMS 的用例),message-driven-channel-adapter (稍后描述) 是一个更好的选择。

备注

默认情况下,所有需要引用 ConnectionFactory 的 JMS 适配器都会自动查找名为 jmsConnectionFactory 的 bean。这就是为什么在许多示例中您看不到 connection-factory 属性。但是,如果您的 JMS ConnectionFactory 具有不同的 bean 名称,则需要提供该属性。

如果 extract-payload 被设置为 true(默认值),接收到的 JMS 消息将通过 MessageConverter 进行传递。当依赖于默认的 SimpleMessageConverter 时,这意味着生成的 Spring Integration 消息会将 JMS 消息的正文作为其有效负载。JMS TextMessage 会产生基于字符串的有效负载,JMS BytesMessage 会产生字节数组有效负载,而 JMS ObjectMessage 的可序列化实例将成为 Spring Integration 消息的有效负载。如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效负载,请将 extractPayload 选项设置为 false

从 5.0.8 版本开始,org.springframework.jms.connection.CachingConnectionFactorycacheConsumersreceive-timeout 默认值为 -1(不等待),否则为 1 秒。JMS 入站通道适配器会根据提供的 ConnectionFactory 和选项创建一个 DynamicJmsTemplate。如果需要外部的 JmsTemplate(例如在 Spring Boot 环境中),或者 ConnectionFactory 没有缓存,或者没有 cacheConsumers,如果期望非阻塞消费,则建议设置 jmsTemplate.receiveTimeout(-1)

Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
java

事务

从 4.0 版本开始,入站通道适配器支持 session-transacted 属性。在早期版本中,您必须注入一个 sessionTransacted 设置为 trueJmsTemplate。(虽然适配器允许您将 acknowledge 属性设置为 transacted,但这是不正确的,并且无法工作)。

但是请注意,将 session-transacted 设置为 true 的价值不大,因为事务在 receive() 操作之后立即提交,并且在消息发送到 channel 之前。

如果你想让整个流程都是事务性的(例如,如果有下游的 outbound 渠道适配器),你必须使用带有 JmsTransactionManagertransactional 轮询器。或者,可以考虑使用 jms-message-driven-channel-adapter 并将 acknowledge 设置为 transacted(默认值)。

消息驱动的通道适配器

message-driven-channel-adapter 需要引用 Spring MessageListener 容器(AbstractMessageListenerContainer 的任意子类)的一个实例,或者同时引用 ConnectionFactoryDestination(可以提供一个 'destinationName' 来替代 'destination' 引用)。以下示例定义了一个带有 Destination 引用的消息驱动通道适配器:

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
java
备注

消息驱动适配器还接受与 MessageListener 容器相关的几个属性。这些值仅在你没有提供 container 引用时才会被考虑。在这种情况下,会创建并根据这些属性配置一个 DefaultMessageListenerContainer 的实例。例如,你可以指定 transaction-manager 引用,concurrent-consumers 值以及多个其他属性引用和值。详情请参阅 Javadoc 和 Spring Integration 的 JMS 模式 (spring-integration-jms.xsd)。

如果你有一个自定义的监听器容器实现(通常是 DefaultMessageListenerContainer 的子类),你可以通过使用 container 属性提供其实例的引用,或者通过使用 container-class 属性提供其完全限定的类名。在这种情况下,适配器上的属性将被传递到你的自定义容器实例中。

备注

你不能使用 Spring JMS 命名空间元素 <jms:listener-container/> 来为 <int-jms:message-driven-channel-adapter> 配置容器引用,因为该元素实际上并不引用容器。每个 <jms:listener/> 子元素都会获得自己的 DefaultMessageListenerContainer(并共享在父级 <jms:listener-container/> 元素上定义的属性)。你可以给每个监听器子元素一个 id,并用它注入到通道适配器中,但是 <jms:/> 命名空间需要一个真实的监听器。

建议为 DefaultMessageListenerContainer 配置一个普通的 <bean> 并将其作为引用在通道适配器中使用。

important

从 4.2 版本开始,默认的 acknowledge 模式是 transacted,除非你提供了外部容器。在这种情况下,你应该根据需要配置容器。我们建议使用 transacted 模式与 DefaultMessageListenerContainer 一起,以避免消息丢失。

'extract-payload' 属性具有相同的效果,其默认值为 'true'。poller 元素对于消息驱动的通道适配器不适用,因为它会被主动调用。对于大多数场景,消息驱动的方法更好,因为消息一旦从底层 JMS 消费者接收到,就会立即传递给 MessageChannel

最后,<message-driven-channel-adapter> 元素也接受 'error-channel' 属性。这提供了与 进入 GatewayProxyFactoryBean 中描述的相同的基本功能。以下示例展示了如何在消息驱动的通道适配器上设置错误通道:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
xml

当我们将前面的例子与后面讨论的通用网关配置或 JMS 'inbound-gateway' 进行比较时,关键区别在于我们处于单向流程中,因为这是一个 'channel-adapter',而不是网关。因此,从 'error-channel' 流下游的流程也应该是单向的。例如,它可以发送到日志记录处理器,或者它可以连接到不同的 JMS <outbound-channel-adapter> 元素。

当从主题消费时,将 pub-sub-domain 属性设置为 true。对于持久订阅,将 subscription-durable 设置为 true;对于共享订阅(需要 JMS 2.0 代理,并且自 4.2 版本起可用),将 subscription-shared 设置为 true。使用 subscription-name 来命名订阅。

从 5.1 版开始,当端点停止而应用程序继续运行时,底层的监听器容器将被关闭,从而关闭其共享连接和消费者。以前,连接和消费者会保持打开状态。要恢复到之前的行为,可以将 JmsMessageDrivenEndpoint 上的 shutdownContainerOnStop 设置为 false

从 6.3 版本开始,ChannelPublishingJmsMessageListener 现在可以提供一个 RetryTemplateRecoveryCallback<Message<?>>,用于对下游发送和发送-接收操作进行重试。这些选项也暴露在 JmsMessageDrivenChannelAdapterSpec 中,供 Java DSL 使用。

入站转换错误

从 4.2 版本开始,error-channel 也用于转换错误。以前,如果 JMS <message-driven-channel-adapter/><inbound-gateway/> 由于转换错误而无法传递消息,则会将异常抛回到容器。如果容器配置为使用事务,消息会被回滚并重复重新传递。转换过程发生在消息构建之前和期间,因此这些错误不会发送到 error-channel。现在,此类转换异常会导致一个 ErrorMessage 被发送到 error-channel,异常作为 payload。如果您希望事务回滚,并且您定义了一个 error-channel,那么在 error-channel 上的集成流必须重新抛出异常(或另一个异常)。如果错误流没有抛出异常,事务将被提交并且消息将被移除。如果没有定义 error-channel,异常将像以前一样被抛回到容器。

外发通道适配器

JmsSendingMessageHandler 实现了 MessageHandler 接口,能够将 Spring Integration 的 Messages 转换为 JMS 消息并发送到 JMS 目的地。它需要一个 jmsTemplate 引用或同时需要 jmsConnectionFactorydestination 引用(可以用 destinationName 替代 destination)。与入站通道适配器一样,使用命名空间支持是配置此适配器的最简单方法。以下配置产生一个适配器,该适配器从 exampleChannel 接收 Spring Integration 消息,将这些消息转换为 JMS 消息,并将它们发送到名为 outQueue 的 bean 引用的 JMS 目的地:

@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
java

与入站通道适配器一样,这里也有一个 'extract-payload' 属性。但是,对于出站适配器来说,其含义是相反的。不是应用于 JMS 消息,而是应用于 Spring Integration 消息的有效负载。换句话说,这个布尔属性决定了是将 Spring Integration 消息本身作为 JMS 消息体传递,还是将 Spring Integration 消息的有效负载作为 JMS 消息体传递。默认值是 true。因此,如果你传递了一个有效负载为 String 的 Spring Integration 消息,就会创建一个 JMS TextMessage。然而,如果你想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,则应将其设置为 false

备注

无论有效负载提取的布尔值如何,Spring Integration 的 MessageHeaders 都会映射到 JMS 属性,只要你依赖默认转换器或提供另一个 MessageConverter 实例的引用。(对于 '入站' 适配器也是如此,只是在这些情况下,JMS 属性会映射到 Spring Integration 的 MessageHeaders)。

从 5.1 版本开始,<int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) 可以通过 deliveryModeExpressiontimeToLiveExpression 属性进行配置,以在运行时根据请求的 Spring Message 评估适当的 JMS 消息 QoS 值。DefaultJmsHeaderMapper 的新选项 setMapInboundDeliveryMode(true)setMapInboundExpiration(true) 可能会作为动态 deliveryModetimeToLive 的信息来源:

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
xml

事务

从 4.0 版本开始, outbound channel adapter 支持 session-transacted 属性。在早期版本中,您必须注入一个将 sessionTransacted 设置为 trueJmsTemplate 。现在该属性会在内置的默认 JmsTemplate 上设置此属性。如果存在事务(可能来自上游的 message-driven-channel-adapter),则发送操作将在同一事务内执行。否则,将启动一个新事务。

入站网关

Spring Integration 的消息驱动 JMS 入站网关委托给一个 MessageListener 容器,支持动态调整并发消费者,并且还可以处理回复。入站网关需要引用一个 ConnectionFactory 和一个请求 Destination(或 'requestDestinationName')。以下示例定义了一个 JMS inbound-gateway,它从由 bean id 为 inQueue 引用的 JMS 队列接收,并发送到名为 exampleChannel 的 Spring Integration 通道:

<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
xml

由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们也具有两个不同的“有效负载提取”属性(如之前讨论的针对通道适配器的 extract-payload 设置)。对于入站网关,extract-request-payload 属性决定是否提取接收到的 JMS 消息体。如果设置为 false,JMS 消息本身将成为 Spring Integration 消息的有效负载。默认值是 true

同样,对于入站网关,“extract-reply-payload”属性适用于要转换为回复 JMS 消息的 Spring Integration 消息。如果你想传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的主体),将此值设置为 'false'。默认情况下,它也是 'true',即 Spring Integration 消息的有效载荷会被转换为 JMS 消息(例如,String 类型的有效载荷会变成 JMS 文本消息)。

与任何其他事物一样,网关调用可能会导致错误。默认情况下,生产者不会收到消费者端可能出现的错误通知,并且会在等待回复时超时。然而,有时您可能希望将错误情况传达回消费者(换句话说,您可能希望将异常视为有效回复,通过将其映射到消息来处理)。为了实现这一点,JMS 入站网关提供了对消息通道的支持,可以将错误发送到该通道进行处理,从而生成符合某些契约定义的回复消息负载,这些契约定义了调用者可以期待作为“错误”回复的内容。您可以使用 error-channel 属性来配置这样的通道,如下例所示:

<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
xml

你可能会注意到,这个例子与 进入 GatewayProxyFactoryBean 中包含的例子非常相似。这里应用了相同的概念:exceptionTransformer 可以是一个创建错误响应对象的 POJO,你可以引用 nullChannel 来抑制错误,或者你可以省略 'error-channel' 以让异常传播。

当从主题消费时,将 pub-sub-domain 属性设置为 true。对于持久订阅,将 subscription-durable 设置为 true;对于共享订阅,将 subscription-shared 设置为 true(需要 JMS 2.0 代理,并且自 4.2 版本起可用)。使用 subscription-name 来命名订阅。

important

从 4.2 版本开始,默认的 acknowledge 模式是 transacted,除非提供了外部容器。在这种情况下,你应该根据需要配置容器。我们建议你使用 transacted 模式与 DefaultMessageListenerContainer 以避免消息丢失。

从 5.1 版开始,当端点停止而应用程序继续运行时,底层监听器容器将关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到之前的行为,可以将 JmsInboundGateway 上的 shutdownContainerOnStop 设置为 false

默认情况下,JmsInboundGateway 在接收到的消息中查找 jakarta.jms.Message.getJMSReplyTo() 属性,以确定发送回复的位置。否则,它可以配置为使用静态的 defaultReplyDestination,或 defaultReplyQueueNamedefaultReplyTopicName。此外,从 6.1 版开始,可以在提供的 ChannelPublishingJmsMessageListener 上配置 replyToExpression,以在请求的标准 JMSReplyTo 属性为 null 时动态确定回复目标。接收到的 jakarta.jms.Message 用作根评估上下文对象。以下示例演示了如何使用 Java DSL API 配置带有自定义回复目标的入站 JMS 网关,该目标是从请求消息解析的:

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
java

从 6.3 版开始,Jms.inboundGateway() API 暴露了 retryTemplate()recoveryCallback() 选项,用于重试内部的发送和接收操作。

外发网关

outbound 网关从 Spring Integration 消息创建 JMS 消息并将其发送到 request-destination。然后它通过使用选择器从您配置的 reply-destination 接收 JMS 回复消息,或者如果未提供 reply-destination,则通过创建 JMS TemporaryQueue(如果 replyPubSubDomain= true 则为 TemporaryTopic)实例来处理 JMS 回复消息。

警告

使用 reply-destination (或 reply-destination-name )与设置了 cacheConsumerstrueCachingConnectionFactory 可能会导致内存不足的情况。这是因为在每个请求中,都会创建一个新的消费者和一个新的选择器(根据 correlation-key 值进行选择,或者在没有 correlation-key 时,根据发送的 JMSMessageID 进行选择)。由于这些选择器是唯一的,所以在当前请求完成后,它们会保留在缓存中(未被使用)。

如果您指定了回复目的地,则建议不要使用缓存的消费者。或者,考虑使用 <reply-listener/> ,如 下文所述

以下示例展示了如何配置一个出站网关:

<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
xml

'outbound-gateway' 的有效负载提取属性与 'inbound-gateway' 的属性是相反的(参见 前面的讨论)。这意味着 'extract-request-payload' 属性值适用于被转换成 JMS 消息以作为请求发送的 Spring Integration 消息。'extract-reply-payload' 属性值适用于作为回复接收的 JMS 消息,然后将其转换为 Spring Integration 消息,并随后发送到 'reply-channel',如前面的配置示例所示。

使用 <reply-listener/>

Spring Integration 2.2 引入了一种处理回复的替代技术。如果你向网关添加一个 <reply-listener/> 子元素,而不是为每个回复创建一个消费者,则会使用 MessageListener 容器来接收回复并将它们交给请求线程。这提供了许多性能优势,并且缓解了前面警告中描述的缓存消费者内存利用率问题。

当使用带有无 reply-destination 的出站网关的 <reply-listener/> 时,不会为每个请求创建一个 TemporaryQueue,而是使用单个 TemporaryQueue。(如果与代理的连接断开并恢复,网关会根据需要创建额外的 TemporaryQueue)。如果将 replyPubSubDomain 设置为 true,从版本 6.0 开始,将创建 TemporaryTopic 而不是 TemporaryQueue

当使用 correlation-key 时,多个网关可以共享相同的回复目的地,因为监听器容器使用对每个网关唯一的选择器。

警告

如果您指定了回复监听器并指定了回复目标(或回复目标名称)但没有提供相关键,网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下没有办法配置选择器。因此,没有办法避免回复发送到可能配置了相同回复目标的不同网关。

请注意,在这种情况下,每个请求都使用一个新的消费者,并且如上所述的消费者可能会在内存中堆积;因此在这种情况下不应该使用缓存的消费者。

以下示例显示了一个具有默认属性的回复监听器:

<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
xml

监听器非常轻量级,我们预计在大多数情况下,您只需要一个消费者。但是,您可以添加诸如 concurrent-consumersmax-concurrent-consumers 等属性。请参阅模式以获取支持属性的完整列表,并参考 Spring JMS 文档以了解它们的含义。

空闲回复监听器

从 4.2 版本开始,您可以根据需要启动回复监听器(并在空闲一段时间后停止它),而不是在整个网关的生命周期中运行。如果您在应用程序上下文中有很多网关,并且它们大多处于空闲状态,这将非常有用。一种这样的情况是在上下文中使用 Spring Integration 和 JMS 进行分区分发的许多(非活动)分区 Spring Batch 作业。如果所有回复监听器都处于活动状态,则 JMS 经纪人每个网关都有一个活跃消费者。通过启用空闲超时,每个消费者仅在其对应的批处理作业运行期间存在(以及在它完成后的一小段时间内)。

参见 属性参考 中的 idle-reply-listener-timeout

网关回复关联

本节描述了用于回复关联的机制(确保原始网关只接收对其请求的回复),具体取决于网关的配置方式。有关这里讨论的属性的完整描述,请参阅属性参考

以下列表描述了各种场景(数字仅用于标识——顺序无关紧要):

  1. 没有 reply-destination* 属性且没有 <reply-listener>

    为每个请求创建一个 TemporaryQueue,并在请求完成(成功或失败)时删除。correlation-key 无关紧要。

  2. 提供了 reply-destination* 属性且既没有 <reply-listener/> 也没有 correlation-key

    使用等于发出消息的 JMSCorrelationID 作为消费者的 message selector:

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    响应系统应将传入的 JMSMessageID 返回到回复中的 JMSCorrelationID。这是常见的模式,并由 Spring Integration inbound gateway 以及 Spring 的 MessageListenerAdapter(用于消息驱动的 POJO)实现。

    备注

    使用此配置时,你不应该使用主题进行回复。回复可能会丢失。

  3. 提供了 reply-destination* 属性,没有 <reply-listener/>,并且 correlation-key="JMSCorrelationID"

    网关生成唯一的 correlation ID 并将其插入 JMSCorrelationID 头中。message selector 是:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    响应系统应将传入的 JMSCorrelationID 返回到回复中的 JMSCorrelationID。这是常见的模式,并由 Spring Integration inbound gateway 以及 Spring 的 MessageListenerAdapter(用于消息驱动的 POJO)实现。

  4. 提供了 reply-destination* 属性,没有 <reply-listener/>,并且 correlation-key="myCorrelationHeader"

    网关生成唯一的 correlation ID 并将其插入 myCorrelationHeader 消息属性中。correlation-key 可以是任何用户定义的值。message selector 是:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    响应系统应将传入的 myCorrelationHeader 返回到回复中的 myCorrelationHeader

  5. 提供了 reply-destination* 属性,没有 <reply-listener/>,并且 correlation-key="JMSCorrelationID*"(注意 correlation key 中的 *

    网关使用请求消息中的 jms_correlationId 头(如果存在)的值,并将其插入 JMSCorrelationID 头中。message selector 是:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    用户必须确保此值唯一。

    如果头不存在,网关行为如同 3

    响应系统应将传入的 JMSCorrelationID 返回到回复中的 JMSCorrelationID。这是常见的模式,并由 Spring Integration inbound gateway 以及 Spring 的 MessageListenerAdapter(用于消息驱动的 POJO)实现。

  6. 没有提供 reply-destination* 属性,但提供了 <reply-listener>

    创建一个临时队列并用于此网关实例的所有回复。消息中不需要关联数据,但内部使用的传出 JMSMessageID 在网关中用于将回复定向到正确的请求线程。

  7. 提供了 reply-destination* 属性,提供了 <reply-listener> 且没有 correlation-key

    不允许。

    忽略 <reply-listener/> 配置,网关行为如同 2。会记录一条警告日志消息以指示这种情况。

  8. 提供了 reply-destination* 属性,提供了 <reply-listener>correlation-key="JMSCorrelationID"

    网关有一个唯一的 correlation ID,并将其与递增的值一起插入 JMSCorrelationID 头中(gatewayId + "_" + ++seq)。message selector 是:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    响应系统应将传入的 JMSCorrelationID 返回到回复中的 JMSCorrelationID。这是常见的模式,并由 Spring Integration inbound gateway 以及 Spring 的 MessageListenerAdapter(用于消息驱动的 POJO)实现。由于每个网关都有唯一的 ID,每个实例只接收自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  9. 提供了 reply-destination* 属性,提供了 <reply-listener/>correlation-key="myCorrelationHeader"

    网关有一个唯一的 correlation ID,并将其与递增的值一起插入 myCorrelationHeader 属性中(gatewayId + "_" + ++seq)。correlation-key 可以是任何用户定义的值。message selector 是:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    响应系统应将传入的 myCorrelationHeader 返回到回复中的 myCorrelationHeader。由于每个网关都有唯一的 ID,每个实例只接收自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  10. 提供了 reply-destination* 属性,提供了 <reply-listener/>correlation-key="JMSCorrelationID*"

(注意相关键中的 *

不允许。

用户提供的相关 ID 不允许用于回复监听器。网关不会使用此配置进行初始化。

异步网关

从 4.3 版本开始,现在可以在配置出站网关时指定 async="true"(或在 Java 中使用 setAsync(true))。

默认情况下,当请求发送到网关时,请求线程会被挂起,直到收到回复。然后流程会在该线程上继续执行。如果 asynctrue,则在 send() 完成后请求线程会立即被释放,并且回复会在监听器容器线程上返回(并且流程继续执行)。当网关在一个轮询线程上调用时,这可能会很有用。线程被释放,并可用于框架内的其他任务。

async 需要一个 <reply-listener/>(或在使用 Java 配置时 setUseReplyContainer(true))。它还需要指定一个 correlationKey(通常是 JMSCorrelationID)。如果这些条件中的任何一个没有满足,async 将被忽略。

属性引用

以下列表显示了 outbound-gateway 的所有可用属性:

<int-jms:outbound-gateway
connection-factory="connectionFactory" // <1>
correlation-key="" // <2>
delivery-persistent="" // <3>
destination-resolver="" // <4>
explicit-qos-enabled="" // <5>
extract-reply-payload="true" // <6>
extract-request-payload="true" // <7>
header-mapper="" // <8>
message-converter="" // <9>
priority="" // <10>
receive-timeout="" // <11>
reply-channel="" // <12>
reply-destination="" // <13>
reply-destination-expression="" // <14>
reply-destination-name="" // <15>
reply-pub-sub-domain="" // <16>
reply-timeout="" // <17>
request-channel="" // <18>
request-destination="" // <19>
request-destination-expression="" // <20>
request-destination-name="" // <21>
request-pub-sub-domain="" // <22>
time-to-live="" // <23>
requires-reply="" // <24>
idle-reply-listener-timeout="" // <25>
async=""> // <26>
<int-jms:reply-listener /> // <27>
</int-jms:outbound-gateway>
xml
  • 引用 jakarta.jms.ConnectionFactory。默认为 jmsConnectionFactory

  • 包含关联数据的属性名称,用于将响应与回复相关联。如果省略,网关期望响应系统在 JMSCorrelationID 头中返回出站 JMSMessageID 头的值。如果指定了该属性,网关将生成一个关联 ID 并将其填充到指定的属性中。响应系统必须在同一属性中回显该值。它可以设置为 JMSCorrelationID,在这种情况下,使用标准头而不是 String 属性来保存关联数据。当您使用 <reply-container/> 时,如果提供了显式的 reply-destination,则必须指定 correlation-key。从 4.0.1 版本开始,此属性还支持 JMSCorrelationID* 值,这意味着如果出站消息已经具有 JMSCorrelationID(从 jms_correlationId 映射)头,则使用它而不是生成一个新的。注意,当您使用 <reply-container/> 时,不允许使用 JMSCorrelationID* 键,因为容器需要在初始化期间设置消息选择器。

important

你应该理解网关没有办法确保唯一性,如果提供的相关 ID 不是唯一的,则可能会发生意外的副作用。

  • [#] 您应该理解网关无法确保唯一性,如果提供的相关 ID 不唯一,则可能会发生意外的副作用。
  • 一个布尔值,指示传递模式应该是 DeliveryMode.PERSISTENT (true) 或 DeliveryMode.NON_PERSISTENT (false)。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 一个 DestinationResolver。默认是一个 DynamicDestinationResolver,它将目标名称映射到同名的队列或主题。

  • 当设置为 true 时,启用服务质量属性的使用:prioritydelivery-modetime-to-live

  • 当设置为 true(默认值),Spring Integration 回复消息的有效负载是从 JMS 回复消息的正文创建的(通过使用 MessageConverter)。当设置为 false 时,整个 JMS 消息成为 Spring Integration 消息的有效负载。

  • 当设置为 true(默认值),Spring Integration 消息的有效负载被转换为 JMSMessage(通过使用 MessageConverter)。当设置为 false 时,整个 Spring Integration 消息被转换为 JMSMessage。在这两种情况下,Spring Integration 消息头都通过 HeaderMapper 映射到 JMS 头和属性。

  • 用于将 Spring Integration 消息头映射到 JMS 消息头和属性的 HeaderMapper

  • 一个引用 MessageConverter 的对象,用于在 JMS 消息和 Spring Integration 消息有效负载(或消息,如果 extract-request-payloadfalse)之间进行转换。默认是 SimpleMessageConverter

  • 请求消息的默认优先级。如果有消息优先级头,则会覆盖此值。其范围是 09。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 等待回复的时间(以毫秒为单位)。默认是 5000(五秒)。

  • 回复消息发送到的通道。

  • Destination 的引用,设置为 JMSReplyTo 头。最多只能设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果没有提供任何一项,则使用 TemporaryQueue 作为此网关的回复。

  • 一个 SpEL 表达式,评估为 Destination,将被设置为 JMSReplyTo 头。表达式可以是 Destination 对象或 String。它由 DestinationResolver 用来解析实际的 Destination。最多只能设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果没有提供任何一项,则使用 TemporaryQueue 作为此网关的回复。

  • 设置为 JMSReplyTo 头的目标名称。它由 DestinationResolver 用来解析实际的 Destination。最多只能设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。如果没有提供任何一项,则使用 TemporaryQueue 作为此网关的回复。

  • 当设置为 true 时,表示由 DestinationResolver 解析的任何回复 Destination 应该是一个 Topic 而不是 Queue

  • 网关在向 reply-channel 发送回复消息时等待的时间。这只有在 reply-channel 可以阻塞的情况下才有影响——例如,容量已满的 QueueChannel。默认是无限期。

  • 此网关接收请求消息的通道。

  • 对发送请求消息的 Destination 的引用。需要设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。只能使用这三个属性中的一个。

  • 一个 SpEL 表达式,评估为发送请求消息的 Destination。表达式可以是 Destination 对象或 String。它由 DestinationResolver 用来解析实际的 Destination。需要设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。只能使用这三个属性中的一个。

  • 发送请求消息的目标名称。它由 DestinationResolver 用来解析实际的 Destination。需要设置 reply-destinationreply-destination-expressionreply-destination-name 中的一个。只能使用这三个属性中的一个。

  • 当设置为 true 时,表示由 DestinationResolver 解析的任何请求 Destination 应该是一个 Topic 而不是 Queue

  • 指定消息的生存时间。此设置仅在 explicit-qos-enabledtrue 时生效。

  • 指定此出站网关是否必须返回非空值。默认情况下,此值为 true,如果底层服务在 receive-timeout 后未返回值,则会抛出 MessageTimeoutException。请注意,如果服务从未预期返回回复,最好使用 <int-jms:outbound-channel-adapter/> 而不是带有 requires-reply="false"<int-jms:outbound-gateway/>。后者会使发送线程阻塞,等待 receive-timeout 期间的回复。

  • 当您使用 <reply-listener /> 时,默认情况下其生命周期(启动和停止)与网关匹配。当此值大于 0 时,容器会在有请求发送时按需启动。容器将继续运行,直到至少这段时间内没有收到请求(并且没有未完成的回复)。容器会在下一个请求时再次启动。停止时间是一个最小值,实际上可能高达 1.5 倍于此值。

  • 参见 异步网关

  • 包含此元素时,回复由异步 MessageListenerContainer 接收,而不是为每个回复创建消费者。这在许多情况下更高效。

将消息头映射到 JMS 消息和从 JMS 消息映射

JMS 消息可以包含元信息,例如 JMS API 标头和简单属性。您可以使用 JmsHeaderMapper 将这些内容映射到 Spring Integration 消息标头或从其映射。JMS API 标头将传递给适当 的 setter 方法(如 setJMSReplyTo),而其他标头则复制到 JMS 消息的一般属性中。JMS 外发网关是用 JmsHeaderMapper 的默认实现引导的,它将映射标准 JMS API 标头以及原始或 String 消息标头。您也可以通过使用入站和出站网关的 header-mapper 属性提供自定义标头映射器。

important

许多 JMS 厂商特定的客户端不允许直接在已创建的 JMS 消息上设置 deliveryModeprioritytimeToLive 属性。这些属性被认为是 QoS 属性,因此必须传播到目标 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。由于这个原因,DefaultJmsHeaderMapper 不会将适当的 Spring Integration 头(或表达式结果)映射到上述 JMS 消息属性。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 将请求消息中的头值传播到 MessageProducer.send() API。要启用此功能,您必须使用 explicitQosEnabled 属性设置为 true 的 DynamicJmsTemplate 配置输出端点。Spring Integration Java DSL 默认配置了一个 DynamicJmsTemplate,但您仍然需要设置 explicitQosEnabled 属性。

important

从 4.0 版本开始,JMSPriority 标头被映射到标准的 priority 标头用于传入消息。之前,priority 标头仅用于传出消息。要恢复到之前的行为(即,不映射传入优先级),将 DefaultJmsHeaderMappermapInboundPriority 属性设置为 false

important

从 4.3 版本开始,DefaultJmsHeaderMapper 通过调用 correlationIdtoString() 方法(correlationId 通常是 UUID,这是 JMS 不支持的)将标准 correlationId 头映射为消息属性。在入站时,它被映射为一个 String。这与 jms_correlationId 头是独立的,后者映射到和来自 JMSCorrelationID 头。JMSCorrelationID 通常用于关联请求和回复,而 correlationId 则经常用于将相关消息组合成一组(例如使用聚合器或重排序器)。

从 5.1 版本开始,可以配置 DefaultJmsHeaderMapper 以映射传入的 JMSDeliveryModeJMSExpiration 属性:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
java

这些 JMS 属性分别映射到 JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION Spring 消息头。

消息转换、编组和解组

如果你需要转换消息,所有 JMS 适配器和网关都允许你通过设置 message-converter 属性来提供一个 MessageConverter。为此,请提供在同一 ApplicationContext 中可用的 MessageConverter 实例的 bean 名称。此外,为了与编解码器接口保持一致,Spring 提供了 MarshallingMessageConverter,你可以使用自己的自定义编解码器进行配置。以下示例展示了如何做到这一点。

<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
xml
备注

当你提供自己的 MessageConverter 实例时,它仍然被包装在 HeaderMappingMessageConverter 内部。这意味着 'extract-request-payload' 和 'extract-reply-payload' 属性可以影响传递给你的转换器的实际对象。HeaderMappingMessageConverter 本身委托给目标 MessageConverter,同时将 Spring Integration 的 MessageHeaders 映射到 JMS 消息属性,反之亦然。

基于 JMS 的消息通道

前面介绍的通道适配器和网关都是为与其他外部系统集成的应用程序设计的。入站选项假定某些其他系统正在将 JMS 消息发送到 JMS 目的地,而出站选项假定某些其他系统正在从目的地接收消息。其他系统可能是也可能不是 Spring Integration 应用程序。当然,当将 Spring Integration 消息实例作为 JMS 消息本身的内容发送(将 extract-payload 值设置为 false)时,假定其他系统是基于 Spring Integration 的。然而,这绝不是必需的。这种灵活性是使用基于消息的集成选项与“通道”抽象(在 JMS 的情况下为目的地)的好处之一。

有时,给定 JMS 目的地的生产者和消费者都打算成为同一应用程序的一部分,在同一个进程中运行。你可以通过使用一对入站和出站通道适配器来实现这一点。这种方法的问题在于你需要两个适配器,即使从概念上讲,目标是拥有一个单一的消息通道。一个更好的选项是从 Spring Integration 2.0 版本开始支持的。现在,在使用 JMS 命名空间时,可以定义一个单一的“通道”,如下例所示:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
xml

前面的例子中的通道表现得非常像主 Spring Integration 命名空间中的普通 <channel/> 元素。它可以通过任何端点的 input-channeloutput-channel 属性进行引用。不同之处在于,这个通道是由名为 exampleQueue 的 JMS 队列实例支持的。这意味着在生产和消费端点之间可以实现异步消息传递。然而,与通过在非 JMS <channel/> 元素内添加 <queue/> 元素创建的更简单的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息是在 JMS 消息体中传递的,并且该通道可以使用底层 JMS 提供程序的全部功能。使用这种替代方案最常见的理由可能是利用 JMS 消息传递的存储转发方法所提供的持久性。

如果配置正确,JMS 支持的消息通道也支持事务。换句话说,如果生产者的发送操作是属于一个回滚的事务,则不会实际写入到事务性 JMS 支持的通道。同样地,如果消费者的接收消息操作是属于一个回滚的事务,则不会从通道中物理移除 JMS 消息。请注意,在这种情况下,生产和消费的事务是独立的。这与简单的、同步的 <channel/> 元素(没有 <queue/> 子元素)跨事务上下文传播有显著不同。

由于上述示例引用了一个 JMS 队列实例,因此它作为一个点对点通道。另一方面,如果您需要发布 - 订阅行为,则可以使用单独的元素并引用 JMS 主题。以下示例展示了如何做到这一点:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
xml

对于这两种类型的 JMS 支持的通道,可以提供目标的名称而不是引用,如下例所示:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
xml

在前面的示例中,目标名称由 Spring 的默认 DynamicDestinationResolver 实现解析,但您可以提供 DestinationResolver 接口的任何实现。此外,JMS ConnectionFactory 是通道的必需属性,但默认情况下,预期的 bean 名称是 jmsConnectionFactory。以下示例提供了一个自定义实例,用于解析 JMS 目标名称,并为 ConnectionFactory 提供了不同的名称:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
xml

对于 <publish-subscribe-channel />,将 durable 属性设置为 true 以进行持久订阅,或设置为 subscription-shared 以进行共享订阅(需要 JMS 2.0 代理,并且自版本 4.2 起可用)。使用 subscription 来命名订阅。

使用 JMS 消息选择器

使用 JMS 消息选择器,你可以基于 JMS 标头以及 JMS 属性来过滤 JMS Messages 。例如,如果你想监听自定义 JMS 标头属性 myHeaderProperty 等于 something 的消息,可以指定以下表达式:

myHeaderProperty = 'something'
xml

消息选择器表达式是 SQL-92 条件表达语法的一个子集,并且作为 Java 消息服务 规范的一部分被定义。你可以通过使用 XML 命名空间配置为以下 Spring Integration JMS 组件指定 JMS 消息 selector 属性:

  • JMS 通道

  • JMS 发布订阅通道

  • JMS 输入通道适配器

  • JMS 输入网关

  • JMS 消息驱动的通道适配器

important

您不能通过使用 JMS 消息选择器来引用消息体值。

JMS 示例

要试验这些 JMS 适配器,可以查看 Spring Integration Samples Git 仓库中提供的 JMS 示例,地址为 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms

该仓库包含两个示例。一个提供了入站和出站通道适配器,另一个提供了入站和出站网关。它们被配置为与嵌入式的 ActiveMQ 进程一起运行,但你可以修改每个示例的 common.xml Spring 应用程序上下文文件,以支持不同的 JMS 提供商或独立的 ActiveMQ 进程。

换句话说,你可以将配置拆分,使入站和出站适配器在单独的 JVM 中运行。如果你安装了 ActiveMQ,请修改 common.xml 文件中的 brokerURL 属性,使用 tcp://localhost:61616(而不是 vm://localhost)。这两个示例都从 stdin 接受输入并回显到 stdout。查看配置以了解这些消息如何通过 JMS 进行路由。