跳到主要内容

使用 RabbitMQ Stream 插件

DeepSeek V3 中英对照 Using the RabbitMQ Stream Plugin

版本 2.4 引入了对 RabbitMQ Stream 插件 Java 客户端 的初步支持,该客户端用于 RabbitMQ Stream 插件

  • RabbitStreamTemplate

  • StreamListenerContainer

spring-rabbit-stream 依赖添加到你的项目中:

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.2.3</version>
</dependency>
xml
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.3'
groovy

你可以像平常一样使用 RabbitAdmin bean 来配置队列,使用 QueueBuilder.stream() 方法来指定队列类型。例如:

@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
java

然而,这仅在你同时使用非流式组件(如 SimpleMessageListenerContainerDirectMessageListenerContainer)时才会生效,因为当 AMQP 连接打开时,管理组件会被触发以声明定义的 bean。如果你的应用程序仅使用流式组件,或者你希望使用高级流配置功能,则应配置一个 StreamAdmin 来代替:

@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
java

有关 StreamCreator 的更多信息,请参阅 RabbitMQ 文档。

发送消息

RabbitStreamTemplate 提供了 RabbitTemplate(AMQP)功能的一个子集。

public interface RabbitStreamOperations extends AutoCloseable {

CompletableFuture<Boolean> send(Message message);

CompletableFuture<Boolean> convertAndSend(Object message);

CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

MessageBuilder messageBuilder();

MessageConverter messageConverter();

StreamMessageConverter streamMessageConverter();

@Override
void close() throws AmqpException;

}
java

RabbitStreamTemplate 实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
java

MessageConverter 用于 convertAndSend 方法中,将对象转换为 Spring AMQP 的 Message

StreamMessageConverter 用于将 Spring AMQP 的 Message 转换为原生流 Message

你也可以直接发送原生的流 Message;通过 messageBuilder() 方法可以访问 Producer 的消息构建器。

ProducerCustomizer 提供了一种机制,用于在生产者构建之前对其进行自定义。

请参考 Java 客户端文档 关于自定义 EnvironmentProducer 的部分。

接收消息

异步消息接收由 StreamListenerContainer(以及在使用 @RabbitListener 时的 StreamRabbitListenerContainerFactory)提供。

监听器容器需要一个 Environment 以及一个流名称。

你可以使用经典的 MessageListener 接收 Spring AMQP 的 Message,或者你可以使用一个新的接口接收原生的流式 Message

public interface StreamMessageListener extends MessageListener {

void onStreamMessage(Message message, Context context);

}
java

有关支持的属性的信息,请参见消息监听器容器配置

与模板类似,容器也有一个 ConsumerCustomizer 属性。

请参考 Java 客户端文档 以了解如何自定义 EnvironmentConsumer

在使用 @RabbitListener 时,配置一个 StreamRabbitListenerContainerFactory;此时,大多数 @RabbitListener 属性(如 concurrency 等)将被忽略。仅支持 idqueuesautoStartupcontainerFactory。此外,queues 只能包含一个流名称。

示例

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}

@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}

@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
java

版本 2.4.5 向 StreamListenerContainer(及其工厂)添加了 adviceChain 属性。还提供了一个新的工厂 bean,用于创建一个无状态的重试拦截器,并带有一个可选的 StreamMessageRecoverer,用于消费原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
java
important

此容器不支持有状态重试。

超级流

Super Stream 是一种用于分区流的抽象概念,通过将多个流队列绑定到一个具有参数 x-super-stream: true 的 exchange 来实现。

资源调配

为了方便,可以通过定义一个 SuperStream 类型的单一 bean 来配置超级流。

@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
java

RabbitAdmin 会检测到这个 bean,并声明交换器 (my.super.stream) 和 3 个队列(分区)—— my.super-stream-n,其中 n012,这些队列会绑定到路由键等于 n 的路由键上。

如果你也希望通过 AMQP 发布到 exchange,你可以提供自定义的路由键:

@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
java

键的数量必须等于分区的数量。

向 SuperStream 生产数据

你必须在 RabbitStreamTemplate 中添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
java

你也可以使用 RabbitTemplate 通过 AMQP 发布消息。

使用单一活跃消费者消费超级流

在监听器容器上调用 superStream 方法,以在超级流上启用单一活跃消费者。

@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
java
important

此时,当并发数大于 1 时,实际并发数会进一步由 Environment 控制;要实现完全并发,请将环境的 maxConsumersByConnection 设置为 1。请参阅 配置环境

Micrometer 观测

从 3.0.5 版本开始,RabbitStreamTemplate 和流监听器容器现在支持使用 Micrometer 进行观测。此外,容器现在也支持 Micrometer 计时器(当未启用观测时)。

在每个组件上设置 observationEnabled 以启用观察;这将禁用 Micrometer Timers,因为计时器现在将由每个观察进行管理。在使用注解监听器时,请在容器工厂上设置 observationEnabled

更多信息请参考 Micrometer Tracing

要为计时器/跟踪添加标签,分别向模板或监听器容器配置自定义的 RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention

默认实现会为模板观察添加 name 标签,为容器添加 listener.id 标签。

你可以选择继承 DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention,或者提供全新的实现。

更多详情请参阅 Micrometer Observation 文档