跳到主要内容

示例应用

DeepSeek V3 中英对照 Sample Applications

Spring AMQP 示例 项目包含两个示例应用程序。第一个是一个简单的“Hello World”示例,展示了同步和异步消息接收。它为理解基本组件提供了一个极佳的起点。第二个示例基于股票交易的用例,展示了在现实世界应用程序中常见的交互类型。在本章中,我们将快速浏览每个示例,以便您能够专注于最重要的组件。这些示例都是基于 Maven 的,因此您应该能够将它们直接导入任何支持 Maven 的 IDE 中(例如 SpringSource Tool Suite)。

“Hello World”示例

“Hello World”示例展示了同步和异步消息接收。你可以将 spring-rabbit-helloworld 示例导入到 IDE 中,然后按照下面的讨论进行操作。

同步示例

src/main/java 目录中,导航到 org.springframework.amqp.helloworld 包。打开 HelloWorldConfiguration 类,注意到它在类级别上包含了 @Configuration 注解,并且在方法级别上包含了一些 @Bean 注解。这是 Spring 基于 Java 的配置示例。你可以在这里了解更多关于它的信息 这里

以下代码展示了如何创建连接工厂:

@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
java

配置中还包含一个 RabbitAdmin 实例,默认情况下,它会查找任何类型为 exchange、queue 或 binding 的 bean,然后在 broker 上声明它们。实际上,在 HelloWorldConfiguration 中生成的 helloWorldQueue bean 就是一个例子,因为它是一个 Queue 的实例。

以下列表展示了 helloWorldQueue bean 的定义:

@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
java

回顾 rabbitTemplate bean 的配置,你可以看到它的 queue 属性(用于接收消息)和 routingKey 属性(用于发送消息)都设置为 helloWorldQueue

在探索了配置之后,我们可以看看实际使用这些组件的代码。首先,打开同一包中的 Producer 类。它包含一个 main() 方法,其中创建了 Spring 的 ApplicationContext

以下代码展示了 main 方法:

public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
java

在前面的示例中,AmqpTemplate bean 被检索并用于发送 Message。由于客户端代码应尽可能依赖接口,因此类型为 AmqpTemplate 而不是 RabbitTemplate。尽管在 HelloWorldConfiguration 中创建的 bean 是 RabbitTemplate 的实例,但依赖接口意味着这段代码更具可移植性(你可以独立于代码更改配置)。由于调用了 convertAndSend() 方法,模板将其委托给其 MessageConverter 实例。在这种情况下,它使用了默认的 SimpleMessageConverter,但可以为 rabbitTemplate bean 提供不同的实现,如 HelloWorldConfiguration 中所定义的那样。

现在打开 Consumer 类。它实际上共享相同的配置基类,这意味着它共享 rabbitTemplate bean。这就是为什么我们为该模板配置了 routingKey(用于发送)和 queue(用于接收)。正如我们在 AmqpTemplate 中描述的那样,你也可以将 routingKey 参数传递给发送方法,将 queue 参数传递给接收方法。Consumer 代码基本上是 Producer 的镜像,调用的是 receiveAndConvert() 而不是 convertAndSend()

以下清单展示了 Consumer 的主方法:

public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
java

如果你运行 Producer,然后运行 Consumer,你应该在控制台输出中看到 Received: Hello World

异步示例

同步示例 讲解了同步的 Hello World 示例。本节将介绍一个稍微复杂但功能更强大的选项。通过一些修改,Hello World 示例可以展示异步接收,也称为消息驱动的 POJO(Plain Old Java Object)。实际上,有一个子包正好提供了这个功能:org.springframework.amqp.samples.helloworld.async

再次,我们从发送端开始。打开 ProducerConfiguration 类,注意到它创建了一个 connectionFactory 和一个 rabbitTemplate bean。这一次,由于配置专门用于消息发送端,我们甚至不需要任何队列定义,而 RabbitTemplate 只设置了 'routingKey' 属性。回想一下,消息是发送到交换机而不是直接发送到队列的。AMQP 的默认交换机是一个没有名称的直接交换机。所有队列都绑定到该默认交换机,并使用它们的名称作为路由键。这就是为什么我们在这里只需要提供路由键的原因。

