跳到主要内容

测试支持

DeepSeek V3 中英对照 Testing Support

为异步应用程序编写集成测试必然比测试更简单的应用程序更为复杂。当诸如 @RabbitListener 注解这样的抽象概念出现时,情况会变得更加复杂。问题在于如何验证在发送消息后,监听器是否如预期那样接收到了消息。

该框架本身包含许多单元测试和集成测试。其中一些测试使用模拟(mocks),而另一些则通过与实时的 RabbitMQ broker 进行集成测试。你可以参考这些测试来获取一些测试场景的灵感。

Spring AMQP 1.6 版本引入了 spring-rabbit-test jar 包,它为测试一些更复杂的场景提供了支持。预计该项目将随着时间的推移而扩展,但我们需要社区的反馈来提出有助于测试的功能需求。请使用 JIRAGitHub Issues 提供此类反馈。

@SpringRabbitTest

使用此注解将基础设施 bean 添加到 Spring 测试的 ApplicationContext 中。在使用例如 @SpringBootTest 时,这不是必需的,因为 Spring Boot 的自动配置会添加这些 bean。

已注册的 Beans 有:

  • CachingConnectionFactory (autoConnectionFactory)。如果存在 @RabbitEnabled,则使用其连接工厂。

  • RabbitTemplate (autoRabbitTemplate)

  • RabbitAdmin (autoRabbitAdmin)

  • RabbitListenerContainerFactory (autoContainerFactory)

此外,还会添加与 @EnableRabbit 相关的 beans(以支持 @RabbitListener)。

@SpringJUnitConfig
@SpringRabbitTest
public class MyRabbitTests {

@Autowired
private RabbitTemplate template;

@Autowired
private RabbitAdmin admin;

@Autowired
private RabbitListenerEndpointRegistry registry;

@Test
void test() {
...
}

@Configuration
public static class Config {

...

}

}
java

在 JUnit4 中,将 @SpringJUnitConfig 替换为 @RunWith(SpringRunner.class)

Mockito Answer<?> 实现

目前有两个 Answer<?> 实现来帮助进行测试。

第一个,LatchCountDownAndCallRealMethodAnswer,提供了一个 Answer<Void>,它返回 null 并计数一个 latch。以下示例展示了如何使用 LatchCountDownAndCallRealMethodAnswer

LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("myListener", 2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());

...

assertThat(answer.await(10)).isTrue();
java

其次,LambdaAnswer<T> 提供了一种机制,可以选择性地调用真实的方法,并基于 InvocationOnMock 和结果(如果有)返回自定义结果。

考虑以下 POJO(Plain Old Java Object):

public class Thing {

public String thing(String thing) {
return thing.toUpperCase();
}

}
java

以下类测试了 Thing POJO:

Thing thing = spy(new Thing());

doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(thing).thing(anyString());
assertEquals("THINGTHING", thing.thing("thing"));

doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(thing).thing(anyString());
assertEquals("THINGthing", thing.thing("thing"));

doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(thing).thing(anyString());
assertEquals("thingthing", thing.thing("thing"));
java

从 2.2.3 版本开始,答案会捕获被测方法抛出的任何异常。使用 answer.getExceptions() 来获取这些异常的引用。

当与 @RabbitListenerTest 和 RabbitListenerTestHarness 结合使用时,使用 harness.getLambdaAnswerFor("listenerId", true, …​) 来获取为监听器正确构造的答案。

@RabbitListenerTestRabbitListenerTestHarness

在你的 @Configuration 类上添加 @RabbitListenerTest 注解,会导致框架将标准的 RabbitListenerAnnotationBeanPostProcessor 替换为一个名为 RabbitListenerTestHarness 的子类(同时也会通过 @EnableRabbit 启用 @RabbitListener 的检测)。

RabbitListenerTestHarness 在两个方面增强了监听器。首先,它将监听器包装在 Mockito Spy 中,从而支持正常的 Mockito 存根和验证操作。它还可以向监听器添加一个 Advice,从而能够访问参数、结果以及抛出的任何异常。你可以通过 @RabbitListenerTest 上的属性来控制启用哪些(或两者都启用)。后者是为了访问有关调用的更低层次的数据。它还支持阻塞测试线程,直到异步监听器被调用。

