跳到主要内容
版本:7.0.3

接收消息

Hunyuan 7b 中英对照 Receiving a Message

这描述了如何使用Spring中的JMS接收消息。

同步接收

虽然JMS通常与异步处理相关联,但您也可以同步消费消息。JmsTemplateJmsClient上的receive(..)方法提供了这种功能。在同步接收过程中,调用线程会一直阻塞,直到有消息可用为止。这可能是一个危险的操作,因为调用线程可能会无限期地被阻塞。receiveTimeout属性指定了接收器在放弃等待消息之前应等待的时间。

异步接收:消息驱动的POJO

备注

Spring还支持通过使用@JmsListener注解来配置带注解的监听器端点,并提供了开放的基础设施以便程序化地注册这些端点。这是迄今为止设置异步接收器最便捷的方法。有关更多详细信息,请参阅启用监听器端点注解

类似于EJB世界中的消息驱动Bean(MDB),消息驱动的POJO(MDP)充当JMS消息的接收器。MDP的一个限制是它必须实现jakarta.jms.MessageListener接口(但请参阅使用MessageListenerAdapter)。需要注意的是,如果您的POJO在多个线程上接收消息,那么确保其实现是线程安全的就非常重要。

以下示例展示了一个MDP的简单实现:

public class ExampleListener implements MessageListener {

public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}

一旦你实现了MessageListener,就该创建一个消息监听器容器了。

以下示例展示了如何定义和配置Spring自带的消息监听器容器之一(本例中为DefaultMessageListenerContainer):

@Bean
ExampleListener messageListener() {
return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {

DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}

请参阅各种消息监听器容器的Spring Javadoc(这些容器都实现了MessageListenerContainer),以获取每种实现所支持功能的完整描述。

使用 SessionAwareMessageListener 接口

SessionAwareMessageListener接口是Spring特有的接口,它提供了与JMS的MessageListener接口类似的契约,但同时让消息处理方法能够访问接收消息时所使用的JMS会话(Session)。以下列出了SessionAwareMessageListener接口的定义:

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

void onMessage(Message message, Session session) throws JMSException;
}

如果你希望MDP能够对收到的任何消息作出响应(通过使用onMessage(Message, Session)方法中提供的Session),你可以选择让MDP实现这个接口(而不是标准的JMS MessageListener接口)。Spring附带的所有消息监听器容器实现都支持实现MessageListenerSessionAwareMessageListener接口的MDP。实现SessionAwareMessageListener的类有一个限制,即它们会通过该接口与Spring紧密关联。是否使用它完全由你作为应用程序开发者或架构师来决定。

请注意,SessionAwareMessageListener接口的onMessage(..)方法会抛出JMSException。与标准的JMS MessageListener接口不同,使用SessionAwareMessageListener接口时,处理任何抛出的异常是客户端代码的责任。

使用 MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息支持中的最后一个组成部分。简而言之,它允许你将几乎任何类暴露为 MDP(尽管存在一些限制)。

考虑以下接口定义:

public interface MessageDelegate {

void handleMessage(String message);

void handleMessage(Map message);

void handleMessage(byte[] message);

void handleMessage(Serializable message);
}

请注意,尽管该接口既不继承MessageListener接口,也不继承SessionAwareMessageListener接口,但您仍然可以使用MessageListenerAdapter类将其作为MDP(Message Dispatcher Pattern)来使用。同时,请注意各种消息处理方法都是根据它们能够接收和处理的不同Message类型的属性来进行强类型化的。

现在考虑以下MessageDelegate接口的实现:

public class DefaultMessageDelegate implements MessageDelegate {

@Override
public void handleMessage(String message) {
// ...
}

@Override
public void handleMessage(Map message) {
// ...
}

@Override
public void handleMessage(byte[] message) {
// ...
}

@Override
public void handleMessage(Serializable message) {
// ...
}
}

特别是,请注意前面的MessageDelegate接口实现(DefaultMessageDelegate类)完全没有JMS依赖。它确实是一个普通的Java对象(POJO),我们可以通过以下配置将其转换为MDP:

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {

DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}

下一个示例展示了另一个只能接收JMS TextMessage消息的MDP(马尔可夫决策过程)。请注意,消息处理方法的名称实际上为receive(在MessageListenerAdapter中,消息处理方法的默认名称是handleMessage),但该方法是可配置的(正如您在本节后面会看到的)。同时请注意,receive(..)方法被严格限定为只接收并响应JMS TextMessage消息。以下代码片段展示了TextMessageDelegate接口的定义:

public interface TextMessageDelegate {

void receive(TextMessage message);
}

以下列表展示了一个实现了TextMessageDelegate接口的类:

public class DefaultTextMessageDelegate implements TextMessageDelegate {

@Override
public void receive(TextMessage message) {
// ...
}
}

那么,MessageListenerAdapter的配置将如下所示:

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}

请注意,如果messageListener接收到的JMSMessage类型不是TextMessage,则会抛出IllegalStateException(随后该异常会被忽略)。MessageListenerAdapter类的另一个功能是,如果处理方法返回非void类型的值,它能够自动发送回一个响应Message。请考虑以下接口和类:

public interface ResponsiveTextMessageDelegate {

// Notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

@Override
public String receive(TextMessage message) {
return "message";
}
}

如果你将DefaultResponsiveTextMessageDelegateMessageListenerAdapter一起使用,那么在执行'receive(..)'方法时返回的任何非空值(在默认配置下)都会被转换为TextMessage。然后,这个转换后的TextMessage会被发送到原始Message的JMS Reply-To属性中定义的Destination(如果存在的话),或者发送到在MessageListenerAdapter上设置的默认Destination(如果已经配置了的话)。如果没有找到Destination,则会抛出InvalidDestinationException(注意,这个异常不会被吞没,而是会沿着调用栈向上传播)。

在事务中处理消息

在事务中调用消息监听器只需要重新配置监听器容器即可。

你可以通过监听器容器定义中的sessionTransacted标志来激活本地资源事务。每次调用消息监听器时,都会在活跃的JMS事务中执行;如果监听器执行失败,则消息接收操作会被回滚。发送响应消息(通过SessionAwareMessageListener)属于同一本地事务的一部分,但任何其他资源操作(如数据库访问)则是独立执行的。这通常需要在监听器实现中进行重复消息检测,以便处理数据库处理已经完成但消息处理未能成功提交的情况。

考虑以下bean定义:

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {

DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}

要参与外部管理的事务,您需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常是DefaultMessageListenerContainer)。

要配置用于XA事务参与的消息监听器容器,您需要配置一个JtaTransactionManager(默认情况下,它会委托给Jakarta EE服务器的事务子系统)。请注意,底层的JMSConnectionFactory需要支持XA功能,并且必须正确地注册到您的JTA事务协调器中。(请检查您的Jakarta EE服务器的JNDI资源配置。)这样,消息接收以及(例如)数据库访问就可以成为同一事务的一部分(具有统一的事务提交语义,但会带来XA事务日志的开销)。

以下bean定义创建了一个事务管理器:

@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}

然后我们需要将其添加到我们之前的容器配置中。容器的其余部分会自动处理。以下示例展示了如何操作:

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {

DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}