跳到主要内容

发送消息

DeepSeek V3 中英对照 Sending Messages

发送消息时,您可以使用以下任一方法:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;
java

我们可以从前一个列表中的最后一个方法开始讨论,因为它实际上是最明确的方法。它允许在运行时提供一个 AMQP 交换名称(以及一个路由键)。最后一个参数是负责实际创建消息实例的回调函数。使用此方法发送消息的示例如下:以下示例展示了如何使用 send 方法发送消息:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
java

如果你计划大多数或所有时间都使用该模板实例发送到同一个 exchange,你可以在模板本身上设置 exchange 属性。在这种情况下,你可以使用前面列表中的第二种方法。以下示例在功能上与前一个示例等效:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
java

如果 exchangeroutingKey 属性都在模板上设置好了,你可以使用仅接受 Message 的方法。以下示例展示了如何做到这一点:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
java

关于交换和路由键属性的更好理解方式是,显式的方法参数总是会覆盖模板的默认值。事实上,即使你没有在模板上显式设置这些属性,也总是有默认值存在。在这两种情况下,默认值都是一个空的 String,但这实际上是一个合理的默认值。就路由键而言,首先它并不总是必要的(例如,对于 Fanout 类型的交换器)。此外,队列可以绑定到一个使用空 String 的交换器。这些都是依赖模板的路由键属性默认值为空 String 的合法场景。就交换器名称而言,空 String 是常用的,因为 AMQP 规范将“默认交换器”定义为没有名称的交换器。由于所有队列都自动绑定到该默认交换器(这是一个直接交换器),并使用它们的名称作为绑定值,因此前面的列表中的第二个方法可以用于通过默认交换器向任何队列进行简单的点对点消息传递。你可以通过运行时提供方法参数的方式,将队列名称作为 routingKey 提供。以下示例展示了如何做到这一点:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
java

或者,你可以创建一个模板,主要用于或专门用于发布到单个 Queue。以下示例展示了如何做到这一点:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
java

消息构建器 API

从 1.3 版本开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了一种方便的“流式”方式来创建消息或消息属性。以下示例展示了流式 API 的实际应用:

Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
java
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
java

可以在 MessageProperties 上定义的每个属性进行设置。其他方法包括 setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)。每个属性设置方法都有一个 set*IfAbsent() 的变体。在存在默认初始值的情况下,方法命名为 set*IfAbsentOrDefault()

提供了五个静态方法来创建初始消息构建器:

public static MessageBuilder withBody(byte[] body) 1

public static MessageBuilder withClonedBody(byte[] body) 2

public static MessageBuilder withBody(byte[] body, int from, int to) 3

public static MessageBuilder fromMessage(Message message) 4

public static MessageBuilder fromClonedMessage(Message message) 5
java
  • 由构建器创建的消息具有一个直接引用参数的主体。

  • 由构建器创建的消息具有一个包含参数中字节副本的新数组作为主体。

  • 由构建器创建的消息具有一个包含参数中字节范围的新数组作为主体。有关更多详细信息,请参见 Arrays.copyOfRange()

  • 由构建器创建的消息具有一个直接引用参数主体的主体。参数的属性被复制到一个新的 MessageProperties 对象中。

  • 由构建器创建的消息具有一个包含参数主体副本的新数组作为主体。参数的属性被复制到一个新的 MessageProperties 对象中。

提供了三个静态方法来创建一个 MessagePropertiesBuilder 实例:

public static MessagePropertiesBuilder newInstance() 1

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3
java
  • 一个新的消息属性对象使用默认值进行初始化。

  • 构建器使用提供的属性对象进行初始化,并且 build() 将返回该属性对象。

  • 参数的属性被复制到一个新的 MessageProperties 对象中。

在使用 AmqpTemplateRabbitTemplate 实现时,每个 send() 方法都有一个重载版本,该版本接受一个额外的 CorrelationData 对象。当启用了发布者确认机制时,该对象将在 AmqpTemplate 中描述的回调中返回。这使得发送者能够将确认(acknack)与发送的消息关联起来。

从 1.6.7 版本开始,引入了 CorrelationAwareMessagePostProcessor 接口,允许在消息转换后修改关联数据。以下示例展示了如何使用它:

Message postProcessMessage(Message message, Correlation correlation);
java

在 2.0 版本中,此接口已被弃用。该方法已移至 MessagePostProcessor,并提供了一个默认实现,该实现委托给 postProcessMessage(Message message)

从 1.6.7 版本开始,还提供了一个新的回调接口 CorrelationDataPostProcessor。该接口在所有 MessagePostProcessor 实例(在 send() 方法中提供的以及在 setBeforePublishPostProcessors() 中提供的)之后调用。实现可以更新或替换在 send() 方法中提供的关联数据(如果有)。Message 和原始的 CorrelationData(如果有)作为参数提供。以下示例展示了如何使用 postProcess 方法:

CorrelationData postProcess(Message message, CorrelationData correlationData);
java

发布者返回

当模板的 mandatory 属性为 true 时,返回的消息由 AmqpTemplate 中描述的回调提供。

从 1.4 版本开始,RabbitTemplate 支持 SpEL mandatoryExpression 属性,该属性针对每个请求消息进行评估,作为根评估对象,解析为 boolean 值。可以在表达式中使用 Bean 引用,例如 @myBean.isMandatory(#root)

RabbitTemplate 在发送和接收操作中也可以内部使用 Publisher 返回。更多信息请参阅 回复超时

批处理

版本 1.4.2 引入了 BatchingRabbitTemplate。这是 RabbitTemplate 的一个子类,它重写了 send 方法,根据 BatchingStrategy 批量处理消息。只有当批量处理完成时,消息才会被发送到 RabbitMQ。以下清单展示了 BatchingStrategy 接口的定义:

public interface BatchingStrategy {

MessageBatch addToBatch(String exchange, String routingKey, Message message);

Date nextRelease();

Collection<MessageBatch> releaseBatches();

}
java
警告

批量数据保存在内存中。在系统故障的情况下,未发送的消息可能会丢失。

提供了一个 SimpleBatchingStrategy。它支持将消息发送到单个交换器或路由键。它具有以下属性:

  • batchSize: 在发送之前,一个批次中的消息数量。

  • bufferLimit: 批次消息的最大大小。如果超过此限制,将优先于 batchSize,导致发送部分批次。

  • timeout: 当没有新的活动将消息添加到批次中时,经过此时间后将发送部分批次。

SimpleBatchingStrategy 通过在每个嵌入消息前添加一个四字节的二进制长度来格式化批次。这是通过将 springBatchFormat 消息属性设置为 lengthHeader4 来告知接收系统的。

important

默认情况下,批量消息会被监听器容器自动解批量(通过使用 springBatchFormat 消息头)。如果拒绝批量中的任何一条消息,将导致整个批量被拒绝。

然而,更多信息请参阅 @RabbitListener 与批处理