important

final 修饰的 @RabbitListener 方法不能被监视或增强。此外,只有带有 id 属性的监听器才能被监视或增强。

考虑一些示例。

以下示例使用了 spy

@Configuration
@RabbitListenerTest
public class Config {

@Bean
public Listener listener() {
return new Listener();
}

...

}

public class Listener {

@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}

@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}

}

public class MyTests {

@Autowired
private RabbitListenerTestHarness harness; 1

@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));

Listener listener = this.harness.getSpy("foo"); 2
assertNotNull(listener);
verify(listener).foo("foo");
}

@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);

LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("bar", 2); 3
doAnswer(answer).when(listener).foo(anyString(), anyString()); 4

this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");

assertTrue(answer.await(10));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}

}
java
  • 将测试工具注入到测试用例中,以便我们可以访问间谍。

  • 获取对间谍的引用,以便我们可以验证它是否按预期调用。由于这是一个发送和接收操作,因此不需要挂起测试线程,因为它已经在 RabbitTemplate 中等待回复时挂起。

  • 在这种情况下,我们只使用发送操作,因此需要一个锁存器来等待容器线程上对监听器的异步调用。我们使用 Answer<?> 实现之一来帮助完成此操作。重要提示:由于监听器的间谍方式,使用 harness.getLatchAnswerFor() 获取正确配置的答案对于间谍非常重要。

  • 配置间谍以调用 Answer

以下示例使用了捕获通知(capture advice):

@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {

}

@Service
public class Listener {

private boolean failed;

@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}

@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
throw new RuntimeException(foo);
}
failed = false;
}

}

public class MyTests {

@Autowired
private RabbitListenerTestHarness harness; 1

@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));

InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS); 2
assertThat(invocationData.getArguments()[0], equalTo("foo")); 3
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}

@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");

InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS); 4
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));

invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));

invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage()); 5
}

}
java
  • 将测试工具注入到测试用例中,以便我们可以访问 spy。

  • 使用 harness.getNextInvocationDataFor() 来检索调用数据 - 在这种情况下,由于这是一个请求/回复场景,无需等待任何时间,因为测试线程在 RabbitTemplate 中等待结果时被挂起。

  • 然后我们可以验证参数和结果是否符合预期。

  • 这次我们需要一些时间来等待数据,因为这是容器线程上的异步操作,我们需要挂起测试线程。

  • 当监听器抛出异常时,异常会出现在调用数据的 throwable 属性中。

important

在使用自定义的 Answer<?> 与测试工具(harness)结合时,为了确保其正常运行,这些 Answer 应该继承 ForwardsInvocation,并从测试工具中获取实际的监听器(而不是 spy 对象)(getDelegate("myListener")),然后调用 super.answer(invocation)。请参考提供的 Mockito Answer<?> 实现 源代码以获取示例。

使用 TestRabbitTemplate

TestRabbitTemplate 用于在没有代理的情况下执行一些基本的集成测试。当你在测试用例中将其作为 @Bean 添加时,它会发现上下文中的所有监听器容器,无论这些容器是通过 @Bean<bean/> 还是使用 @RabbitListener 注解声明的。目前,它仅支持通过队列名称进行路由。该模板从容器中提取消息监听器,并在测试线程上直接调用它。对于返回回复的监听器,支持请求-回复消息传递(sendAndReceive 方法)。

以下测试用例使用了模板:

@RunWith(SpringRunner.class)
public class TestRabbitTemplateTests {

@Autowired
private TestRabbitTemplate template;

@Autowired
private Config config;

@Test
public void testSimpleSends() {
this.template.convertAndSend("foo", "hello1");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello2");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:"));
this.template.convertAndSend("foo", "hello3");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello4");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4"));

