发送消息
发送消息时,您可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前一个列表中的最后一个方法开始讨论,因为它实际上是最明确的方法。它允许在运行时提供一个 AMQP 交换名称(以及一个路由键)。最后一个参数是负责实际创建消息实例的回调函数。使用此方法发送消息的示例如下:以下示例展示了如何使用 send
方法发送消息:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果你计划大多数或所有时间都使用该模板实例发送到同一个 exchange
,你可以在模板本身上设置 exchange
属性。在这种情况下,你可以使用前面列表中的第二种方法。以下示例在功能上与前一个示例等效:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果 exchange
和 routingKey
属性都在模板上设置好了,你可以使用仅接受 Message
的方法。以下示例展示了如何做到这一点:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
关于交换和路由键属性的更好理解方式是,显式的方法参数总是会覆盖模板的默认值。事实上,即使你没有在模板上显式设置这些属性,也总是有默认值存在。在这两种情况下,默认值都是一个空的 String
,但这实际上是一个合理的默认值。就路由键而言,首先它并不总是必要的(例如,对于 Fanout
类型的交换器)。此外,队列可以绑定到一个使用空 String
的交换器。这些都是依赖模板的路由键属性默认值为空 String
的合法场景。就交换器名称而言,空 String
是常用的,因为 AMQP 规范将“默认交换器”定义为没有名称的交换器。由于所有队列都自动绑定到该默认交换器(这是一个直接交换器),并使用它们的名称作为绑定值,因此前面的列表中的第二个方法可以用于通过默认交换器向任何队列进行简单的点对点消息传递。你可以通过运行时提供方法参数的方式,将队列名称作为 routingKey
提供。以下示例展示了如何做到这一点:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,你可以创建一个模板,主要用于或专门用于发布到单个 Queue。以下示例展示了如何做到这一点:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息构建器 API
从 1.3 版本开始,MessageBuilder
和 MessagePropertiesBuilder
提供了消息构建器 API。这些方法提供了一种方便的“流式”方式来创建消息或消息属性。以下示例展示了流式 API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
可以在 MessageProperties 上定义的每个属性进行设置。其他方法包括 setHeader(String key, String value)
、removeHeader(String key)
、removeHeaders()
和 copyProperties(MessageProperties properties)
。每个属性设置方法都有一个 set*IfAbsent()
的变体。在存在默认初始值的情况下,方法命名为 set*IfAbsentOrDefault()
。
提供了五个静态方法来创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) 1
public static MessageBuilder withClonedBody(byte[] body) 2
public static MessageBuilder withBody(byte[] body, int from, int to) 3
public static MessageBuilder fromMessage(Message message) 4
public static MessageBuilder fromClonedMessage(Message message) 5
由构建器创建的消息具有一个直接引用参数的主体。
由构建器创建的消息具有一个包含参数中字节副本的新数组作为主体。
由构建器创建的消息具有一个包含参数中字节范围的新数组作为主体。有关更多详细信息,请参见 Arrays.copyOfRange()。
由构建器创建的消息具有一个直接引用参数主体的主体。参数的属性被复制到一个新的
MessageProperties
对象中。由构建器创建的消息具有一个包含参数主体副本的新数组作为主体。参数的属性被复制到一个新的
MessageProperties
对象中。
提供了三个静态方法来创建一个 MessagePropertiesBuilder
实例:
public static MessagePropertiesBuilder newInstance() 1
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3
一个新的消息属性对象使用默认值进行初始化。
构建器使用提供的属性对象进行初始化,并且
build()
将返回该属性对象。参数的属性被复制到一个新的
MessageProperties
对象中。
在使用 AmqpTemplate
的 RabbitTemplate
实现时,每个 send()
方法都有一个重载版本,该版本接受一个额外的 CorrelationData
对象。当启用了发布者确认机制时,该对象将在 AmqpTemplate 中描述的回调中返回。这使得发送者能够将确认(ack
或 nack
)与发送的消息关联起来。
从 1.6.7 版本开始,引入了 CorrelationAwareMessagePostProcessor
接口,允许在消息转换后修改关联数据。以下示例展示了如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在 2.0 版本中,此接口已被弃用。该方法已移至 MessagePostProcessor
,并提供了一个默认实现,该实现委托给 postProcessMessage(Message message)
。
从 1.6.7 版本开始,还提供了一个新的回调接口 CorrelationDataPostProcessor
。该接口在所有 MessagePostProcessor
实例(在 send()
方法中提供的以及在 setBeforePublishPostProcessors()
中提供的)之后调用。实现可以更新或替换在 send()
方法中提供的关联数据(如果有)。Message
和原始的 CorrelationData
(如果有)作为参数提供。以下示例展示了如何使用 postProcess
方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的 mandatory
属性为 true
时,返回的消息由 AmqpTemplate 中描述的回调提供。
从 1.4 版本开始,RabbitTemplate
支持 SpEL mandatoryExpression
属性,该属性针对每个请求消息进行评估,作为根评估对象,解析为 boolean
值。可以在表达式中使用 Bean 引用,例如 @myBean.isMandatory(#root)
。
RabbitTemplate
在发送和接收操作中也可以内部使用 Publisher 返回。更多信息请参阅 回复超时。
批处理
版本 1.4.2 引入了 BatchingRabbitTemplate
。这是 RabbitTemplate
的一个子类,它重写了 send
方法,根据 BatchingStrategy
批量处理消息。只有当批量处理完成时,消息才会被发送到 RabbitMQ。以下清单展示了 BatchingStrategy
接口的定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批量数据保存在内存中。在系统故障的情况下,未发送的消息可能会丢失。
提供了一个 SimpleBatchingStrategy
。它支持将消息发送到单个交换器或路由键。它具有以下属性:
-
batchSize
: 在发送之前,一个批次中的消息数量。 -
bufferLimit
: 批次消息的最大大小。如果超过此限制,将优先于batchSize
,导致发送部分批次。 -
timeout
: 当没有新的活动将消息添加到批次中时,经过此时间后将发送部分批次。
SimpleBatchingStrategy
通过在每个嵌入消息前添加一个四字节的二进制长度来格式化批次。这是通过将 springBatchFormat
消息属性设置为 lengthHeader4
来告知接收系统的。
默认情况下,批量消息会被监听器容器自动解批量(通过使用 springBatchFormat
消息头)。如果拒绝批量中的任何一条消息,将导致整个批量被拒绝。
然而,更多信息请参阅 @RabbitListener 与批处理。