基于注解的监听器端点
接收异步消息的最简单方法是使用带注释的监听器端点基础设施。简而言之,它允许你将托管 bean 的方法公开为 JMS 监听器端点。以下示例展示了如何使用它:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
前面示例的思想是,每当 jakarta.jms.Destination
myDestination
上有消息可用时,processOrder
方法就会相应地被调用(在这种情况下,使用 JMS 消息的内容,类似于 MessageListenerAdapter 提供的功能)。
注解的端点基础设施在幕后为每个注解的方法创建一个消息监听器容器,使用 JmsListenerContainerFactory
。这样的容器不会注册到应用程序上下文中,但可以通过使用 JmsListenerEndpointRegistry
bean 轻松找到以进行管理。
@JmsListener
是 Java 8 中的可重复注解,因此你可以通过向同一个方法添加额外的 @JmsListener
声明,将多个 JMS 目标与该方法关联。
启用监听器端点注解
要启用对 @JmsListener
注解的支持,可以在你的一个 @Configuration
类中添加 @EnableJms
,如下例所示:
- Java
- Kotlin
- Xml
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DestinationResolver destinationResolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(destinationResolver);
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
}
@Configuration
@EnableJms
class JmsConfiguration {
@Bean
fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
setDestinationResolver(destinationResolver)
setSessionTransacted(true)
setConcurrency("3-10")
}
}
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="sessionTransacted" value="true"/>
<property name="concurrency" value="3-10"/>
</bean>
默认情况下,基础设施会寻找一个名为 jmsListenerContainerFactory
的 bean 作为用于创建消息监听容器的工厂的来源。在这种情况下(并忽略 JMS 基础设施的设置),你可以使用核心线程池大小为 3 个线程和最大线程池大小为 10 个线程来调用 processOrder
方法。
您可以自定义监听器容器工厂以用于每个注解,或者通过实现 JmsListenerConfigurer
接口来配置一个显式默认值。仅当至少有一个端点在没有特定容器工厂的情况下注册时才需要默认值。有关详细信息和示例,请参阅实现 JmsListenerConfigurer 的类的 javadoc。
编程端点注册
JmsListenerEndpoint
提供了一个 JMS 端点的模型,并负责为该模型配置容器。该基础设施允许您以编程方式配置端点,除了通过 JmsListener
注解检测到的端点。以下示例展示了如何实现这一点:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前面的示例中,我们使用了 SimpleJmsListenerEndpoint
,它提供了实际要调用的 MessageListener
。然而,你也可以构建自己的端点变体来描述自定义的调用机制。
注意,您可以完全跳过使用 @JmsListener
,并通过 JmsListenerConfigurer
以编程方式仅注册您的端点。
注解的端点方法签名
到目前为止,我们在端点中注入了一个简单的 String
,但实际上它可以有一个非常灵活的方法签名。在下面的例子中,我们重写它以通过自定义头注入 Order
:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
您可以在 JMS 监听器端点中注入的主要元素如下:
-
原始的
jakarta.jms.Message
或其任何子类(前提是它与传入消息类型匹配)。 -
jakarta.jms.Session
用于可选访问本地 JMS API(例如,用于发送自定义回复)。 -
表示传入 JMS 消息的
org.springframework.messaging.Message
。请注意,此消息包含自定义和标准头(由JmsHeaders
定义)。 -
使用
@Header
注解的方法参数以提取特定的头值,包括标准的 JMS 头。 -
一个使用
@Headers
注解的参数,也必须可以分配给java.util.Map
以访问所有头信息。 -
一个未注解且不是支持类型之一(
Message
或Session
)的元素被视为负载。您可以通过使用@Payload
注解参数来明确这一点。您还可以通过添加额外的@Valid
来开启验证。
能够注入 Spring 的 Message
抽象特别有用,因为可以利用存储在传输特定消息中的所有信息,而无需依赖传输特定的 API。以下示例展示了如何实现这一点:
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
方法参数的处理由 DefaultMessageHandlerMethodFactory
提供,您可以进一步自定义以支持其他方法参数。您也可以在此处自定义转换和验证支持。
例如,如果我们想在处理 Order
之前确保其有效,我们可以使用 @Valid
注解有效载荷,并配置必要的验证器,如下例所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
响应管理
MessageListenerAdapter 中已有的支持已经允许你的方法具有非 void
的返回类型。在这种情况下,调用的结果会被封装在一个 jakarta.jms.Message
中,并发送到原始消息的 JMSReplyTo
头中指定的目标,或者发送到监听器上配置的默认目标。现在,你可以通过消息抽象的 @SendTo
注解来设置该默认目标。
假设我们的 processOrder
方法现在应该返回一个 OrderStatus
,我们可以编写它以自动发送响应,如以下示例所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有多个用 @JmsListener
注解的方法,你也可以在类级别放置 @SendTo
注解,以共享一个默认的回复目标。
如果您需要以与传输无关的方式设置其他头信息,您可以返回一个 Message
,方法类似于以下内容:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果您需要在运行时计算响应目标,您可以将响应封装在一个 JmsResponse
实例中,该实例还提供了在运行时使用的目标。我们可以将之前的示例重写如下:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
最后,如果您需要为响应指定一些 QoS 值,例如优先级或生存时间,您可以相应地配置 JmsListenerContainerFactory
,如下例所示:
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
QosSettings replyQosSettings = new QosSettings();
replyQosSettings.setPriority(2);
replyQosSettings.setTimeToLive(10000);
factory.setReplyQosSettings(replyQosSettings);
return factory;
}
}