消息转换器
AmqpTemplate
还定义了几种用于发送和接收消息的方法,这些方法委托给 MessageConverter
。MessageConverter
为每个方向提供了一个单一的方法:一个用于将对象转换为 Message
,另一个用于将 Message
转换回对象。请注意,在将对象转换为 Message
时,除了对象本身,你还可以提供属性。object
参数通常对应于消息体。以下清单展示了 MessageConverter
接口的定义:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
AmqpTemplate
上相关的 Message
发送方法比我们之前讨论的方法更简单,因为它们不需要 Message
实例。相反,MessageConverter
负责通过将提供的对象转换为 Message
主体的字节数组,然后添加任何提供的 MessageProperties
来“创建”每个 Message
。以下列表显示了各种方法的定义:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,只有两种方法:一种是接受队列名称的方法,另一种是依赖于模板中已设置的“queue”属性的方法。以下代码清单展示了这两种方法的定义:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
在异步消费者中提到的 MessageListenerAdapter
也使用了 MessageConverter
。
SimpleMessageConverter
MessageConverter
策略的默认实现称为 SimpleMessageConverter
。如果你没有显式配置其他转换器,RabbitTemplate
实例将使用此转换器。它处理基于文本的内容、序列化的 Java 对象以及字节数组。
从 Message
转换
如果输入 Message
的内容类型以 "text" 开头(例如 "text/plain"),它还会检查 content-encoding 属性,以确定在将 Message
主体字节数组转换为 Java String
时应使用的字符集。如果在输入 Message
上没有设置 content-encoding 属性,则默认使用 UTF-8 字符集。如果你需要覆盖该默认设置,可以配置一个 SimpleMessageConverter
实例,设置其 defaultCharset
属性,并将其注入到 RabbitTemplate
实例中。
如果输入 Message
的 content-type 属性值设置为 "application/x-java-serialized-object",SimpleMessageConverter
会尝试将字节数组反序列化(重新水合)为 Java 对象。虽然这对于简单的原型设计可能有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。当然,这也排除了在任一端使用非 Java 系统的可能性。由于 AMQP 是一种线路级协议,如果因为这样的限制而失去其大部分优势,那将是非常遗憾的。在接下来的两节中,我们将探讨一些在不依赖 Java 序列化的情况下传递富域对象内容的替代方案。
对于所有其他内容类型,SimpleMessageConverter
直接将 Message
的主体内容作为字节数组返回。
请参阅 Java 反序列化 以获取重要信息。
转换为 Message
当从任意 Java 对象转换为 Message
时,SimpleMessageConverter
同样处理字节数组、字符串和可序列化的实例。它将每种类型转换为字节(对于字节数组,无需转换),并相应地设置内容类型属性。如果要转换的 Object
不匹配这些类型之一,则 Message
的主体为 null。
SerializerMessageConverter
该转换器与 SimpleMessageConverter
类似,不同之处在于它可以配置其他 Spring Framework 的 Serializer
和 Deserializer
实现,用于 application/x-java-serialized-object
的转换。
请参阅 Java 反序列化 以获取重要信息。
Jackson2JsonMessageConverter
本节介绍如何使用 Jackson2JsonMessageConverter
在 Message
之间进行转换。它包含以下部分:
转换为 Message
如前一节所述,通常不建议依赖 Java 序列化。一种更灵活且在不同语言和平台之间更具可移植性的常见替代方案是 JSON(JavaScript 对象表示法)。可以在任何 RabbitTemplate
实例上配置转换器,以覆盖其对默认的 SimpleMessageConverter
的使用。Jackson2JsonMessageConverter
使用了 com.fasterxml.jackson
2.x 库。以下示例配置了一个 Jackson2JsonMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
如上所示,Jackson2JsonMessageConverter
默认使用 DefaultClassMapper
。类型信息会被添加到 MessageProperties
中(并从其中检索)。如果入站消息在 MessageProperties
中不包含类型信息,但你知道预期的类型,你可以通过使用 defaultType
属性来配置一个静态类型,如下例所示:
<bean id="jsonConverterWithDefaultType"
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="thing1.PurchaseOrder"/>
</bean>
</property>
</bean>
此外,你可以提供从 _TypeId_
头部中的值到自定义映射。以下示例展示了如何实现这一点:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
现在,如果发送系统将标头设置为 thing1
,转换器就会创建一个 Thing1
对象,依此类推。关于如何转换来自非 Spring 应用程序的消息的完整讨论,请参阅 从非 Spring 应用程序接收 JSON 示例应用程序。
从 2.4.3 版本开始,如果 supportedMediaType
包含 charset
参数,转换器将不会添加 contentEncoding
消息属性;这也用于编码。新增了一个方法 setSupportedMediaType
:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从 Message
转换
入站消息根据发送系统添加到头部的类型信息被转换为对象。
从 2.4.3 版本开始,如果消息属性中没有 contentEncoding
,转换器将尝试检测 contentType
消息属性中的 charset
参数并使用它。如果两者都不存在,且 supportedMediaType
具有 charset
参数,则将使用该参数进行解码,最终回退到 defaultCharset
属性。新增了一个方法 setSupportedMediaType
:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
在 1.6 版本之前,如果类型信息不存在,转换将会失败。从 1.6 版本开始,如果缺少类型信息,转换器将使用 Jackson 的默认设置(通常是一个 map)来转换 JSON。
从 1.6 版本开始,当你使用 @RabbitListener
注解(在方法上)时,推断的类型信息会被添加到 MessageProperties
中。这使得转换器能够将消息转换为目标方法的参数类型。这仅适用于没有注解的单个参数或带有 @Payload
注解的单个参数。在分析过程中,Message
类型的参数会被忽略。
默认情况下,推断的类型信息将覆盖发送系统创建的入站 _TypeId_
及相关头部信息。这使得接收系统能够自动转换为不同的领域对象。这仅适用于参数类型是具体类型(非抽象类或接口)或来自 java.util
包的情况。在所有其他情况下,将使用 _TypeId_
及相关头部信息。在某些情况下,您可能希望覆盖默认行为并始终使用 _TypeId_
信息。例如,假设您有一个 @RabbitListener
,它接受 Thing1
参数,但消息包含 Thing2
,而 Thing2
是 Thing1
的子类(Thing1
是具体类)。推断的类型将是不正确的。为了处理这种情况,请将 Jackson2JsonMessageConverter
的 TypePrecedence
属性设置为 TYPE_ID
,而不是默认的 INFERRED
。(该属性实际上位于转换器的 DefaultJackson2JavaTypeMapper
上,但为了方便起见,转换器上提供了一个设置器。)如果您注入了自定义类型映射器,则应在映射器上设置该属性。
在从 Message
转换时,传入的 MessageProperties.getContentType()
必须是符合 JSON 规范的(使用 contentType.contains("json")
进行检查)。从版本 2.2 开始,如果没有 contentType
属性,或者其值为默认的 application/octet-stream
,则假定为 application/json
。要恢复到之前的行为(返回未转换的 byte[]
),请将转换器的 assumeSupportedContentType
属性设置为 false
。如果内容类型不受支持,则会发出 WARN
日志消息 Could not convert incoming message with content-type […]
,并且 message.getBody()
会按原样返回 — 即作为 byte[]
。因此,为了满足消费者端的 Jackson2JsonMessageConverter
要求,生产者必须添加 contentType
消息属性 — 例如,设置为 application/json
或 text/x-json
,或者使用 Jackson2JsonMessageConverter
,它会自动设置该头部。以下清单展示了多个转换器调用:
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}
在前面的列表中的前四种情况下,转换器尝试转换为 Thing1
类型。第五个示例是无效的,因为我们无法确定哪个参数应该接收消息负载。在第六个示例中,由于泛型类型是 WildcardType
,因此应用了 Jackson 的默认设置。
然而,您可以创建一个自定义转换器,并使用 targetMethod
消息属性来决定将 JSON 转换为哪种类型。
这种类型推断只有在 @RabbitListener
注解声明在方法级别时才能实现。对于类级别的 @RabbitListener
,转换后的类型用于选择调用哪个 @RabbitHandler
方法。因此,基础设施提供了 targetObject
消息属性,您可以在自定义转换器中使用它来确定类型。
从版本 1.6.11 开始,Jackson2JsonMessageConverter
以及因此的 DefaultJackson2JavaTypeMapper
(DefaultClassMapper
)提供了 trustedPackages
选项,以克服 Serialization Gadgets 漏洞。默认情况下,为了向后兼容,Jackson2JsonMessageConverter
信任所有包 —— 也就是说,它对该选项使用 *
。
从 2.4.7 版本开始,可以配置转换器在 Jackson 反序列化消息体后返回 null
时返回 Optional.empty()
。这为 @RabbitListener
提供了两种方式接收空负载:
@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
handleOptional(payload); // payload might be null
}
@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
handleOptional(optional.orElse(this.emptyThing));
}
要启用此功能,请将 setNullAsOptionalEmpty
设置为 true
;当设置为 false
(默认值)时,转换器将回退到原始消息体(byte[]
)。
@Bean
Jackson2JsonMessageConverter converter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setNullAsOptionalEmpty(true);
return converter;
}
反序列化抽象类
在 2.2.8 版本之前,如果 @RabbitListener
推断的类型是一个抽象类(包括接口),转换器会回退到在消息头中查找类型信息,如果存在,则使用该信息;如果不存在,它会尝试创建该抽象类。当使用配置了自定义反序列化器来处理抽象类的自定义 ObjectMapper
时,如果传入的消息具有无效的类型头信息,这会导致问题。
从版本 2.2.8 开始,默认情况下保留了之前的行为。如果你有这样一个自定义的 ObjectMapper
,并且希望忽略类型头信息,始终使用推断的类型进行转换,请将 alwaysConvertToInferredType
设置为 true
。这是为了向后兼容,并避免在转换失败时(使用标准的 ObjectMapper
)尝试转换的开销。
使用 Spring Data 投影接口
从 2.2 版本开始,你可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。这允许非常选择性和低耦合的数据绑定,包括从 JSON 文档中的多个位置查找值。例如,以下接口可以定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于在接收到的 JSON 文档中查找属性名称作为字段。@JsonPath
表达式允许自定义值的查找,甚至可以定义多个 JSON 路径表达式,以便从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请在消息转换器上将 useProjectionForInterfaces
设置为 true
。你还需要将 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
添加到类路径中。
当用作 @RabbitListener
方法的参数时,接口类型会正常地自动传递给转换器。
使用 RabbitTemplate
从 Message
转换
如前所述,类型信息在消息头中传递,以帮助转换器在从消息转换时使用。在大多数情况下,这都能很好地工作。然而,当使用泛型类型时,它只能转换简单的对象和已知的“容器”对象(列表、数组和映射)。从 2.0 版本开始,Jackson2JsonMessageConverter
实现了 SmartMessageConverter
,这使得它可以与新的 RabbitTemplate
方法一起使用,这些方法接受 ParameterizedTypeReference
参数。这允许转换复杂的泛型类型,如下例所示:
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从版本 2.1 开始,AbstractJsonMessageConverter
类已被移除。它不再是 Jackson2JsonMessageConverter
的基类。它已被 AbstractJackson2MessageConverter
取代。
MarshallingMessageConverter
另一个选项是 MarshallingMessageConverter
。它委托给 Spring OXM 库中 Marshaller
和 Unmarshaller
策略接口的实现。你可以在这里阅读更多关于该库的信息 here。在配置方面,通常只提供构造函数参数,因为大多数 Marshaller
的实现也实现了 Unmarshaller
。以下示例展示了如何配置 MarshallingMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
Jackson2XmlMessageConverter
该类在版本 2.1 中被引入,可用于将消息从 XML 转换或转换为 XML。
Jackson2XmlMessageConverter
和 Jackson2JsonMessageConverter
都继承自同一个基类:AbstractJackson2MessageConverter
。
AbstractJackson2MessageConverter
类被引入以替代一个已被移除的类:AbstractJsonMessageConverter
。
Jackson2XmlMessageConverter
使用了 com.fasterxml.jackson
2.x 库。
你可以像使用 Jackson2JsonMessageConverter
一样使用它,只不过它支持的是 XML 而不是 JSON。以下示例配置了一个 Jackson2JsonMessageConverter
:
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
更多信息请参见 Jackson2JsonMessageConverter。
从 2.2 版本开始,如果没有 contentType
属性,或者其值为默认的 application/octet-stream
,则会假定为 application/xml
。要恢复之前的行为(返回未转换的 byte[]
),请将转换器的 assumeSupportedContentType
属性设置为 false
。
ContentTypeDelegatingMessageConverter
该类在 1.4.2 版本中引入,允许根据 MessageProperties
中的 contentType
属性委托给特定的 MessageConverter
。默认情况下,如果没有 contentType
属性或存在与任何已配置的转换器都不匹配的值,它将委托给 SimpleMessageConverter
。以下示例配置了一个 ContentTypeDelegatingMessageConverter
:
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
本节介绍如何反序列化 Java 对象。
当从不受信任的来源反序列化 Java 对象时,可能存在漏洞。
如果你接受来自不受信任来源的消息,并且这些消息的 content-type
为 application/x-java-serialized-object
,你应该考虑配置允许反序列化的包和类。这适用于 SimpleMessageConverter
和 SerializerMessageConverter
,当它们通过隐式或配置方式使用 DefaultDeserializer
时。
默认情况下,允许列表为空,这意味着不会反序列化任何类。
你可以设置一个模式列表,例如 thing1.**
、thing1.thing2.Cat
或 **.MySafeClass
。
这些模式会按顺序检查,直到找到匹配项。如果没有匹配项,则会抛出 SecurityException
。
你可以使用这些转换器上的 allowedListPatterns
属性来设置模式。或者,如果你信任所有消息的发起者,可以将环境变量 SPRING_AMQP_DESERIALIZATION_TRUST_ALL
或系统属性 spring.amqp.deserialization.trust.all
设置为 true
。
消息属性转换器
MessagePropertiesConverter
策略接口用于在 Rabbit Client 的 BasicProperties
和 Spring AMQP 的 MessageProperties
之间进行转换。默认实现(DefaultMessagePropertiesConverter
)通常足以满足大多数需求,但如果有需要,你也可以实现自己的转换器。默认的属性转换器在 LongString
类型的 BasicProperties
元素大小不超过 1024
字节时,会将其转换为 String
实例。较大的 LongString
实例则不会被转换(参见下一段)。这个限制可以通过构造函数参数进行覆盖。
从 1.6 版本开始,DefaultMessagePropertiesConverter
默认会将超过长字符串限制(默认:1024)的头部保留为 LongString
实例。你可以通过 getBytes[]
、toString()
或 getStream()
方法访问其内容。
之前,DefaultMessagePropertiesConverter
将这类头信息“转换”为 DataInputStream
(实际上它只是引用了 LongString
实例的 DataInputStream
)。在输出时,这个头信息并没有被转换(除了转换为字符串——例如,通过对流调用 toString()
方法得到 java.io.DataInputStream@1d057a39
)。
现在,大的传入 LongString
头在输出时也能正确“转换”了(默认情况下)。
提供了一个新的构造函数,让你可以配置转换器以像之前一样工作。以下清单展示了该方法的 Javadoc 注释和声明:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
从 1.6 版本开始,MessageProperties
新增了一个名为 correlationIdString
的属性。之前,在与 RabbitMQ 客户端使用的 BasicProperties
进行相互转换时,由于 MessageProperties.correlationId
是 byte[]
类型,而 BasicProperties
使用的是 String
类型,因此会执行不必要的 byte[] <→ String
转换。(最终,RabbitMQ 客户端会使用 UTF-8 将 String
转换为字节并放入协议消息中)。
为了提供最大的向后兼容性,DefaultMessagePropertiesConverter
新增了一个名为 correlationIdPolicy
的属性。该属性接受一个 DefaultMessagePropertiesConverter.CorrelationIdPolicy
枚举参数。默认情况下,它被设置为 BYTES
,以复制之前的行为。
对于入站消息:
-
STRING
:仅映射correlationIdString
属性 -
BYTES
:仅映射correlationId
属性 -
BOTH
:映射两个属性
对于出站消息:
-
STRING
:仅映射correlationIdString
属性 -
BYTES
:仅映射correlationId
属性 -
BOTH
:同时考虑这两个属性,String
属性优先
从 1.6 版本开始,入站的 deliveryMode
属性不再映射到 MessageProperties.deliveryMode
,而是映射到 MessageProperties.receivedDeliveryMode
。同样,入站的 userId
属性也不再映射到 MessageProperties.userId
,而是映射到 MessageProperties.receivedUserId
。这些更改是为了避免在将同一个 MessageProperties
对象用于出站消息时,这些属性意外传播。
从 2.2 版本开始,DefaultMessagePropertiesConverter
会将任何自定义 header 中类型为 Class<?>
的值使用 getName()
而不是 toString()
进行转换;这样可以避免消费应用程序需要从 toString()
的表示中解析类名。为了实现滚动升级,您可能需要更改消费者以理解这两种格式,直到所有生产者都升级完毕。