this.template.setBroadcast(true);
this.template.convertAndSend("foo", "hello5");
assertThat(this.config.fooIn, equalTo("foo:hello1foo:hello5"));
this.template.convertAndSend("bar", "hello6");
assertThat(this.config.barIn, equalTo("bar:hello2bar:hello6"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4hello5hello6"));
}

@Test
public void testSendAndReceive() {
assertThat(this.template.convertSendAndReceive("baz", "hello"), equalTo("baz:hello"));
}
java
@Configuration
@EnableRabbit
public static class Config {

public String fooIn = "";

public String barIn = "";

public String smlc1In = "smlc1:";

@Bean
public TestRabbitTemplate template() throws IOException {
return new TestRabbitTemplate(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() throws IOException {
ConnectionFactory factory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
willReturn(connection).given(factory).createConnection();
willReturn(channel).given(connection).createChannel(anyBoolean());
given(channel.isOpen()).willReturn(true);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
return factory;
}

@RabbitListener(queues = "foo")
public void foo(String in) {
this.fooIn += "foo:" + in;
}

@RabbitListener(queues = "bar")
public void bar(String in) {
this.barIn += "bar:" + in;
}

@RabbitListener(queues = "baz")
public String baz(String in) {
return "baz:" + in;
}

@Bean
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setMessageListener(new MessageListenerAdapter(new Object() {

public void handleMessage(String in) {
smlc1In += in;
}

}));
return container;
}

}

}
java

JUnit4 @Rules

Spring AMQP 1.7 及更高版本提供了一个额外的 jar,名为 spring-rabbit-junit。这个 jar 包含了一些用于运行 JUnit4 测试时的 @Rule 工具实例。有关 JUnit5 测试的信息,请参见 JUnit5 条件

使用 BrokerRunning

BrokerRunning 提供了一种机制,允许在 broker 未运行时(默认在 localhost 上)让测试成功通过。

它还提供了实用方法来初始化和清空队列,以及删除队列和交换器。

以下示例展示了其用法:

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");

@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
java

有几个 isRunning…​ 静态方法,例如 isBrokerAndManagementRunning(),用于验证代理是否启用了管理插件。

配置规则

有时你希望在没有 broker 的情况下测试失败,例如在夜间 CI 构建中。要在运行时禁用此规则,可以将名为 RABBITMQ_SERVER_REQUIRED 的环境变量设置为 true

你可以通过 setter 方法或环境变量来覆盖 broker 的属性,例如 hostname:

以下示例展示了如何使用 setter 覆盖属性:

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");

static {
brokerRunning.setHostName("10.0.0.1")
}

@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
java

你也可以通过设置以下环境变量来覆盖属性:

public static final String BROKER_ADMIN_URI = "RABBITMQ_TEST_ADMIN_URI";
public static final String BROKER_HOSTNAME = "RABBITMQ_TEST_HOSTNAME";
public static final String BROKER_PORT = "RABBITMQ_TEST_PORT";
public static final String BROKER_USER = "RABBITMQ_TEST_USER";
public static final String BROKER_PW = "RABBITMQ_TEST_PASSWORD";
public static final String BROKER_ADMIN_USER = "RABBITMQ_TEST_ADMIN_USER";
public static final String BROKER_ADMIN_PW = "RABBITMQ_TEST_ADMIN_PASSWORD";
java

这些环境变量会覆盖默认设置(AMQP 的默认设置为 localhost:5672,管理 REST API 的默认设置为 [localhost:15672/api/](http://localhost:15672/api/))。

更改主机名会影响 amqpmanagement REST API 的连接(除非显式设置了管理 URI)。

BrokerRunning 还提供了一个名为 setEnvironmentVariableOverridesstatic 方法,允许你传入一个包含这些变量的映射表。这些变量将覆盖系统环境变量。如果你希望在多个测试套件中使用不同的配置进行测试,这可能会很有用。重要提示:该方法必须在调用任何创建规则实例的 isRunning() 静态方法之前调用。变量值将应用于此调用后创建的所有实例。调用 clearEnvironmentVariableOverrides() 以重置规则以使用默认值(包括任何实际的环境变量)。

在你的测试用例中,你可以在创建连接工厂时使用 brokerRunninggetConnectionFactory() 返回规则中的 RabbitMQ ConnectionFactory。以下示例展示了如何做到这一点:

@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
java

使用 LongRunningIntegrationTest

LongRunningIntegrationTest 是一个禁用长时间运行测试的规则。你可能希望在开发者系统上使用此规则,但要确保在例如夜间 CI 构建中禁用该规则。

以下示例展示了其用法:

@Rule
public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
java

要在运行时禁用该规则,请将名为 RUN_LONG_INTEGRATION_TESTS 的环境变量设置为 true

JUnit5 条件

版本 2.0.2 引入了对 JUnit5 的支持。

使用 @RabbitAvailable 注解

这个类级别的注解与 JUnit4 @Rules 中讨论的 BrokerRunning @Rule 类似。它由 RabbitAvailableCondition 处理。

注解具有三个属性:

  • queues: 一个队列数组,这些队列在每个测试之前被声明(并清空),并在所有测试完成后被删除。

  • management: 如果你的测试还需要在 broker 上安装管理插件,请将此设置为 true

  • purgeAfterEach: (自版本 2.2 起)当为 true(默认值)时,queues 将在测试之间被清空。

它用于检查 broker 是否可用,如果不可用则跳过测试。正如在配置规则中所讨论的,如果环境变量 RABBITMQ_SERVER_REQUIRED 设置为 true,则在没有 broker 的情况下会导致测试快速失败。你可以通过使用环境变量来配置此条件,如配置规则中所述。

此外,RabbitAvailableCondition 支持参数化测试构造函数和方法的参数解析。支持两种参数类型:

  • BrokerRunningSupport: 实例(在 2.2 版本之前,这是一个 JUnit 4 的 BrokerRunning 实例)

  • ConnectionFactory: BrokerRunningSupport 实例的 RabbitMQ 连接工厂

以下示例展示了两种情况:

@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {

private final ConnectionFactory connectionFactory;

public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory = brokerRunning.getConnectionFactory();
}

@Test
public void test(ConnectionFactory cf) throws Exception {
assertSame(cf, this.connectionFactory);
Connection conn = this.connectionFactory.newConnection();
Channel channel = conn.createChannel();
DeclareOk declareOk = channel.queueDeclarePassive("rabbitAvailableTests.queue");
assertEquals(0, declareOk.getConsumerCount());
channel.close();
conn.close();
}

}
java

前面的测试位于框架本身,用于验证参数注入以及条件是否正确创建了队列。

一个实际的用户测试可能如下所示:

@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {

private final CachingConnectionFactory connectionFactory;

public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory =
new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}

@Test
public void test() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
...
}
}
java

