Multiple Broker (or Cluster) Support

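Version 2.3 made it more convenient for a single application to communicate with multiple brokers or broker clusters. The main benefit is on the consumer side, where the infrastructure can automatically associate auto-declared queues with the appropriate broker.

This is best illustrated with an example: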

@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    CachingConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    CachingConnectionFactory cf2() {
        return new CachingConnectionFactory("otherHost");
    }

    @Bean
    CachingConnectionFactory cf3() {
        return new CachingConnectionFactory("thirdHost");
    }

    @Bean
    SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
            CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        rcf.setDefaultTargetConnectionFactory(cf1);
        rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
        return rcf;
    }

    @Bean("factory1-admin")
    RabbitAdmin admin1(CachingConnectionFactory cf1) {
        return new RabbitAdmin(cf1);
    }

    @Bean("factory2-admin")
    RabbitAdmin admin2(CachingConnectionFactory cf2) {
        return new RabbitAdmin(cf2);
    }

    @Bean("factory3-admin")
    RabbitAdmin admin3(CachingConnectionFactory cf3) {
        return new RabbitAdmin(cf3);
    }

    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
        MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
        postProcessor.setEndpointRegistry(registry);
        postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
        return postProcessor;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf1);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf2);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf3);
        return factory;
    }

    @Bean
    RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
        return new RabbitTemplate(rcf);
    }

    @Bean
    ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
        return new ConnectionFactoryContextWrapper(rcf);
    }

}

@Component
class Listeners {

    @RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
    public void listen1(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
    public void listen2(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
    public void listen3(String in) {

    }

}

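As you can see, we have declared three sets of infrastructure (connection factories, admins, container factories). As discussed earlier, @RabbitListener can define which container factory to use; in this case, the listeners also use queuesToDeclare, which causes each queue to be declared on the broker if it does not already exist. By naming the RabbitAdmin beans with the convention <container-factory-name>-admin, the infrastructure is able to determine which admin should declare the queue. This also works with bindings = @QueueBinding(…), in which case the exchange and binding are declared as well. It does not work with queues, because that attribute expects the queues to already exist.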
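The naming convention above can be pictured as a simple name-based lookup. The following is a minimal, framework-free sketch of that idea only; it is not the library's implementation. The class AdminNameResolver, its methods, and the use of plain strings as stand-ins for RabbitAdmin beans are all hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only: models the
// "<container-factory-name>-admin" convention with a plain map lookup.
class AdminNameResolver {

    /** Suffix appended to the container factory bean name by the convention. */
    static final String ADMIN_SUFFIX = "-admin";

    /** Derives the admin bean name that belongs to a container factory bean name. */
    static String adminBeanNameFor(String containerFactoryName) {
        return containerFactoryName + ADMIN_SUFFIX;
    }

    /** Looks up the bean registered under the derived admin name, if there is one. */
    static <A> Optional<A> resolveAdmin(Map<String, A> beansByName, String containerFactoryName) {
        return Optional.ofNullable(beansByName.get(adminBeanNameFor(containerFactoryName)));
    }

    public static void main(String[] args) {
        // Stand-ins for the RabbitAdmin beans: each value names the broker host
        // that the corresponding admin in the example above is connected to.
        Map<String, String> admins = new LinkedHashMap<>();
        admins.put("factory1-admin", "localhost");
        admins.put("factory2-admin", "otherHost");
        admins.put("factory3-admin", "thirdHost");

        // "factory4" has no matching admin, so no broker can be picked for it by name.
        for (String factory : new String[] {"factory1", "factory2", "factory3", "factory4"}) {
            String target = resolveAdmin(admins, factory).orElse("<no matching admin>");
            System.out.println(factory + " -> " + target);
        }
    }
}
```

The point of the sketch is that the association is driven purely by bean names, so a misspelled admin bean name (for example factory2_admin) would simply not match its container factory.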

On the producer side, a convenient ConnectionFactoryContextWrapper class is provided to make using the RoutingConnectionFactory simpler (see Routing Connection Factory).

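As you can see above, a SimpleRoutingConnectionFactory bean has been added with the routing keys one, two, and three. There is also a RabbitTemplate that uses that factory. The following example uses that template together with the wrapper to route to one of the broker clusters.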

@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
    return args -> {
        wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
        wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
        wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
    };
}
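To make the role of the wrapper more concrete, here is a simplified, framework-free model of the general pattern: a lookup key is bound to the current thread while a callback runs and is released afterwards, so that a routing factory can pick a target based on that key. This is a sketch under stated assumptions, not the code of ConnectionFactoryContextWrapper or SimpleRoutingConnectionFactory. The class RoutingContextSketch, its methods, the use of host-name strings as stand-ins for connection factories, and the fallback to the default target for unknown keys are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model, for illustration only: strings stand in for connection factories.
class RoutingContextSketch {

    /** Lookup key bound to the current thread; null means "use the default target". */
    private final ThreadLocal<String> currentKey = new ThreadLocal<>();

    private final Map<String, String> targets;
    private final String defaultTarget;

    RoutingContextSketch(Map<String, String> targets, String defaultTarget) {
        this.targets = targets;
        this.defaultTarget = defaultTarget;
    }

    /** Binds the key, runs the callback, and restores the previous binding even on failure. */
    void run(String key, Runnable callback) {
        String previous = currentKey.get();
        currentKey.set(key);
        try {
            callback.run();
        } finally {
            if (previous == null) {
                currentKey.remove();
            } else {
                currentKey.set(previous);
            }
        }
    }

    /** Resolves the target for the key bound to this thread (assumed fallback: the default). */
    String currentTarget() {
        String key = currentKey.get();
        return key == null ? defaultTarget : targets.getOrDefault(key, defaultTarget);
    }

    public static void main(String[] args) {
        RoutingContextSketch wrapper = new RoutingContextSketch(
                Map.of("one", "localhost", "two", "otherHost", "three", "thirdHost"),
                "localhost");

        List<String> log = new ArrayList<>();
        // Inside run(...), the key "two" is bound, so the lookup resolves to its target.
        wrapper.run("two", () -> log.add("q2 via " + wrapper.currentTarget()));
        // Outside run(...), nothing is bound, so the default target is used.
        log.add("unbound via " + wrapper.currentTarget());
        log.forEach(System.out::println);
    }
}
```

Releasing the key in a finally block is the important design choice in this model: a callback that throws must not leave a stale key on the thread, or later sends on that thread could be routed to the wrong broker.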