AMQP 1.0 支持
从 7.0 版本开始,Spring Integration 提供了用于 RabbitMQ AMQP 1.0 支持的通道适配器。这些通道适配器基于 org.springframework.amqp:spring-rabbitmq-client 库。
Spring AMQP 文档提供了关于 RabbitMQ AMQP 1.0 支持 的更多详细信息。
AMQP 1.0 出站通道适配器
AmqpClientMessageHandler 是一个 AbstractReplyProducingMessageHandler 实现,根据 setRequiresReply() 配置,它可以充当单向通道适配器或出站网关。此通道适配器的实例需要一个用于 AMQP 1.0 协议的 AsyncAmqpTemplate 实现,例如来自上述 spring-rabbitmq-client 库的 RabbitAmqpTemplate。此消息处理器默认是异步的;因此,发布错误应通过请求消息中的 errorChannel 头或应用程序上下文中的全局默认 errorChannel 来处理。
exchange(与可选的 routingKey 一起)用于发布消息,它与用于发布的 queue 是互斥的。如果两者都未提供,则 AsyncAmqpTemplate 实现必须确保这些目标部分有默认值;否则,消息将被拒绝,视为未投递。
默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,它处理字符串、可序列化实例和字节数组。同时,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.outboundMapper()。此头部映射器也用于将 AMQP 消息属性映射回回复中的头部。
在网关模式下,可以配置 replyPayloadType 来转换回复消息体。但此时 MessageConverter 必须是 SmartMessageConverter 的实现类,例如 JacksonJsonMessageConverter。此外,与 replyPayloadType 互斥的另一种方式是设置 returnMessage 标志为 true,这将返回完整的 org.springframework.amqp.core.Message 实例作为回复消息负载。
以下示例演示了如何将 AmqpClientMessageHandler 配置为简单的 @ServiceActivator:
- Java DSL
- Kotlin DSL
- Groovy DSL
- Java
@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundAdapter(rabbitTemplate)
.exchange("e1")
.routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.apply {
exchange("e1")
routingKeyExpression("'k1'")
}
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.with {
exchange 'e1'
routingKeyExpression '''k1'''
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setExchangeExpressionString("headers[exchange]");
messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
return messageHandler;
}
AmqpClientMessageHandler 的网关变体可能如下:
- Java DSL
- Kotlin DSL
- Groovy DSL
- Java
@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction { "requestReply" }
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.with {
queueFunction { 'requestReply' }
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setRequiresReply(true);
messageHandler.setReplyPayloadType(String.class);
messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
messageHandler.setQueue("q1");
return messageHandler;
}
AMQP 1.0 消息驱动通道适配器
AmqpClientMessageProducer 是一个 MessageProducerSupport 实现,作为消息驱动通道适配器,用于通过 RabbitMQ AMQP 1.0 协议从队列消费消息。它需要一个 AmqpConnectionFactory 和至少一个要消费的队列。其内部逻辑基于 RabbitAmqpListenerContainer 和 IntegrationRabbitAmqpMessageListener,将消费的 AMQP 消息(转换后)中继到 outputChannel。RabbitAmqpListenerContainer 的一些配置选项通过 AmqpClientMessageProducer 的 setter 方法暴露。
默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,它处理字符串、可序列化实例和字节数组。同时,默认的 AmqpHeaderMapper 是 DefaultAmqpHeaderMapper.inboundMapper()。可以将 messageConverter 选项设置为 null 以完全跳过转换(包括头部映射),并将接收到的 AMQP 消息作为要生成的 Spring 消息的有效负载返回。
此外,AmqpClientMessageProducer 实现了 Pausable 契约,并委托给相应的 RabbitAmqpListenerContainer API。
当 AmqpClientMessageProducer.setBatchSize() > 1 时,此通道适配器将工作在批处理模式下。在这种情况下,接收到的消息会被收集,直到达到设定的批次大小,或者 batchReceiveTimeout 超时周期耗尽。所有批处理的 AMQP 消息随后会被转换为 Spring 消息,并将结果列表作为包装消息的有效载荷发送到 outputChannel。批处理模式由于能够一次性完成所有批处理消息的结算,从而带来一定的性能提升。
当 autoSettle 标志设置为 false 时,AcknowledgmentCallback 实例会作为 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 消息头提供,以便对接收到的消息或整个批次做出确认决策。
以下示例展示了如何将 AmqpClientMessageProducer 配置为简单入站端点:
- Java DSL
- Kotlin DSL
- Groovy DSL
- Java
@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
.channel(c -> c.queue("receiveChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
channel("inputChannel")
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
channel 'inputChannel'
}
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
QueueChannel inputChannel) {
AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
amqpClientMessageProducer.setOutputChannel(inputChannel);
amqpClientMessageProducer.setBatchSize(2);
return amqpClientMessageProducer;
}
AMQP 1.0 入站网关
AmqpClientInboundGateway 是一个 MessagingGatewaySupport 实现,用于通过 RabbitMQ AMQP 1.0 协议接收请求并生成回复。它与上文提到的 AmqpClientMessageProducer 类似,并共享许多 RabbitAmqpListenerContainer 配置选项。此外,为了生成 AMQP 1.0 回复,AmqpClientInboundGateway 在内部使用了一个 RabbitAmqpTemplate。
为了实现自动回复与请求的关联,必须在请求消息中提供 replyTo 属性。例如,RabbitAmqpTemplate.sendAndReceive() 依赖于 RabbitMQ AMQP 1.0 库中的 RpcClient,该库会生成一个独占且自动删除的队列。或者,可以在 AmqpClientInboundGateway 上设置回复地址为 replyExchange(以及可选的 replyRoutingKey)或 replyQueue(但两者不能同时设置),这些设置会委托给 RabbitAmqpTemplate 的默认选项。请求消息的 messageId 或 correlationId 属性可用于与回复进行关联。如果缺失,RabbitAmqpTemplate.sendAndReceive() 中的 RpcClient 会自动生成一个。AmqpClientInboundGateway 能够将此类关联键映射回回复消息。
以下示例演示了如何将 AmqpClientInboundGateway 配置为简单的入站网关:
- Java DSL
- Kotlin DSL
- Groovy DSL
- Java
@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
.channel(c -> c.queue("inputChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
channel { queue("inputChannel") }
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
channel { queue 'inputChannel' }
}
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
amqpClientInboundGateway.setRequestChannelName("inputChannel");
return amqpClientInboundGateway;
}