以下清单显示了 rabbitTemplate 的定义:

public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
java

由于此示例演示了异步消息接收,生产端被设计为持续发送消息(如果它像同步版本那样是每次执行发送一条消息的模型,那么它实际上是一个消息驱动的消费者这一点就不会那么明显)。负责持续发送消息的组件被定义为 ProducerConfiguration 中的一个内部类。它被配置为每三秒运行一次。

以下清单展示了该组件:

static class ScheduledProducer {

@Autowired
private volatile RabbitTemplate rabbitTemplate;

private final AtomicInteger counter = new AtomicInteger();

@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
java

你不需要理解所有的细节,因为真正的重点应该放在接收端(我们接下来会介绍)。然而,如果你还不熟悉 Spring 的任务调度支持,你可以在这里了解更多。简单来说,ProducerConfiguration 中的 postProcessor bean 会向调度器注册任务。

现在我们可以转向接收端。为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。这个类名为 HelloWorldHandler,如下所示:

public class HelloWorldHandler {

public void handleMessage(String text) {
System.out.println("Received: " + text);
}

}
java

该类是一个 POJO。它没有继承任何基类,也没有实现任何接口,甚至不包含任何导入。它通过 Spring AMQP 的 MessageListenerAdapter 被“适配”到 MessageListener 接口。然后,您可以在 SimpleMessageListenerContainer 上配置该适配器。在本示例中,容器是在 ConsumerConfiguration 类中创建的。您可以在那里看到被包装在适配器中的 POJO。

以下代码展示了如何定义 listenerContainer

@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
java

SimpleMessageListenerContainer 是一个 Spring 生命周期组件,默认情况下会自动启动。如果你查看 Consumer 类,可以看到它的 main() 方法仅包含一行用于创建 ApplicationContext 的引导代码。同样,Producermain() 方法也是一行引导代码,因为带有 @Scheduled 注解的方法的组件也会自动启动。你可以以任何顺序启动 ProducerConsumer,并且应该会看到每三秒钟发送和接收的消息。

股票交易

股票交易示例展示了比 Hello World 示例 更高级的消息传递场景。然而,配置非常相似,只是稍微复杂一些。由于我们已经详细介绍了 Hello World 的配置,这里我们重点讨论这个示例的不同之处。有一个服务器将市场数据(股票报价)推送到一个主题交换器(topic exchange)。然后,客户端可以通过绑定一个队列并使用路由模式(例如 app.stock.quotes.nasdaq.*)订阅市场数据流。这个演示的另一个主要特性是由客户端发起并由服务器处理的请求-响应“股票交易”交互。这涉及到一个私有的 replyTo 队列,该队列由客户端在订单请求消息本身中发送。

服务器的核心配置位于 org.springframework.amqp.rabbit.stocks.config.server 包中的 RabbitServerConfiguration 类。它继承了 AbstractStockAppRabbitConfiguration。在这里定义了服务器和客户端共用的资源,包括市场数据主题交换机(名称为 app.stock.marketdata)以及服务器为股票交易暴露的队列(名称为 app.stock.request)。在这个通用的配置文件中,你还可以看到在 RabbitTemplate 上配置了一个 Jackson2JsonMessageConverter

服务器特定的配置由两部分组成。首先,它在 RabbitTemplate 上配置了市场数据交换,这样就不需要在每次发送 Message 时提供交换名称。这是在基础配置类中定义的抽象回调方法中完成的。以下代码展示了该方法:

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
java

其次,声明了库存请求队列。在这种情况下,它不需要任何显式的绑定,因为它绑定到默认的无名交换器,并使用其自身名称作为路由键。正如前面提到的,AMQP 规范定义了这种行为。以下清单展示了 stockRequestQueue bean 的定义:

@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
java

现在你已经看到了服务器 AMQP 资源的配置,接下来请导航到 src/test/java 目录下的 org.springframework.amqp.rabbit.stocks 包。在那里,你可以看到实际的 Server 类,它提供了一个 main() 方法。该类基于 server-bootstrap.xml 配置文件创建了一个 ApplicationContext。在那里,你可以看到用于发布模拟市场数据的定时任务。该配置依赖于 Spring 的 task 命名空间支持。引导配置文件还导入了一些其他文件。最有趣的是位于 src/main/resources 目录下的 server-messaging.xml 文件。在那里,你可以看到负责处理股票交易请求的 messageListenerContainer bean。最后,看一下 server-handlers.xml 文件中定义的 serverHandler bean(该文件也在 src/main/resources 目录下)。该 bean 是 ServerHandler 类的一个实例,是一个很好的消息驱动 POJO 示例,它还可以发送回复消息。请注意,它本身并没有与框架或任何 AMQP 概念耦合。它接受一个 TradeRequest 并返回一个 TradeResponse。以下代码清单展示了 handleMessage 方法的定义:

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
java

现在我们已经了解了服务器端最重要的配置和代码,接下来可以转向客户端。最好的起点可能是 RabbitClientConfiguration,它位于 org.springframework.amqp.rabbit.stocks.config.client 包中。请注意,它声明了两个队列,但没有提供显式的名称。以下清单展示了这两个队列的 bean 定义:

@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
java

这些是私有队列,且会自动生成唯一的名称。第一个生成的队列被客户端用来绑定到服务器暴露的市场数据交换器。回想一下,在 AMQP 中,消费者与队列交互,而生产者则与交换器交互。将队列绑定到交换器的操作是告诉代理将来自特定交换器的消息传递(或路由)到队列。由于市场数据交换器是一个主题交换器,绑定可以用路由模式来表达。RabbitClientConfiguration 通过 Binding 对象来实现这一点,而该对象是通过 BindingBuilder 的流式 API 生成的。以下代码展示了 Binding 对象:

@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(marketDataQueue())
.to(marketDataExchange())
.with("market.data.*");
}
java
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
java