当你在测试类中使用 Spring 注解应用程序上下文时,你可以通过一个名为 RabbitAvailableCondition.getBrokerRunning() 的静态方法获取到条件连接工厂的引用。

important

从 2.2 版本开始,getBrokerRunning() 返回一个 BrokerRunningSupport 对象;在此之前,返回的是 JUnit 4 的 BrokerRunnning 实例。新类的 API 与 BrokerRunning 相同。

以下测试来自框架并展示了其用法:

@RabbitAvailable(queues = {
RabbitTemplateMPPIntegrationTests.QUEUE,
RabbitTemplateMPPIntegrationTests.REPLIES })
@SpringJUnitConfig
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {

public static final String QUEUE = "mpp.tests";

public static final String REPLIES = "mpp.tests.replies";

@Autowired
private RabbitTemplate template;

@Autowired
private Config config;

@Test
public void test() {

...

}

@Configuration
@EnableRabbit
public static class Config {

@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition
.getBrokerRunning()
.getConnectionFactory());
}

@Bean
public RabbitTemplate template() {

...

}

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory() {

...

}

@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}

}

}
java

使用 @LongRunning 注解

类似于 LongRunningIntegrationTest 这个 JUnit4 的 @Rule,此注解会导致测试被跳过,除非环境变量(或系统属性)被设置为 true。以下示例展示了如何使用它:

@RabbitAvailable(queues = SimpleMessageListenerContainerLongTests.QUEUE)
@LongRunning
public class SimpleMessageListenerContainerLongTests {

public static final String QUEUE = "SimpleMessageListenerContainerLongTests.queue";

...

}
java

默认情况下,变量为 RUN_LONG_INTEGRATION_TESTS,但你可以在注解的 value 属性中指定变量名称。