|
请使用 Spring AMQP 4.0.2(最新稳定版本)! |
消息转换器
类 AmqpTemplate 还定义了若干用于发送和接收消息的方法,这些方法会委托给一个 MessageConverter。MessageConverter 为每个方向提供了一个单一方法:一个用于将数据 转换为 Message,另一个用于将数据 从 Message 转换出来。请注意,当将数据转换为 Message 时,除了对象本身外,您还可以提供额外的属性。object 参数通常对应于消息体。以下列表展示了 MessageConverter 接口的定义:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
相关 Message 发送方法在 AmqpTemplate 上比我们之前讨论的方法更简单,因为它们不需要 Message 实例。相反,MessageConverter 负责通过将提供的对象转换为 Message 正文的字节数组,然后添加任何提供的 MessageProperties,来“创建”每个 Message。以下列表展示了各种方法的定义:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,仅有两种方法:一种是接受队列名称的方法,另一种则依赖于模板的“queue”属性已被设置。</p><p>以下列表展示了这两种方法的定义:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
在异步消费者中提到的MessageListenerAdapter也使用了MessageConverter。 |
SimpleMessageConverter
默认的 MessageConverter 策略实现称为 SimpleMessageConverter。这是当您未显式配置其他替代方案时,RabbitTemplate 实例所使用的转换器。它可处理文本内容、序列化的 Java 对象以及字节数组。
从...Message
如果输入 Message 的内容类型以 "text" 开头(例如,text/plain),它还会检查 content-encoding 属性,以确定在将 Message 的主体字节数组转换为 Java String 时所使用的字符集。如果输入 Message 上未设置 content-encoding 属性,则默认使用 UTF-8 字符集。如果您需要覆盖此默认设置,可以配置一个 SimpleMessageConverter 实例,设置其 defaultCharset 属性,并将其注入到 RabbitTemplate 实例中。
如果输入 Message 的 content-type 属性值设置为 "application/x-java-serialized-object",则 SimpleMessageConverter 尝试将字节数组反序列化(重新构建)为 Java 对象。虽然这可能对简单的原型设计有所帮助,但我们不建议依赖 Java 序列化,因为它会导致生产者与消费者之间产生紧密耦合。当然,这也排除了在任一端使用非 Java 系统的可能性。由于 AMQP 是一种传输层协议,若因此类限制而丧失其大部分优势,将十分可惜。在接下来的两个部分中,我们将探讨一些替代方案,以便在不依赖 Java 序列化的情况下传递丰富的领域对象内容。
对于所有其他内容类型,SimpleMessageConverter 直接将 Message 的主体内容作为字节数组返回。
请参阅 Java 反序列化 以获取重要信息。
SerializerMessageConverter
此转换器与 SimpleMessageConverter 类似,但可配置其他 Spring FrameworkSerializer 和 Deserializer 实现以用于 application/x-java-serialized-object 转换。
请参阅 Java 反序列化 以获取重要信息。
Jackson2JsonMessageConverter
本节介绍如何使用 Jackson2JsonMessageConverter 在 Message 之间进行转换。它包含以下部分:
转换为Message
如前一节所述,通常不建议依赖 Java 序列化。一种更为常见且更灵活、跨不同语言和平台可移植的替代方案是 JSON(JavaScript 对象表示法)。可以在任何 RabbitTemplate 实例上配置转换器,以覆盖其对 SimpleMessageConverter 默认实现的使用。Jackson2JsonMessageConverter 使用 com.fasterxml.jackson 2.x 库。以下示例配置了一个 Jackson2JsonMessageConverter:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
如上所示,Jackson2JsonMessageConverter 默认使用 DefaultClassMapper。类型信息被添加到(并从)MessageProperties 中(获取和存储)。如果传入的消息在 MessageProperties 中不包含类型信息,但您已知预期类型,您可以使用 defaultType 属性配置一个静态类型,如下例所示:
<bean id="jsonConverterWithDefaultType"
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="thing1.PurchaseOrder"/>
</bean>
</property>
</bean>
此外,您还可以提供自定义映射,将 TypeId 头部中的值映射到其他值。以下示例展示了如何实现此操作:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
现在,如果发送系统将标题设置为 thing1,转换器就会创建一个 Thing1 对象,依此类推。
有关从非 Spring 应用程序转换消息的完整讨论,请参阅 非 Spring 应用程序接收 JSON 示例应用程序。
从版本 2.4.3 开始,转换器在 supportedMediaType 包含 charset 参数时,将不再添加 contentEncoding 消息属性;此行为也用于编码。
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从...Message
传入的消息会根据发送系统添加到报头中的类型信息转换为对象。
从版本 2.4.3 开始,如果没有 contentEncoding 消息属性,转换器将尝试检测 charset 参数是否存在于 contentType 消息属性中,并使用该参数。如果两者均不存在,且 supportedMediaType 具有 charset 参数,则将使用该参数进行解码,最终回退到 defaultCharset 属性。新增了一个方法 setSupportedMediaType:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
在 1.6 版本之前的版本中,如果缺少类型信息,则转换将失败。</p><p>从 1.6 版本开始,如果缺少类型信息,转换器将使用 Jackson 默认值(通常是 Map)来转换 JSON。
此外,从版本 1.6 开始,当您在方法上使用 @RabbitListener 注解时,推断的类型信息会被添加到 MessageProperties 中。这使得转换器能够根据目标方法的参数类型进行转换。此功能仅在存在一个无注解的参数,或仅有一个带有 @Payload 注解的参数时才适用。在分析过程中,Message 类型的参数将被忽略。
默认情况下,推断的类型信息将覆盖由发送系统创建的传入 TypeId 及相关头信息。这使得接收系统能够自动转换为不同的领域对象。仅当参数类型为具体类(而非抽象类或接口)或来自 java.util 包时,此规则才适用。在所有其他情况下,使用 TypeId 及相关头信息。在某些情况下,您可能希望覆盖默认行为,并始终使用 TypeId 信息。例如,假设您有一个 @RabbitListener,它接受一个 Thing1 参数,但消息中包含一个 Thing2,该 Thing2 是 Thing1 的子类(而 Thing1 是具体的)。推断的类型将是错误的。为处理此情况,请将 TypePrecedence 属性设置在 Jackson2JsonMessageConverter 上为 TYPE_ID,而不是默认的 INFERRED。(该属性实际上位于转换器的 DefaultJackson2JavaTypeMapper 上,但为方便起见,转换器上提供了相应的 setter 方法。)如果您注入了一个自定义类型映射器,应将属性设置在映射器上。
|
当从 Message 转换时,传入的 MessageProperties.getContentType() 必须符合 JSON 格式(contentType.contains("json") 用于检查)。从版本 2.2 开始,如果不存在 contentType 属性,或其值为默认值 application/octet-stream,则假定使用 application/json。若要恢复到以前的行为(返回未转换的 byte[]),请将转换器的 assumeSupportedContentType 属性设置为 false。如果内容类型不受支持,则会发出一条 WARN 日志消息 Could not convert incoming message with content-type […],并原样返回 message.getBody()——即以 byte[] 的形式返回。因此,为了满足消费者端的 Jackson2JsonMessageConverter 要求,生产者必须添加 contentType 消息属性——例如,可作为 application/json、text/x-json 或通过使用 Jackson2JsonMessageConverter(该方法会自动设置标头)来实现。以下列表展示了多个转换器调用: |
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}
在前面列表中的前四种情况中,转换器尝试将数据转换为 Thing1 类型。第五个示例无效,因为我们无法确定哪个参数应接收消息负载。在第六个示例中,由于泛型类型为 WildcardType,因此应用 Jackson 的默认设置。
然而,您可以创建一个自定义转换器,并使用 targetMethod 消息属性来决定要转换为何种类型。
这种类型推断只有在将 @RabbitListener 注解声明在方法级别时才能实现。对于类级别 @RabbitListener,所转换的类型将用于选择要调用的 @RabbitHandler 方法。因此,基础设施提供了 targetObject 消息属性,您可以在自定义转换器中使用它来确定类型。 |
从版本 1.6.11 开始,Jackson2JsonMessageConverter 以及因此 DefaultJackson2JavaTypeMapper(DefaultClassMapper)提供了 trustedPackages 选项,以应对 序列化工具链(Serialization Gadgets) 漏洞。默认情况下并为保持向后兼容性, Jackson2JsonMessageConverter 信任所有包——即它对选项使用 *。 |
从版本 2.4.7 开始,转换器可配置为在 Jackson 反序列化消息体后返回 Optional.empty(),而 Jackson 返回 null。这有助于 @RabbitListener 接收空负载(null payloads),方式有两种:
@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
handleOptional(payload); // payload might be null
}
@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
handleOptional(optional.orElse(this.emptyThing));
}
要启用此功能,请将 setNullAsOptionalEmpty 设置为 true;当设置为 false(默认值)时,转换器将回退到原始消息体(byte[])。
@Bean
Jackson2JsonMessageConverter converter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setNullAsOptionalEmpty(true);
return converter;
}
反序列化抽象类
在 2.2.8 版本之前,如果某个 @RabbitListener 的推断类型为抽象类(包括接口),转换器会回退到检查请求头中的类型信息;若存在该信息,则使用它;若不存在,则尝试创建该抽象类。这会导致一个问题:当配置了自定义 ObjectMapper(并使用自定义反序列化器来处理该抽象类)时,若传入的消息中包含无效的类型头信息,就会出现问题。
从版本 2.2.8 开始,默认情况下保留之前的处理行为。如果您有自定义的 ObjectMapper,并且希望忽略类型头信息,并始终使用推断的类型进行转换,请将 alwaysConvertToInferredType 设置为 true。此举是为了确保向后兼容性,并避免在转换必然失败时(使用标准 ObjectMapper)产生额外的开销。
使用 Spring Data 投影接口
从版本 2.2 开始,您可以将 JSON 转换为 Spring Data 投影接口,而不仅仅是具体类型。这使得可以实现非常灵活且低耦合的数据绑定,包括从 JSON 文档中的多个位置查找值。例如,以下接口可被定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于通过接收到的 JSON 文档中的字段来查找属性名称。@JsonPath 表达式允许自定义值查找方式,甚至可以定义多个 JSON 路径表达式,以从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请将消息转换器中的 useProjectionForInterfaces 设置为 true。
您还必须将 spring-data:spring-data-commons 和 com.jayway.jsonpath:json-path 添加到类路径中。
当用作 @RabbitListener 方法的参数时,接口类型会像平常一样自动传递给转换器。
从...Message使用RabbitTemplate
如前所述,类型信息通过消息头传递,以协助转换器在将消息转换为其他格式时进行操作。这在大多数情况下都能正常工作。然而,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。从 2.0 版本开始,Jackson2JsonMessageConverter 实现了 SmartMessageConverter,使其能够与新的 RabbitTemplate 方法配合使用,这些方法接受一个 ParameterizedTypeReference 参数。这使得复杂泛型类型的转换成为可能,如下例所示:
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从版本 2.1 开始,AbstractJsonMessageConverter 类已被移除。它不再作为 Jackson2JsonMessageConverter 的基类。它已被 AbstractJackson2MessageConverter 所替代。 |
MarshallingMessageConverter
另一种选择是 MarshallingMessageConverter。
它将委托给 Spring OXM 库中对 Marshaller 和 Unmarshaller 策略接口的实现。
您可在此处阅读有关该库的更多信息 这里。
在配置方面,最常见的方式是仅提供构造函数参数,因为大多数 Marshaller 的实现也实现了 Unmarshaller。
以下示例展示了如何配置一个 MarshallingMessageConverter:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
Jackson2XmlMessageConverter
该类于 2.1 版本中引入,可用于在 XML 与消息之间进行转换。
两者 Jackson2XmlMessageConverter 和 Jackson2JsonMessageConverter 都具有相同的基类:AbstractJackson2MessageConverter。
类 AbstractJackson2MessageConverter 被引入以替代已移除的类:AbstractJsonMessageConverter。 |
The Jackson2XmlMessageConverter uses the com.fasterxml.jackson 2.x library.
您可以以与 Jackson2JsonMessageConverter 相同的方式使用它,只不过它支持 XML 而非 JSON。
以下示例配置了一个 Jackson2JsonMessageConverter:
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
请参阅 Jackson2JsonMessageConverter 以获取更多信息。
从版本 2.2 开始,如果没有 contentType 属性或其具有默认值 application/octet-stream,则假定 application/xml。要恢复到以前的行为(返回未转换的 byte[]),请将转换器的 assumeSupportedContentType 属性设置为 false。 |
ContentTypeDelegatingMessageConverter
该类于 1.4.2 版本引入,允许根据 MessageProperties 中的内容类型属性将请求委托给特定的 MessageConverter。默认情况下,如果不存在 contentType 属性,或者其值与所有已配置的转换器均不匹配,则会委托给一个 SimpleMessageConverter。以下示例配置了一个 ContentTypeDelegatingMessageConverter:
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
本节介绍如何反序列化 Java 对象。
|
从不可信来源反序列化 Java 对象时可能存在漏洞。 如果您接受来自不可信源的消息(其中 默认情况下,允许列表为空,这意味着不会反序列化任何类。 您可以设置一组模式,例如 模式按顺序进行检查,直到找到匹配项为止。如果没有匹配项,则抛出 您可以使用这些转换器上的 |
消息属性转换器
策略接口 MessagePropertiesConverter 用于在 Rabbit 客户端 BasicProperties 与 Spring AMQP MessageProperties 之间进行转换。默认实现(DefaultMessagePropertiesConverter)通常已能满足大多数需求,但若需要,您也可以自定义实现。默认属性转换器会在元素大小不超过 1024 字节时,将 BasicProperties 类型为 LongString 的元素转换为 String 实例。更大的 LongString 实例则不会被转换(详见下一段)。此限制可通过构造函数参数进行覆盖。
从版本 1.6 开始,长度超过长字符串限制(默认:1024)的标题现在默认由 DefaultMessagePropertiesConverter 作为 LongString 实例保留。您可以通过 getBytes[]、toString() 或 getStream() 方法访问其内容。
之前,DefaultMessagePropertiesConverter 将此类头信息“转换”为 DataInputStream(实际上只是引用了 LongString 实例的 DataInputStream)。在输出时,此头信息未被转换(仅转换为字符串——例如,通过在流上调用 toString() 来实现,如 java.io.DataInputStream@1d057a39)。
大型传入 LongString 头部现在也将在输出时正确“转换”(默认情况下)。
提供了一个新的构造函数,以便您可以配置转换器以继续按之前的方式工作。</p><p>以下列表显示了该方法的 Javadoc 注释和声明:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
自版本 1.6 起,已在 MessageProperties 中新增了一个名为 correlationIdString 的属性。此前,在与 RabbitMQ 客户端使用的 BasicProperties 进行相互转换时,由于 MessageProperties.correlationId 是一种 byte[],而 BasicProperties 使用的是 String,因此执行了一次不必要的 byte[] <→ String 转换。(最终,RabbitMQ 客户端使用 UTF-8 将 String 转换为字节,以放入协议消息中)。
为了提供最大的向后兼容性,已在 DefaultMessagePropertiesConverter 中添加了一个名为 correlationIdPolicy 的新属性。该属性接受一个 DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚举类型的参数。默认情况下,它被设置为 BYTES,这将复制之前的行为。
对于传入消息:
-
STRING: 仅映射了correlationIdString属性 -
BYTES: 仅映射了correlationId属性 -
BOTH: 两个属性均已完成映射
对于出站消息:
-
STRING: 仅映射了correlationIdString属性 -
BYTES: 仅映射了correlationId属性 -
BOTH: 两种属性均被考虑,其中String属性具有优先权
自版本 1.6 起,入站 deliveryMode 属性不再映射到 MessageProperties.deliveryMode。而是映射到 MessageProperties.receivedDeliveryMode。此外,入站 userId 属性也不再映射到 MessageProperties.userId,而是映射到 MessageProperties.receivedUserId。这些更改旨在避免在使用同一 MessageProperties 对象发送出站消息时,这些属性被意外传播。
从版本 2.2 开始,DefaultMessagePropertiesConverter 会使用 getName() 而非 toString() 来转换任何值类型为 Class<?> 的自定义头信息;这可避免应用程序必须从 toString() 表示形式中解析类名。