注意到实际值已经被外部化到一个属性文件中(位于 src/main/resources 下的 client.properties),并且我们使用了 Spring 的 @Value 注解来注入该值。这通常是一个好主意。否则,该值将被硬编码在类中,并且在不重新编译的情况下无法修改。在这种情况下,运行多个版本的客户端并在修改用于绑定的路由模式时变得容易得多。我们现在可以尝试这样做。

首先运行 org.springframework.amqp.rabbit.stocks.Server,然后运行 org.springframework.amqp.rabbit.stocks.Client。你应该会看到 NASDAQ 股票的模拟报价,因为 client.properties 文件中与 'stocks.quote.pattern' 键关联的当前值是 'app.stock.quotes.nasdaq.'。现在,在保持现有的 ServerClient 运行的同时,将该属性值更改为 'app.stock.quotes.nyse.' 并启动第二个 Client 实例。你应该会看到第一个客户端仍然接收 NASDAQ 的报价,而第二个客户端接收 NYSE 的报价。你也可以更改模式以获取所有股票甚至单个股票的报价。

我们探索的最后一个特性是从客户端视角来看的请求-回复交互。回想一下,我们已经见过 ServerHandler,它接收 TradeRequest 对象并返回 TradeResponse 对象。在 Client 端对应的代码是位于 org.springframework.amqp.rabbit.stocks.gateway 包中的 RabbitStockServiceGateway。它委托给 RabbitTemplate 来发送消息。以下清单展示了 send 方法:

public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
java

请注意,在发送消息之前,它设置了 replyTo 地址。它提供了由 traderJoeQueue bean 定义生成的队列(之前展示过)。以下清单展示了 StockServiceGateway 类本身的 @Bean 定义:

@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
java

如果你不再运行服务器和客户端,现在启动它们。尝试发送一个格式为 100 TCKR 的请求。在模拟请求“处理”的短暂人工延迟后,你应该会在客户端上看到一条确认消息。

从非 Spring 应用程序接收 JSON

Spring 应用程序在发送 JSON 时,会将 _TypeId_ 头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。

spring-rabbit-json 示例探讨了将来自非 Spring 应用程序的 JSON 转换的几种技术。