使用 Spring JMS
本节介绍如何使用 Spring 的 JMS 组件。
使用 JmsTemplate
JmsTemplate
类是 JMS 核心包中的核心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。
使用 JmsTemplate
的代码只需要实现回调接口,这些接口为它们提供了一个明确的高级契约。当 JmsTemplate
中的调用代码提供一个 Session
时,MessageCreator
回调接口会创建一个消息。为了允许更复杂的 JMS API 使用,SessionCallback
提供了 JMS 会话,而 ProducerCallback
则公开了一个 Session
和 MessageProducer
对。
JMS API 提供了两种类型的发送方法,一种方法接受传递模式、优先级和存活时间作为服务质量(QOS)参数,另一种方法不接受 QOS 参数并使用默认值。由于 JmsTemplate
有许多发送方法,因此将 QOS 参数设置为 bean 属性,以避免发送方法数量的重复。类似地,同步接收调用的超时时间是通过使用 setReceiveTimeout
属性来设置的。
一些 JMS 提供者允许通过配置 ConnectionFactory
来以管理方式设置默认的 QOS 值。这样做的结果是,调用 MessageProducer
实例的 send
方法(send(Destination destination, Message message)
)时使用的 QOS 默认值与 JMS 规范中指定的不同。为了提供一致的 QOS 值管理,必须通过将布尔属性 isExplicitQosEnabled
设置为 true
来专门启用 JmsTemplate
使用其自己的 QOS 值。
为了方便起见,JmsTemplate
还提供了一个基本的请求-回复操作,该操作允许发送消息并在操作的一部分创建的临时队列上等待回复。
一旦配置完成,JmsTemplate
类的实例是线程安全的。这一点很重要,因为这意味着你可以配置一个 JmsTemplate
的单一实例,然后安全地将这个共享引用注入到多个协作者中。需要明确的是,JmsTemplate
是有状态的,因为它维护了一个对 ConnectionFactory
的引用,但这种状态不是会话状态。
从 Spring Framework 4.1 开始,JmsMessagingTemplate
构建于 JmsTemplate
之上,并提供了与消息抽象的集成——即 org.springframework.messaging.Message
。这使您可以以通用的方式创建要发送的消息。
连接
JmsTemplate
需要一个对 ConnectionFactory
的引用。ConnectionFactory
是 JMS 规范的一部分,作为使用 JMS 的入口。客户端应用程序使用它作为工厂来创建与 JMS 提供者的连接,并封装了各种配置参数,其中许多是特定于供应商的,例如 SSL 配置选项。
在 EJB 中使用 JMS 时,供应商提供了 JMS 接口的实现,以便它们可以参与声明式事务管理并执行连接和会话的池化。为了使用此实现,Jakarta EE 容器通常要求您在 EJB 或 servlet 部署描述符中将 JMS 连接工厂声明为 resource-ref
。为了确保在 EJB 中使用 JmsTemplate
时能够使用这些功能,客户端应用程序应确保引用 ConnectionFactory
的托管实现。
缓存消息资源
标准 API 涉及创建许多中间对象。要发送消息,需要执行以下“API”流程:
ConnectionFactory->Connection->Session->MessageProducer->send
在 ConnectionFactory
和 Send
操作之间,会创建和销毁三个中间对象。为了优化资源使用并提高性能,Spring 提供了两种 ConnectionFactory
的实现。
使用 SingleConnectionFactory
Spring 提供了一个 ConnectionFactory
接口的实现,SingleConnectionFactory
,它在所有 createConnection()
调用中返回相同的 Connection
,并忽略对 close()
的调用。这对于测试和独立环境非常有用,因为可以使用相同的连接进行多个 JmsTemplate
调用,这些调用可能跨越任意数量的事务。SingleConnectionFactory
需要一个标准 ConnectionFactory
的引用,该引用通常来自 JNDI。
使用 CachingConnectionFactory
CachingConnectionFactory
扩展了 SingleConnectionFactory
的功能,并增加了对 Session
、MessageProducer
和 MessageConsumer
实例的缓存。初始缓存大小设置为 1
。你可以使用 sessionCacheSize
属性来增加缓存会话的数量。注意,实际缓存的会话数量会超过这个数字,因为会话是基于其确认模式进行缓存的,因此当 sessionCacheSize
设置为 1 时,可以有多达四个缓存的会话实例(每个确认模式一个)。MessageProducer
和 MessageConsumer
实例在其所属会话内缓存,并且在缓存时也考虑到生产者和消费者的独特属性。MessageProducers 是基于其目的地进行缓存的。MessageConsumers 是基于一个由目的地、选择器、noLocal 传递标志和持久订阅名称(如果创建持久消费者)组成的键进行缓存的。
临时队列和主题(TemporaryQueue/TemporaryTopic)的 MessageProducers 和 MessageConsumers 永远不会被缓存。不幸的是,WebLogic JMS 在其常规目标实现上实现了临时队列/主题接口,错误地表明其目标都不能被缓存。请在 WebLogic 上使用不同的连接池/缓存,或为 WebLogic 目的自定义 CachingConnectionFactory
。
目标管理
目标(Destinations),作为 ConnectionFactory
实例,是可以存储和检索在 JNDI 中的 JMS 管理对象。在配置 Spring 应用程序上下文时,可以使用 JNDI 的 JndiObjectFactoryBean
工厂类或 <jee:jndi-lookup>
来对对象对 JMS 目标的引用执行依赖注入。然而,如果应用程序中有大量目标,或者 JMS 提供者有独特的高级目标管理功能,这种策略通常会很繁琐。这些高级目标管理的例子包括创建动态目标或支持目标的分层命名空间。JmsTemplate
将目标名称的解析委托给实现 DestinationResolver
接口的 JMS 目标对象。DynamicDestinationResolver
是 JmsTemplate
使用的默认实现,支持解析动态目标。还提供了一个 JndiDestinationResolver
,用于作为 JNDI 中包含的目标的服务定位器,并可选择性地回退到 DynamicDestinationResolver
中包含的行为。
在 JMS 应用程序中,目标通常只有在运行时才知道,因此在应用程序部署时无法通过管理方式创建。这通常是因为交互系统组件之间共享了应用程序逻辑,这些逻辑会根据一个众所周知的命名约定在运行时创建目标。即使动态目标的创建不属于 JMS 规范的一部分,但大多数供应商都提供了此功能。动态目标是使用用户定义的名称创建的,这使它们与临时目标区分开来,并且通常不会在 JNDI 中注册。用于创建动态目标的 API 因供应商而异,因为与目标相关的属性是供应商特定的。然而,一些供应商有时会选择简单的实现方式,忽略 JMS 规范中的警告,使用 TopicSession
的 createTopic(String topicName)
方法或 QueueSession
的 createQueue(String queueName)
方法来创建具有默认目标属性的新目标。根据供应商的实现,DynamicDestinationResolver
也可以创建物理目标,而不仅仅是解析一个目标。
布尔属性 pubSubDomain
用于配置 JmsTemplate
以了解正在使用的 JMS 域。默认情况下,该属性的值为 false,表示使用点对点域,即 Queues
。此属性(由 JmsTemplate
使用)通过 DestinationResolver
接口的实现来决定动态目标解析的行为。
您还可以通过属性 defaultDestination
配置 JmsTemplate
的默认目标。默认目标用于发送和接收操作,这些操作不涉及特定目标。
消息监听器容器
在 EJB 世界中,JMS 消息最常见的用途之一是驱动消息驱动 Bean(MDBs)。Spring 提供了一种解决方案,可以创建消息驱动 POJO(MDPs),而不将用户绑定到 EJB 容器。(有关 Spring 的 MDP 支持的详细介绍,请参见异步接收:消息驱动 POJO)。端点方法可以使用 @JmsListener
注解 — 更多详情请参见注解驱动的监听器端点。
消息监听器容器用于从 JMS 消息队列接收消息,并驱动注入到其中的 MessageListener
。监听器容器负责所有消息接收的线程处理,并将消息分派给监听器进行处理。消息监听器容器是 MDP 和消息提供者之间的中介,负责注册接收消息、参与事务、资源获取和释放、异常转换等。这使您能够编写与接收消息(并可能响应消息)相关的(可能复杂的)业务逻辑,并将样板 JMS 基础设施问题委托给框架。
Spring 中包含两个标准的 JMS 消息监听器容器,每个都有其专门的功能集。
使用 SimpleMessageListenerContainer
此消息监听器容器是两种标准风格中较简单的一种。它在启动时创建固定数量的 JMS 会话和消费者,通过标准的 JMS MessageConsumer.setMessageListener()
方法注册监听器,并交由 JMS 提供者执行监听器回调。此变体不允许动态适应运行时需求,也不支持参与外部管理的事务。在兼容性方面,它非常接近独立的 JMS 规范的精神,但通常与 Jakarta EE 的 JMS 限制不兼容。
虽然 SimpleMessageListenerContainer
不允许参与外部管理的事务,但它支持本地 JMS 事务。要启用此功能,可以将 sessionTransacted
标志切换为 true
,或者在 XML 命名空间中,将 acknowledge
属性设置为 transacted
。这样,当监听器抛出异常时,会导致回滚,并重新传递消息。或者,可以考虑使用 CLIENT_ACKNOWLEDGE
模式,该模式在发生异常时也提供重新传递,但不使用事务化的 Session
实例,因此不包括事务协议中的任何其他 Session
操作(例如发送响应消息)。
默认的 AUTO_ACKNOWLEDGE
模式不提供适当的可靠性保证。当监听器执行失败时(因为提供者在监听器调用后自动确认每条消息,并且没有异常传播到提供者)或者当监听器容器关闭时,消息可能会丢失(您可以通过设置 acceptMessagesWhileStopping
标志来配置这一点)。如果需要可靠性,请确保使用事务会话(例如,用于可靠的队列处理和持久主题订阅)。
使用 DefaultMessageListenerContainer
这个消息监听器容器在大多数情况下使用。与 SimpleMessageListenerContainer
相比,这种容器变体允许动态适应运行时需求,并能够参与外部管理的事务。当配置了 JtaTransactionManager
时,每个接收到的消息都会注册到一个 XA 事务中。因此,处理可以利用 XA 事务语义。这个监听器容器在对 JMS 提供者的低要求、先进功能(如参与外部管理的事务)和与 Jakarta EE 环境的兼容性之间取得了良好的平衡。
您可以自定义容器的缓存级别。请注意,当未启用缓存时,每次接收消息都会创建一个新的连接和会话。将其与高负载的非持久订阅结合使用可能会导致消息丢失。在这种情况下,请确保使用适当的缓存级别。
此容器在代理关闭时也具有可恢复能力。默认情况下,一个简单的 BackOff
实现每五秒重试一次。您可以指定一个自定义的 BackOff
实现以获得更细粒度的恢复选项。有关示例,请参见 ExponentialBackOff。
与其兄弟组件(SimpleMessageListenerContainer)类似,DefaultMessageListenerContainer
支持原生 JMS 事务,并允许自定义确认模式。如果适合您的场景,强烈建议使用这种方式,而不是外部管理的事务——也就是说,如果您可以接受 JVM 崩溃时偶尔出现的重复消息。在您的业务逻辑中可以采取自定义的重复消息检测步骤来应对这种情况——例如,以业务实体存在性检查或协议表检查的形式。任何此类安排都比另一种选择更有效:通过使用 JtaTransactionManager
配置您的 DefaultMessageListenerContainer
来将整个处理包装在 XA 事务中,以涵盖 JMS 消息的接收以及消息监听器中的业务逻辑执行(包括数据库操作等)。
默认的 AUTO_ACKNOWLEDGE
模式不提供适当的可靠性保证。当监听器执行失败时,消息可能会丢失(因为提供者在监听器调用后自动确认每条消息,不会将异常传播给提供者),或者当监听器容器关闭时(您可以通过设置 acceptMessagesWhileStopping
标志进行配置)。如果需要可靠性,请确保使用事务会话(例如,用于可靠的队列处理和持久的主题订阅)。
事务管理
Spring 提供了一个 JmsTransactionManager
,用于管理单个 JMS ConnectionFactory
的事务。这使得 JMS 应用程序能够利用 Spring 的托管事务功能,具体描述请参见数据访问章节的事务管理部分。JmsTransactionManager
执行本地资源事务,将指定 ConnectionFactory
的 JMS Connection/Session 对绑定到线程上。JmsTemplate
会自动检测此类事务性资源并相应地对其进行操作。
在 Jakarta EE 环境中,ConnectionFactory
会对 Connection 和 Session 实例进行池化,因此这些资源可以在事务之间高效重用。在独立环境中,使用 Spring 的 SingleConnectionFactory
会导致共享的 JMS Connection
,每个事务都有自己独立的 Session
。或者,可以考虑使用特定提供者的池化适配器,例如 ActiveMQ 的 PooledConnectionFactory
类。
您还可以将 JmsTemplate
与 JtaTransactionManager
和支持 XA 的 JMS ConnectionFactory
一起使用以执行分布式事务。请注意,这需要使用 JTA 事务管理器以及正确配置 XA 的 ConnectionFactory。(请查看您的 Jakarta EE 服务器或 JMS 提供程序的文档。)
在使用 JMS API 从 Connection
创建一个 Session
时,在托管和非托管事务环境中重用代码可能会令人困惑。这是因为 JMS API 只有一个工厂方法来创建 Session
,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境事务基础设施的责任,因此这些值会被供应商对 JMS Connection 的包装器忽略。当你在非托管环境中使用 JmsTemplate
时,可以通过属性 sessionTransacted
和 sessionAcknowledgeMode
来指定这些值。当你将 PlatformTransactionManager
与 JmsTemplate
一起使用时,模板总是会被赋予一个事务性的 JMS Session
。