使用Spring JMS
本节描述了如何使用Spring的JMS组件。
JmsTemplate 和 JmsClient
JmsTemplate 类是 JMS 核心包中的核心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时负责资源的创建和释放。
JmsClient是Spring Framework 7.0中的一种新API变体,它沿用了JdbcClient等的设计思路。JmsClient基于JmsTemplate构建,能够实现直接的发送和接收操作,并且每个操作都提供了自定义选项。
使用 JmsTemplate
使用JmsTemplate的代码只需要实现提供明确定义的高级接口的回调函数。MessageCreator回调接口在接收到JmsTemplate中调用代码提供的Session时创建消息。为了允许更复杂地使用JMS API,SessionCallback提供了JMS会话,而ProducerCallback则暴露了Session和MessageProducer的组合。
JMS API提供了两种类型的发送方法:一种方法将传递模式(delivery mode)、优先级(priority)和生存时间(time-to-live)作为服务质量(QOS)参数;另一种方法不使用QOS参数,而是使用默认值。由于JmsTemplate拥有许多发送方法,为了避免发送方法数量的重复,QOS参数的设置被作为bean属性来提供。同样地,同步接收调用的超时值也是通过使用setReceiveTimeout属性来设置的。
一些JMS提供者允许通过配置ConnectionFactory来行政性地设置默认的QOS值。这样,调用MessageProducer实例的send方法(send(Destination destination, Message message))时,使用的QOS默认值就会与JMS规范中指定的值不同。因此,为了对QOS值进行一致的管理,必须明确启用JmsTemplate使其使用自己的QOS值,方法是将布尔属性isExplicitQosEnabled设置为true。
为了方便起见,JmsTemplate还提供了一个基本的请求-回复操作,该操作允许发送消息并等待在操作过程中创建的临时队列中收到的回复。
一旦配置完成,JmsTemplate 类的实例就是线程安全的。这一点很重要,因为这意味着你可以配置一个 JmsTemplate 的单一样本,然后安全地将这个共享引用注入到多个协作者中。需要明确的是,JmsTemplate 是有状态的,因为它维护着一个对 ConnectionFactory 的引用,但这种状态并不是会话状态。
使用 JmsClient
从Spring Framework 4.1开始,JmsMessagingTemplate是在JmsTemplate的基础上构建的,它提供了与Spring通用消息抽象的集成——也就是说,用于处理发送和接收org.springframework.messaging.Message,抛出org.springframeworkessaging.MessagingException,并且数据载体的转换通过org.springframeworkMessaging.converter.MessageConverter来完成(有许多常见的转换器实现可供使用)。
从Spring Framework 7.0开始,提供了一个名为JmsClient的流畅API。该API提供了围绕org.springframeworkessaging.Message的可定制操作,并会抛出org.springframework.messaging.MessagingException异常,类似于JmsMessagingTemplate的功能,同时还能与org.springframework messaging converter.MessageConverter集成。JmsClient可以为给定的ConnectionFactory创建,也可以为给定的JmsTemplate创建;在后一种情况下,默认会重用后者的设置。有关使用示例,请参阅JmsClient。
连接
JmsTemplate 需要一个对 ConnectionFactory 的引用。ConnectionFactory 是 JMS 规范的一部分,作为与 JMS 交互的入口点。它被客户端应用程序用作工厂来创建与 JMS 提供者的连接,并封装了各种配置参数,其中许多参数是特定于供应商的,例如 SSL 配置选项。
在EJB中使用JMS时,供应商会提供JMS接口的实现,以便这些接口能够参与声明式事务管理,并执行连接和会话的池化。为了使用这种实现,Jakarta EE容器通常要求你在EJB或servlet部署描述符中声明一个JMS连接工厂作为resource-ref。为了确保在EJB中使用JmsTemplate时能够利用这些功能,客户端应用程序应确保它引用了ConnectionFactory的托管实现。
缓存消息资源
标准API涉及创建许多中间对象。要发送一条消息,需要执行以下“API”流程:
ConnectionFactory->Connection->Session->MessageProducer->send
在ConnectionFactory和Send操作之间,会创建并销毁三个中间对象。为了优化资源使用和提高性能,Spring提供了两种ConnectionFactory的实现。
使用 SingleConnectionFactory
Spring提供了ConnectionFactory接口的一个实现类SingleConnectionFactory,该类在所有createConnection()调用中返回相同的Connection,并忽略对close()的调用。这对于测试和独立环境非常有用,因为可以在多个可能涉及任意数量事务的JmsTemplate调用中使用同一个连接。SingleConnectionFactory接受一个标准ConnectionFactory的引用,这个引用通常来自JNDI。
使用 CachingConnectionFactory
CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并添加了对 Session、MessageProducer 和 MessageConsumer 实例的缓存机制。初始缓存大小设置为 1。您可以使用 sessionCacheSize 属性来增加缓存的会话数量。请注意,实际缓存的会话数量可能会超过这个数值,因为会话的缓存是根据其确认模式(acknowledgment mode)来决定的;因此,当 sessionCacheSize 被设置为 1 时,最多可以缓存四个会话实例(每种确认模式一个)。MessageProducer 和 MessageConsumer 实例会在其所属的会话中缓存,同时在缓存过程中还会考虑生产者(producers)和消费者(consumers)的独特属性。MessageProducer 的缓存是基于其目标(destination)来进行的;而 MessageConsumer 的缓存则基于一个键来决定,该键由目标地址、选择器(selector)、noLocal 交付标志(delivery flag)以及持久订阅名称(durable subscription name,如果创建的是持久订阅消费者的话)组成。
临时队列和主题(TemporaryQueue/TemporaryTopic)的消息生产者(MessageProducer)和消息消费者(MessageConsumer)永远不会被缓存。不幸的是,WebLogic JMS在实现其常规目的地接口时,错误地表明这些临时队列/主题的目的地都不能被缓存。请在WebLogic上使用不同的连接池或缓存机制,或者为WebLogic自定义CachingConnectionFactory。
目的地管理
目的地(Destinations)作为ConnectionFactory的实例,是JMS管理的对象,可以存储在JNDI中并从中检索。在配置Spring应用程序上下文时,可以使用JNDI的JndiObjectFactoryBean工厂类或<jee:jndi-lookup>来实现对JMS目的地引用对象的依赖注入。然而,如果应用程序中存在大量目的地,或者JMS提供者具有独特的先进目的地管理功能,这种策略往往会变得繁琐。这类先进的目的地管理功能包括创建动态目的地(dynamic destinations)或支持目的地的层次命名空间(hierarchical namespace of destinations)。JmsTemplate将目的地名称的解析工作委托给实现DestinationResolver接口的JMS目的地对象。DynamicDestinationResolver是JmsTemplate使用的默认实现,能够处理动态目的地的解析。此外,还提供了JndiDestinationResolver作为JNDI中包含的目的地的服务定位器(service locator),并在必要时回退到DynamicDestinationResolver的行为。
在JMS应用程序中,目标(destinations)的设置往往只有在运行时才可知,因此,在部署应用程序时无法预先进行管理性创建。这通常是因为相互作用的系统组件之间存在共享的应用逻辑,这些组件会根据既定的命名规范在运行时动态创建目标。尽管动态目标的创建并不属于JMS规范的强制要求,但大多数供应商都提供了这一功能。动态目标会使用用户定义的名称来标识,这与临时目标有所不同,并且通常不会被注册到JNDI中。用于创建动态目标的API因供应商而异,因为与目标相关的属性是特定于每个供应商的。然而,有些供应商会采取一种简单的实现方式,即忽略JMS规范中的警告,直接使用TopicSession的createTopic(String topicName)方法或QueueSession的createQueue(String queueName)方法来创建具有默认属性的新目标。根据不同的供应商实现,DynamicDestinationResolver也可能同时创建一个物理目标(physical destination),而不仅仅是进行解析操作。
布尔属性pubSubDomain用于配置JmsTemplate,以便知道正在使用哪个JMS域。默认情况下,该属性的值为false,表示将使用点对点域Queues。这个属性(由JmsTemplate使用)通过实现DestinationResolver接口来确定动态目标解析的行为。
您还可以通过属性 defaultDestination 为 JmsTemplate 配置一个默认目标。当发送和接收操作不指向特定目标时,就会使用这个默认目标。
消息监听器容器
在EJB世界中,JMS消息最常见的用途之一是驱动基于消息的Bean(MDB)。Spring提供了一种解决方案,可以创建基于消息的POJO(MDP),而不会将用户绑定到EJB容器上。(有关Spring对MDP支持的详细介绍,请参阅异步接收:基于消息的POJO。)端点方法可以使用@JmsListener进行注解——更多细节请参见基于注解的监听器端点。
消息监听器容器(Message Listener Container)用于从JMS消息队列接收消息,并驱动其中注入的MessageListener。该监听器容器负责整个消息接收与分发的线程处理过程。它作为MDP(Message Delivery Provider)与消息提供者(messaging provider)之间的中介,负责注册以接收消息、参与事务处理、资源获取与释放、异常转换等操作。这样,你就可以专注于编写与接收消息相关的(可能较为复杂的)业务逻辑(以及可能的响应逻辑),而将繁琐的JMS基础设施相关任务委托给框架来处理。
Spring附带了两个标准的JMS消息监听器容器,每个容器都有其特定的功能集。
使用 SimpleMessageListenerContainer
这个消息监听器容器是两种标准类型中较为简单的一种。它在启动时会创建固定数量的JMS会话和消费者,通过标准的JMS MessageConsumer setMessageListener() 方法注册监听器,然后将监听器的回调执行任务留给JMS提供者来完成。这种变体不允许根据运行时的需求进行动态调整,也不支持参与外部管理的事务。在兼容性方面,它非常接近于独立的JMS规范,但通常与Jakarta EE的JMS限制不兼容。
虽然 SimpleMessageListenerContainer 不支持参与外部管理的事务,但它确实支持原生的 JMS 事务。要启用此功能,您可以将 sessionTransacted 标志设置为 true,或者在 XML 命名空间中将 acknowledge 属性设置为 transacted。如果您的监听器抛出异常,则会导致事务回滚,并且消息会重新发送。或者,您可以考虑使用 CLIENT_ACKNOWLEDGE 模式,该模式在发生异常时也会重新发送消息,但不使用事务化的 Session 实例,因此,在事务协议中不包含任何其他 Session 操作(例如发送响应消息)。
默认的 AUTO_ACKNOWLEDGE 模式不能提供适当的可靠性保证。当监听器执行失败时(因为提供者会在每次调用监听器后自动确认消息,而不会将任何异常传播给提供者),或者当监听器容器关闭时(你可以通过设置 acceptMessagesWhileStopping 标志来配置这一点),消息可能会丢失。在需要可靠性的情况下(例如,为了可靠地处理队列和持久化主题订阅),请确保使用事务会话。
使用 DefaultMessageListenerContainer
这种消息监听器容器在大多数情况下都会被使用。与SimpleMessageListenerContainer相比,这种容器变体能够动态适应运行时的需求,并且能够参与由外部管理的事务。当配置了JtaTransactionManager时,每条接收到的消息都会被注册到一个XA事务中。因此,处理过程可以利用XA事务的语义。这种监听器容器在对JMS提供者的要求较低、功能先进(如能够参与外部管理的事务)以及与Jakarta EE环境的兼容性之间取得了良好的平衡。
您可以自定义容器的缓存级别。请注意,当未启用缓存时,每次接收消息都会创建一个新的连接和新的会话。如果这种情况与高负载的非持久性订阅结合在一起,可能会导致消息丢失。在这种情况下,请确保使用适当的缓存级别。
当代理服务器发生故障时,该容器也具备恢复能力。默认情况下,一个简单的BackOff实现会每五秒重试一次。你可以指定一个自定义的BackOff实现来获得更精细的恢复选项。示例请参见ExponentialBackOff。
与它的兄弟类(SimpleMessageListenerContainer)一样,DefaultMessageListenerContainer支持原生JMS事务,并允许自定义确认模式。如果您的场景适用,强烈建议使用这种方法,而不是外部管理的事务——也就是说,如果您能接受在JVM崩溃时偶尔出现重复消息的情况。您可以在业务逻辑中添加自定义的重复消息检测步骤来处理这些情况,例如通过业务实体存在检查或协议表检查等方式。任何这样的安排都比另一种方式(即使用JtaTransactionManager配置DefaultMessageListenerContainer,将整个处理过程包装在一个XA事务中,以覆盖JMS消息的接收以及消息监听器中的业务逻辑执行(包括数据库操作等))要高效得多。
默认的AUTO_ACKNOWLEDGE模式无法提供适当的可靠性保证。当监听器执行失败时(因为提供者在调用监听器后会自动确认每条消息,且不会将任何异常传播给提供者),或者当监听器容器关闭时(你可以通过设置acceptMessagesWhileStopping标志来配置这一点),消息可能会丢失。在需要可靠性的情况下,请确保使用事务会话(例如,用于可靠的队列处理和持久的主题订阅)。
事务管理
Spring提供了一个JmsTransactionManager,用于管理单个JMS ConnectionFactory的事务。这使得JMS应用程序能够利用Spring的事务管理功能,如数据访问章节中的“事务管理”部分所描述的那样。JmsTransactionManager执行本地资源事务,将来自指定ConnectionFactory的JMS Connection/Session对绑定到线程上。JmsTemplate会自动检测这些事务性资源,并据此对其进行操作。
在Jakarta EE环境中,ConnectionFactory会池化Connection和Session实例,因此这些资源可以在不同的事务之间被高效地重用。在独立(standalone)环境中,使用Spring的SingleConnectionFactory会导致一个共享的JMS Connection,每个事务都有自己独立的Session。或者,可以考虑使用特定提供者的池化适配器,例如ActiveMQ的PooledConnectionFactory类。
您还可以使用JmsTemplate与JtaTransactionManager以及支持XA的JMS ConnectionFactory来执行分布式事务。请注意,这需要使用JTA事务管理器以及正确配置了XA的ConnectionFactory。(请查阅您的Jakarta EE服务器或JMS提供者的文档。)
在使用JMS API从Connection创建Session时,在托管环境和非托管环境中重用代码可能会引起混淆。这是因为JMS API只有一个用于创建Session的工厂方法,而且该方法需要交易(transaction)和确认模式(acknowledgment mode)的参数值。在托管环境中,设置这些参数值是环境事务基础设施(transactional infrastructure)的职责,因此供应商提供的JMS Connection封装类会忽略这些参数值。当在非托管环境中使用JmsTemplate时,可以通过sessionTransacted和sessionAcknowledgeMode属性来指定这些参数值。而当使用PlatformTransactionManager与JmsTemplate结合时,JmsTemplate总是会获得一个支持事务处理的JMS Session。