示例应用
Spring AMQP 示例 项目包含两个示例应用程序。第一个是一个简单的“Hello World”示例,展示了同步和异步消息接收。它为理解基本组件提供了一个极佳的起点。第二个示例基于股票交易的用例,展示了在现实世界应用程序中常见的交互类型。在本章中,我们将快速浏览每个示例,以便您能够专注于最重要的组件。这些示例都是基于 Maven 的,因此您应该能够将它们直接导入任何支持 Maven 的 IDE 中(例如 SpringSource Tool Suite)。
“Hello World”示例
“Hello World”示例展示了同步和异步消息接收。你可以将 spring-rabbit-helloworld
示例导入到 IDE 中,然后按照下面的讨论进行操作。
同步示例
在 src/main/java
目录中,导航到 org.springframework.amqp.helloworld
包。打开 HelloWorldConfiguration
类,注意到它在类级别上包含了 @Configuration
注解,并且在方法级别上包含了一些 @Bean
注解。这是 Spring 基于 Java 的配置示例。你可以在这里了解更多关于它的信息 这里。
以下代码展示了如何创建连接工厂:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中还包含一个 RabbitAdmin
实例,默认情况下,它会查找任何类型为 exchange、queue 或 binding 的 bean,然后在 broker 上声明它们。实际上,在 HelloWorldConfiguration
中生成的 helloWorldQueue
bean 就是一个例子,因为它是一个 Queue
的实例。
以下列表展示了 helloWorldQueue
bean 的定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾 rabbitTemplate
bean 的配置,你可以看到它的 queue
属性(用于接收消息)和 routingKey
属性(用于发送消息)都设置为 helloWorldQueue
。
在探索了配置之后,我们可以看看实际使用这些组件的代码。首先,打开同一包中的 Producer
类。它包含一个 main()
方法,其中创建了 Spring 的 ApplicationContext
。
以下代码展示了 main
方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate
bean 被检索并用于发送 Message
。由于客户端代码应尽可能依赖接口,因此类型为 AmqpTemplate
而不是 RabbitTemplate
。尽管在 HelloWorldConfiguration
中创建的 bean 是 RabbitTemplate
的实例,但依赖接口意味着这段代码更具可移植性(你可以独立于代码更改配置)。由于调用了 convertAndSend()
方法,模板将其委托给其 MessageConverter
实例。在这种情况下,它使用了默认的 SimpleMessageConverter
,但可以为 rabbitTemplate
bean 提供不同的实现,如 HelloWorldConfiguration
中所定义的那样。
现在打开 Consumer
类。它实际上共享相同的配置基类,这意味着它共享 rabbitTemplate
bean。这就是为什么我们为该模板配置了 routingKey
(用于发送)和 queue
(用于接收)。正如我们在 AmqpTemplate 中描述的那样,你也可以将 routingKey
参数传递给发送方法,将 queue
参数传递给接收方法。Consumer
代码基本上是 Producer
的镜像,调用的是 receiveAndConvert()
而不是 convertAndSend()
。
以下清单展示了 Consumer
的主方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果你运行 Producer
,然后运行 Consumer
,你应该在控制台输出中看到 Received: Hello World
。
异步示例
同步示例 讲解了同步的 Hello World 示例。本节将介绍一个稍微复杂但功能更强大的选项。通过一些修改,Hello World 示例可以展示异步接收,也称为消息驱动的 POJO(Plain Old Java Object)。实际上,有一个子包正好提供了这个功能:org.springframework.amqp.samples.helloworld.async
。
再次,我们从发送端开始。打开 ProducerConfiguration
类,注意到它创建了一个 connectionFactory
和一个 rabbitTemplate
bean。这一次,由于配置专门用于消息发送端,我们甚至不需要任何队列定义,而 RabbitTemplate
只设置了 'routingKey' 属性。回想一下,消息是发送到交换机而不是直接发送到队列的。AMQP 的默认交换机是一个没有名称的直接交换机。所有队列都绑定到该默认交换机,并使用它们的名称作为路由键。这就是为什么我们在这里只需要提供路由键的原因。
以下清单显示了 rabbitTemplate
的定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,生产端被设计为持续发送消息(如果它像同步版本那样是每次执行发送一条消息的模型,那么它实际上是一个消息驱动的消费者这一点就不会那么明显)。负责持续发送消息的组件被定义为 ProducerConfiguration
中的一个内部类。它被配置为每三秒运行一次。
以下清单展示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
你不需要理解所有的细节,因为真正的重点应该放在接收端(我们接下来会介绍)。然而,如果你还不熟悉 Spring 的任务调度支持,你可以在这里了解更多。简单来说,ProducerConfiguration
中的 postProcessor
bean 会向调度器注册任务。
现在我们可以转向接收端。为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。这个类名为 HelloWorldHandler
,如下所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是一个 POJO。它没有继承任何基类,也没有实现任何接口,甚至不包含任何导入。它通过 Spring AMQP 的 MessageListenerAdapter
被“适配”到 MessageListener
接口。然后,您可以在 SimpleMessageListenerContainer
上配置该适配器。在本示例中,容器是在 ConsumerConfiguration
类中创建的。您可以在那里看到被包装在适配器中的 POJO。
以下代码展示了如何定义 listenerContainer
:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer
是一个 Spring 生命周期组件,默认情况下会自动启动。如果你查看 Consumer
类,可以看到它的 main()
方法仅包含一行用于创建 ApplicationContext
的引导代码。同样,Producer
的 main()
方法也是一行引导代码,因为带有 @Scheduled
注解的方法的组件也会自动启动。你可以以任何顺序启动 Producer
和 Consumer
,并且应该会看到每三秒钟发送和接收的消息。
股票交易
股票交易示例展示了比 Hello World 示例 更高级的消息传递场景。然而,配置非常相似,只是稍微复杂一些。由于我们已经详细介绍了 Hello World 的配置,这里我们重点讨论这个示例的不同之处。有一个服务器将市场数据(股票报价)推送到一个主题交换器(topic exchange)。然后,客户端可以通过绑定一个队列并使用路由模式(例如 app.stock.quotes.nasdaq.*
)订阅市场数据流。这个演示的另一个主要特性是由客户端发起并由服务器处理的请求-响应“股票交易”交互。这涉及到一个私有的 replyTo
队列,该队列由客户端在订单请求消息本身中发送。
服务器的核心配置位于 org.springframework.amqp.rabbit.stocks.config.server
包中的 RabbitServerConfiguration
类。它继承了 AbstractStockAppRabbitConfiguration
。在这里定义了服务器和客户端共用的资源,包括市场数据主题交换机(名称为 app.stock.marketdata
)以及服务器为股票交易暴露的队列(名称为 app.stock.request
)。在这个通用的配置文件中,你还可以看到在 RabbitTemplate
上配置了一个 Jackson2JsonMessageConverter
。
服务器特定的配置由两部分组成。首先,它在 RabbitTemplate
上配置了市场数据交换,这样就不需要在每次发送 Message
时提供交换名称。这是在基础配置类中定义的抽象回调方法中完成的。以下代码展示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明了库存请求队列。在这种情况下,它不需要任何显式的绑定,因为它绑定到默认的无名交换器,并使用其自身名称作为路由键。正如前面提到的,AMQP 规范定义了这种行为。以下清单展示了 stockRequestQueue
bean 的定义:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在你已经看到了服务器 AMQP 资源的配置,接下来请导航到 src/test/java
目录下的 org.springframework.amqp.rabbit.stocks
包。在那里,你可以看到实际的 Server
类,它提供了一个 main()
方法。该类基于 server-bootstrap.xml
配置文件创建了一个 ApplicationContext
。在那里,你可以看到用于发布模拟市场数据的定时任务。该配置依赖于 Spring 的 task
命名空间支持。引导配置文件还导入了一些其他文件。最有趣的是位于 src/main/resources
目录下的 server-messaging.xml
文件。在那里,你可以看到负责处理股票交易请求的 messageListenerContainer
bean。最后,看一下 server-handlers.xml
文件中定义的 serverHandler
bean(该文件也在 src/main/resources
目录下)。该 bean 是 ServerHandler
类的一个实例,是一个很好的消息驱动 POJO 示例,它还可以发送回复消息。请注意,它本身并没有与框架或任何 AMQP 概念耦合。它接受一个 TradeRequest
并返回一个 TradeResponse
。以下代码清单展示了 handleMessage
方法的定义:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经了解了服务器端最重要的配置和代码,接下来可以转向客户端。最好的起点可能是 RabbitClientConfiguration
,它位于 org.springframework.amqp.rabbit.stocks.config.client
包中。请注意,它声明了两个队列,但没有提供显式的名称。以下清单展示了这两个队列的 bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是私有队列,且会自动生成唯一的名称。第一个生成的队列被客户端用来绑定到服务器暴露的市场数据交换器。回想一下,在 AMQP 中,消费者与队列交互,而生产者则与交换器交互。将队列绑定到交换器的操作是告诉代理将来自特定交换器的消息传递(或路由)到队列。由于市场数据交换器是一个主题交换器,绑定可以用路由模式来表达。RabbitClientConfiguration
通过 Binding
对象来实现这一点,而该对象是通过 BindingBuilder
的流式 API 生成的。以下代码展示了 Binding
对象:
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(marketDataQueue())
.to(marketDataExchange())
.with("market.data.*");
}
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意到实际值已经被外部化到一个属性文件中(位于 src/main/resources
下的 client.properties
),并且我们使用了 Spring 的 @Value
注解来注入该值。这通常是一个好主意。否则,该值将被硬编码在类中,并且在不重新编译的情况下无法修改。在这种情况下,运行多个版本的客户端并在修改用于绑定的路由模式时变得容易得多。我们现在可以尝试这样做。
首先运行 org.springframework.amqp.rabbit.stocks.Server
,然后运行 org.springframework.amqp.rabbit.stocks.Client
。你应该会看到 NASDAQ
股票的模拟报价,因为 client.properties 文件中与 'stocks.quote.pattern' 键关联的当前值是 'app.stock.quotes.nasdaq.'。现在,在保持现有的 Server
和 Client
运行的同时,将该属性值更改为 'app.stock.quotes.nyse.' 并启动第二个 Client
实例。你应该会看到第一个客户端仍然接收 NASDAQ 的报价,而第二个客户端接收 NYSE 的报价。你也可以更改模式以获取所有股票甚至单个股票的报价。
我们探索的最后一个特性是从客户端视角来看的请求-回复交互。回想一下,我们已经见过 ServerHandler
,它接收 TradeRequest
对象并返回 TradeResponse
对象。在 Client
端对应的代码是位于 org.springframework.amqp.rabbit.stocks.gateway
包中的 RabbitStockServiceGateway
。它委托给 RabbitTemplate
来发送消息。以下清单展示了 send
方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它设置了 replyTo
地址。它提供了由 traderJoeQueue
bean 定义生成的队列(之前展示过)。以下清单展示了 StockServiceGateway
类本身的 @Bean
定义:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果你不再运行服务器和客户端,现在启动它们。尝试发送一个格式为 100 TCKR
的请求。在模拟请求“处理”的短暂人工延迟后,你应该会在客户端上看到一条确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,会将 _TypeId_
头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。
spring-rabbit-json
示例探讨了将来自非 Spring 应用程序的 JSON 转换的几种技术。