消息转换
消息转换
转换器
消息转换器在实现消息生产者和消息消费者的松散耦合方面起着非常重要的作用。
您可以在这些组件之间添加转换器,而不是要求每个消息生成组件都知道下一个使用者期望什么类型。
通用转换器,例如将String
添加到 XML 文档,也是高度可重用的。
对于某些系统,最好提供规范数据模型,但 Spring Integration 的一般理念是不需要任何特定格式。 相反,为了获得最大的灵活性,Spring Integration 旨在提供最简单的扩展模型。 与其他端点类型一样,在 XML 或 Java 注释中使用声明性配置使简单的 POJO 能够适应消息转换器的角色。 本章的其余部分将介绍这些配置选项。
为了最大限度地提高灵活性,Spring 不需要基于 XML 的消息有效负载。 尽管如此,该框架确实提供了一些方便的转换器来处理基于 XML 的有效负载,如果这确实是您的应用程序的正确选择。 有关这些转换器的详细信息,请参阅 XML 支持 - 处理 XML 有效负载。 |
使用 XML 配置 Transformer
这<transformer>
元素用于创建消息转换端点。
除了input-channel
和output-channel
属性,它需要一个ref
属性。
这ref
可以指向包含@Transformer
注释(请参阅使用注释配置 Transformer),或者它可以与method
属性。
<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />
使用ref
如果自定义 Transformer 处理程序实现可以在其他<transformer>
定义。
但是,如果自定义转换器处理程序实现的范围应限定为<transformer>
,您可以定义内部 bean 定义,如以下示例所示:
<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
output-channel="outChannel">
<beans:bean class="org.foo.TestTransformer"/>
</transformer>
同时使用ref 属性和内部处理程序定义<transformer> 不允许配置,因为它会创建不明确的条件并导致抛出异常。 |
如果ref 属性引用扩展的 beanAbstractMessageProducingHandler (例如框架本身提供的 transformer),通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个ref 必须是单独的 bean 实例(或prototype -scoped bean)或使用<bean/> 配置类型。
如果您无意中从多个 Bean 引用了相同的消息处理程序,则会收到配置异常。 |
使用 POJO 时,用于转换的方法可能需要Message
类型或入站消息的有效负载类型。
它还可以通过使用@Header
和@Headers
参数注释。
该方法的返回值可以是任何类型。
如果返回值本身是Message
,传递到转换器的输出通道。
从 Spring Integration 2.0 开始,消息转换器的转换方法不能再返回null
.
返回null
导致异常,因为应始终期望消息转换器将每个源消息转换为有效的目标消息。
换句话说,不应将消息转换器用作消息过滤器,因为有一个专用的<filter>
选项。
但是,如果您确实需要这种类型的行为(组件可能会返回null
这不应被视为错误),您可以使用服务激活器。
其requires-reply
值为false
默认情况下,但可以设置为true
为了抛出异常null
返回值,与 transformer 一样。
Transformer 和 Spring 表达式语言 (SpEL)
与路由器、聚合器和其他组件一样,从 Spring Integration 2.0 开始,只要转换逻辑相对简单,transformer 也可以从 SpEL 支持中受益。 以下示例显示了如何使用 SpEL 表达式:
<int:transformer input-channel="inChannel"
output-channel="outChannel"
expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>
前面的示例在不编写自定义转换器的情况下转换有效负载。
我们的有效负载(假设是String
) 是大写的,与当前时间戳连接,并应用了一些格式。
普通转换器
Spring Integration 提供了一些 Transformer 实现。
对象到字符串转换器
因为使用toString()
表示Object
,Spring Integration 提供了一个ObjectToStringTransformer
其输出为Message
使用 Stringpayload
.
那String
是调用toString()
对入站消息的有效负载进行作。
以下示例显示了如何声明对象到字符串转换器的实例:
<int:object-to-string-transformer input-channel="in" output-channel="out"/>
此转换器的一个潜在用途是将一些任意对象发送到file
Namespace。
而该通道适配器仅支持String
、byte-array 或java.io.File
有效负载,在适配器处理必要的转换之前立即添加此转换器。
只要结果toString()
call 是你想要写入文件的内容。
否则,您可以使用前面显示的通用“transformer”元素提供基于 POJO 的自定义 transformer。
调试时,通常不需要此转换器,因为logging-channel-adapter 能够记录消息有效负载。
有关更多详细信息,请参阅窃听。 |
对象到字符串的转换器非常简单。
它调用
为了更复杂(例如在运行时动态选择字符集),您可以使用基于 SpEL 表达式的转换器,如以下示例所示:
|
如果您需要序列化Object
到字节数组或将字节数组反序列化回Object
,Spring Integration 提供了对称序列化转换器。
默认情况下,这些使用标准的 Java 序列化,但您可以提供 Spring 的实现Serializer
或Deserializer
策略,使用serializer
和deserializer
属性。
以下示例演示如何使用 Spring 的序列化器和反序列化器:
<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>
<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
allow-list="com.mycom.*,com.yourcom.*"/>
从不受信任的来源反序列化数据时,应考虑添加allow-list 包和类模式。
默认情况下,所有类都被反序列化。 |
Object
-自-Map
和Map
-自-Object
变形金刚
Spring Integration 还提供了Object
-自-Map
和Map
-自-Object
transformer,它使用 JSON 序列化和反序列化对象图。
对象层次结构被内省到最原始的类型(String
,int
,依此类推)。
这种类型的路径用 SpEL 描述,它成为key
在改造后的Map
.
基元类型成为值。
请考虑以下示例:
public class Parent{
private Child child;
private String name;
// setters and getters are omitted
}
public class Child{
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容Map
:
{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}
基于 JSON 的Map
允许您在不共享实际类型的情况下描述对象结构,这允许您将对象图还原并重建为不同类型的对象图,只要您保持该结构。
例如,可以使用Map
-自-Object
转换器:
public class Father {
private Kid child;
private String name;
// setters and getters are omitted
}
public class Kid {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
如果您需要创建“结构化”地图,则可以提供flatten
属性。
默认值为 'true'。
如果将其设置为 'false',则结构是Map
之Map
对象。
请考虑以下示例:
public class Parent {
private Child child;
private String name;
// setters and getters are omitted
}
public class Child {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容Map
:
{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}
为了配置这些转换器,Spring Integration 为 Object-to-Map 提供了命名空间支持,如以下示例所示:
<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>
您还可以将flatten
属性设置为 false,如下所示:
<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>
Spring Integration 为 Map-to-Object 提供命名空间支持,如以下示例所示:
<int:map-to-object-transformer input-channel="input"
output-channel="output"
type="org.something.Person"/>
或者,您可以使用ref
属性和原型范围的 bean,如以下示例所示:
<int:map-to-object-transformer input-channel="inputA"
output-channel="outputA"
ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
'ref' 和 'type' 属性是互斥的。此外,如果使用 'ref' 属性,则必须指向 'prototype' 作用域的 bean。否则,一个BeanCreationException 被抛出。 |
从 5.0 版开始,您可以提供ObjectToMapTransformer
与定制的JsonObjectMapper
— 当您需要特殊格式的日期或空集合的空值(以及其他用途)时。有关 JSON 转换器的更多信息,请参阅 JSON 转换器JsonObjectMapper
实现。
流式转换器
这StreamTransformer
变换InputStream
有效负载到byte[]
( 或String
如果charset
提供)。
以下示例演示如何使用stream-transformer
元素:
<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->
<int:stream-transformer id="withCharset" charset="UTF-8"
input-channel="charsetChannel" output-channel="output"/> <!-- String -->
以下示例演示如何使用StreamTransformer
class 和@Transformer
注释以在 Java 中配置流转换器:
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
return new StreamTransformer(); // transforms to byte[]
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
return new StreamTransformer("UTF-8"); // transforms to String
}
JSON 转换器
Spring Integration 提供了对象到 JSON 和 JSON 到对象转换器。以下一对示例展示了如何在 XML 中声明它们:
<int:object-to-json-transformer input-channel="objectMapperInput"/>
<int:json-to-object-transformer input-channel="objectMapperInput"
type="foo.MyDomainObject"/>
默认情况下,前面列表中的转换器使用普通JsonObjectMapper
. 它基于类路径中的实现。您可以提供自己的自定义JsonObjectMapper
使用适当的选项或基于所需库(例如 GSON)的实现,如以下示例所示:
<int:json-to-object-transformer input-channel="objectMapperInput"
type="something.MyDomainObject" object-mapper="customObjectMapper"/>
从 3.0 版开始, |
不妨考虑使用FactoryBean
或工厂方法来创建JsonObjectMapper
具有所需的特征。以下示例显示如何使用这样的工厂:
public class ObjectMapperFactory {
public static Jackson2JsonObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
return new Jackson2JsonObjectMapper(mapper);
}
}
以下示例演示如何在 XML 中执行相同的作
<bean id="customObjectMapper" class="something.ObjectMapperFactory"
factory-method="getMapper"/>
从 2.2 版开始, 如果您希望将 |
从 3.0 版开始,ObjectToJsonTransformer
将反映源类型的标头添加到消息中。同样,JsonToObjectTransformer
在将 JSON 转换为对象时可以使用这些类型标头。这些标头映射在 AMQP 适配器中,以便它们与 Spring-AMQP 完全兼容JsonMessageConverter
.
这样,以无需任何特殊配置即可工作:
-
…→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→…
其中出站适配器配置了
JsonMessageConverter
并且入站适配器使用默认的SimpleMessageConverter
. -
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→…
其中出站适配器配置了
SimpleMessageConverter
并且入站适配器使用默认的JsonMessageConverter
. -
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→
如果两个适配器都配置了
SimpleMessageConverter
.
使用标头确定类型时,不应提供class 属性,因为它优先于标头。 |
除了 JSON Transformer 之外,Spring Integration 还提供了一个内置的#jsonPath
用于表达式的 SpEL 函数。
有关更多信息,请参阅 Spring 表达式语言 (SpEL)。
从 3.0 版本开始,Spring Integration 还提供了一个内置的#xpath
用于表达式的 SpEL 函数。
有关详细信息,请参阅 SpEL 函数 #xpath。
从 4.0 版开始,ObjectToJsonTransformer
支持resultType
属性,以指定节点 JSON 表示形式。
结果节点树表示取决于提供的JsonObjectMapper
.
默认情况下,ObjectToJsonTransformer
使用Jackson2JsonObjectMapper
并将对象到节点树的转换委托给ObjectMapper#valueToTree
方法。
节点 JSON 表示形式为使用JsonPropertyAccessor
当下游消息流使用 SpEL 表达式来访问 JSON 数据的属性时。
有关详细信息,请参阅属性访问器。
从 5.1 版开始,resultType
可以配置为BYTES
生成带有byte[]
有效负载,以便在使用使用此数据类型运行的下游处理程序时使用。
从 5.2 版开始,JsonToObjectTransformer
可以配置为ResolvableType
在使用目标 JSON 处理器反序列化期间支持泛型。此外,此组件现在首先查询请求消息标头,以了解JsonHeaders.RESOLVABLE_TYPE
或JsonHeaders.TYPE_ID
否则回退到配置的类型。 这ObjectToJsonTransformer
now 还填充了JsonHeaders.RESOLVABLE_TYPE
标头,基于任何可能的下游方案的请求消息有效负载。
从 5.2.6 版本开始,JsonToObjectTransformer
可以提供valueTypeExpression
以解决ResolvableType
用于有效负载在运行时根据请求消息从 JSON 进行转换。默认情况下,它会咨询JsonHeaders
在请求消息中。如果此表达式返回null
或ResolvableType
架构物会抛出一个ClassNotFoundException
,转换器回退到提供的targetType
. 此逻辑作为表达式存在,因为JsonHeaders
可能没有实际的类值,而是一些类型 ID,这些类型 ID 必须根据某些外部注册表映射到目标类。
Apache Avro 转换器
5.2 版添加了简单的转换器来与 Apache Avro 进行转换。
它们并不复杂,因为没有模式注册表;转换器只是使用嵌入在SpecificRecord
从 Avro 架构生成的实现。
发送到SimpleToAvroTransformer
必须具有实现SpecificRecord
; 转换器可以处理多种类型。 这SimpleFromAvroTransformer
必须配置SpecificRecord
类,该类用作反序列化的默认类型。您还可以指定 SpEL 表达式,以使用setTypeExpression
方法。 默认的 SpEL 表达式为headers[avro_type]
(AvroHeaders.TYPE
),默认情况下,由SimpleToAvroTransformer
替换为源类的完全限定类名。如果表达式返回null
这defaultType
被使用。
这SimpleToAvroTransformer
还有一个setTypeExpression
方法。 这允许生产者和消费者的解耦,其中发送者可以将标头设置为表示该类型的某个Tokens,然后消费者将该Tokens映射到一个类型。
使用注释配置 Transformer
您可以添加@Transformer
注释到期望Message
type 或消息有效负载类型。返回值的处理方式与前面所述的完全相同在描述<transformer>
元素.
以下示例演示如何使用@Transformer
注释以转换String
变成一个Order
:
@Transformer
Order generateOrder(String productId) {
return new Order(productId);
}
Transformer 方法也可以接受@Header
和@Headers
注释,如Annotation Support
.
以下示例显示如何使用@Header
注解:
@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
return new Order(productId, customer);
}
另请参阅使用注释为端点提供建议。
标头过滤器
有时,您的转换用例可能就像删除几个标头一样简单。 对于此类用例,Spring Integration提供了一个标头过滤器,可让您指定应从输出消息中删除的某些标头名称(例如,出于安全原因删除标头或仅暂时需要的值)。 基本上,标头过滤器与标头丰富器相反。 后者在 Header Enricher 中进行了讨论。 以下示例定义标头筛选器:
<int:header-filter input-channel="inputChannel"
output-channel="outputChannel" header-names="lastName, state"/>
如您所见,标头过滤器的配置非常简单。
它是一个典型的端点,具有输入和输出通道以及header-names
属性。
该属性接受需要删除的标头的名称(如果有多个,则用逗号分隔)。
因此,在前面的示例中,名为“lastName”和“state”的标头不存在于出站邮件中。
基于编解码器的转换器
请参阅编解码器。
内容丰富器
有时,您可能需要使用比目标系统提供的信息更多的信息来增强请求。 数据扩充器模式描述了各种方案以及允许您满足此类要求的组件(扩充器)。
Spring 集成Core
模块包括两个扩充器:
它还包括三个特定于适配器的标头扩充器:
请参阅本参考手册中特定于适配器的部分,以了解有关这些适配器的更多信息。
有关表达式支持的更多信息,请参阅 Spring 表达式语言 (SpEL)。
标题丰富器
如果您只需要向消息添加标头,并且标头不是由消息内容动态确定的,那么引用转换器的自定义实现可能有点矫枉过正。
因此,Spring Integration 提供了对标头扩充器模式的支持。
它通过<header-enricher>
元素。
以下示例演示如何使用它:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" value="123"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
标头扩充器还提供了有用的子元素来设置众所周知的标头名称,如以下示例所示:
<int:header-enricher input-channel="in" output-channel="out">
<int:error-channel ref="applicationErrorChannel"/>
<int:reply-channel ref="quoteReplyChannel"/>
<int:correlation-id value="123"/>
<int:priority value="HIGHEST"/>
<routing-slip value="channel1; routingSlipRoutingStrategy; request.headers[myRoutingSlipChannel]"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
前面的配置显示,对于众所周知的标头(例如errorChannel
,correlationId
,priority
,replyChannel
,routing-slip
等),而不是使用泛型<header>
子元素,您必须同时提供标头 'name' 和 'value',您可以使用方便的子元素直接设置这些值。
从 4.1 版开始,标头扩充器提供了一个routing-slip
子元素。
有关更多信息,请参阅路由单。
POJO 支持
通常,标头值不能静态定义,必须根据消息中的某些内容动态确定。这就是为什么标头扩充器还允许您使用ref
和method
属性。 指定的方法计算标头值。考虑以下配置和一个 bean ,其方法修改了String
:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="something" method="computeValue" ref="myBean"/>
</int:header-enricher>
<bean id="myBean" class="thing1.thing2.MyBean"/>
public class MyBean {
public String computeValue(String payload){
return payload.toUpperCase() + "_US";
}
}
您还可以将 POJO 配置为内部 bean,如以下示例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<bean class="org.MyEnricher"/>
</int:header>
</int:header-enricher>
您可以类似地指向 Groovy 脚本,如以下示例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<int-groovy:script location="org/SampleGroovyHeaderEnricher.groovy"/>
</int:header>
</int:header-enricher>
SpEL 支持
在 Spring Integration 2.0 中,我们引入了 Spring 表达式语言 (SpEL) 的便利性,以帮助配置许多不同的组件。标头丰富器就是其中之一。再看前面显示的 POJO 示例。您可以看到确定标头值的计算逻辑非常简单。一个自然的问题是:“有没有更简单的方法来实现这一点?这就是 SpEL 展示其真正力量的地方。考虑以下示例:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" expression="payload.toUpperCase() + '_US'"/>
</int:header-enricher>
通过在此类简单情况下使用 SpEL,您不再需要提供单独的类并在应用程序上下文中对其进行配置。
您需要做的就是配置expression
属性,并具有有效的 SpEL 表达式。
'payload' 和 'headers' 变量绑定到 SpEL 评估上下文,使您可以完全访问传入消息。
使用 Java 配置配置标头扩充器
以下两个示例显示了如何使用 Java 配置进行标头扩充器:
@Bean
@Transformer(inputChannel = "enrichHeadersChannel", outputChannel = "emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd =
Collections.singletonMap("emailUrl",
new StaticHeaderValueMessageProcessor<>(this.imapUrl));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
@Bean
@Transformer(inputChannel="enrichHeadersChannel", outputChannel="emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
headersToAdd.put("emailUrl", new StaticHeaderValueMessageProcessor<String>(this.imapUrl));
Expression expression = new SpelExpressionParser().parseExpression("payload.from[0].toString()");
headersToAdd.put("from",
new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
第一个示例添加了单个文字标头。 第二个示例添加了两个标头,一个是文字标头,一个是基于 SpEL 表达式的标头。
使用 Java DSL 配置标头扩充器
以下示例显示了标头扩充器的 Java DSL 配置:
@Bean
public IntegrationFlow enrichHeadersInFlow() {
return f -> f
...
.enrichHeaders(h -> h.header("emailUrl", this.emailUrl)
.headerExpression("from", "payload.from[0].toString()"))
.handle(...);
}
标头通道注册表
从 Spring Integration 3.0 开始,一个新的子元素<int:header-channels-to-string/>
可用。
它没有属性。
这个新的子元素将现有的replyChannel
和errorChannel
标头(当它们是MessageChannel
) 设置为String
并将通道存储在注册表中,以便在需要发送回复或处理错误时进行解析。
这对于标头可能丢失的情况非常有用 - 例如,在将消息序列化到消息存储中或通过 JMS 传输消息时。
如果标头尚不存在或不是MessageChannel
,不进行任何更改。
使用此功能需要存在HeaderChannelRegistry
豆。
默认情况下,框架会创建一个DefaultHeaderChannelRegistry
默认到期时间(60 秒)。
在此时间之后,通道将从注册表中删除。
要更改此行为,请使用id
之integrationHeaderChannelRegistry
并使用构造函数参数(以毫秒为单位)配置所需的默认延迟。
从 4.1 版开始,您可以设置一个名为removeOnGet
自true
在<bean/>
定义,并且映射条目在首次使用时立即被删除。
这在大容量环境中以及当通道仅使用一次而不是等待收割器将其移除时可能很有用。
这HeaderChannelRegistry
有一个size()
确定注册表当前大小的方法。
这runReaper()
方法取消当前计划任务并立即运行 Reaper。
然后,根据当前延迟计划任务再次运行。
可以通过获取对注册表的引用直接调用这些方法,也可以将包含以下内容的消息发送到控制总线:
"@integrationHeaderChannelRegistry.runReaper()"
此子元素是一种方便,相当于指定以下配置:
<int:reply-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.replyChannel)"
overwrite="true" />
<int:error-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.errorChannel)"
overwrite="true" />
从 4.1 版开始,您现在可以覆盖注册表配置的收割器延迟,以便通道映射至少保留指定时间,而不管收割器延迟如何。以下示例显示了如何执行此作:
<int:header-enricher input-channel="inputTtl" output-channel="next">
<int:header-channels-to-string time-to-live-expression="120000" />
</int:header-enricher>
<int:header-enricher input-channel="inputCustomTtl" output-channel="next">
<int:header-channels-to-string
time-to-live-expression="headers['channelTTL'] ?: 120000" />
</int:header-enricher>
在第一种情况下,每个标头通道映射的生存时间将为两分钟。在第二种情况下,在消息标头中指定生存时间,如果没有标头,则使用 Elvis 运算符使用两分钟。
有效载荷丰富器
在某些情况下,如前所述,标头扩充器可能不够用,并且可能必须使用附加信息来扩充有效负载本身。例如,进入 Spring Integration 消息传递系统的订单消息必须根据提供的客户编号查找订单的客户,然后使用该信息扩充原始有效负载。
Spring Integration 2.1 引入了有效负载扩充器。
有效负载扩充器定义了一个端点,该端点将Message
到公开的请求通道,然后需要回复消息。
然后,回复消息成为用于评估表达式以丰富目标有效负载的根对象。
有效负载扩充器通过enricher
元素。
为了发送请求消息,有效负载扩充器具有request-channel
属性,用于将消息分派到请求通道。
基本上,通过定义请求通道,有效负载扩充器充当网关,等待发送到请求通道的消息返回。 然后,扩充器使用回复消息提供的数据来增强消息的有效负载。
将消息发送到请求通道时,您还可以选择使用request-payload-expression
属性。
有效载荷的丰富是通过 SpEL 表达式配置的,提供了最大程度的灵活性。
因此,您不仅可以使用回复通道的Message
,但您可以使用 SpEL 表达式从该消息中提取子集或应用其他内联转换,从而进一步作数据。
如果您只需要使用静态值扩充有效负载,则无需提供request-channel
属性。
富集器是转换器的一种变体。 在许多情况下,您可以使用有效负载扩充器或通用转换器实现将其他数据添加到消息有效负载中。 您应该熟悉 Spring Integration 提供的所有支持转换的组件,并仔细选择在语义上最适合您的业务案例的实现。 |
配置
以下示例显示了有效负载扩充器的所有可用配置选项:
<int:enricher request-channel="" (1)
auto-startup="true" (2)
id="" (3)
order="" (4)
output-channel="" (5)
request-payload-expression="" (6)
reply-channel="" (7)
error-channel="" (8)
send-timeout="" (9)
should-clone-payload="false"> (10)
<int:poller></int:poller> (11)
<int:property name="" expression="" null-result-expression="'Could not determine the name'"/> (12)
<int:property name="" value="23" type="java.lang.Integer" null-result-expression="'0'"/>
<int:header name="" expression="" null-result-expression=""/> (13)
<int:header name="" value="" overwrite="" type="" null-result-expression=""/>
</int:enricher>
1 | 将消息发送到的通道,以获取用于扩充的数据。 自选。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。 默认为 true。 自选。 |
3 | 底层 bean 定义的 ID,即EventDrivenConsumer 或PollingConsumer .
自选。 |
4 | 指定此端点作为通道的订阅者连接到时的调用顺序。 当该通道使用“故障转移”调度策略时,这一点尤其重要。 当此端点本身是具有队列的通道的轮询使用者时,它不起作用。 自选。 |
5 | 标识在此终结点处理消息后发送消息的消息通道。 自选。 |
6 | 默认情况下,原始消息的有效负载用作发送到request-channel .
通过将 SpEL 表达式指定为request-payload-expression 属性,您可以使用原始有效负载的子集、标头值或任何其他可解析的 SpEL 表达式作为发送到请求通道的有效负载的基础。
对于表达式评估,完整消息可用作“根对象”。
例如,以下 SpEL 表达式(以及其他)是可能的:payload.something ,headers.something ,new java.util.Date() ,'thing1' + 'thing2' |
7 | 需要回复消息的通道。 这是可选的。 通常,自动生成的临时回复通道就足够了。 自选。 |
8 | 一个ErrorMessage 如果Exception 发生在request-channel .
这使您能够返回用于扩充的替代对象。
如果未设置,则Exception 抛给调用方。
自选。 |
9 | 如果通道可能阻塞,则向通道发送消息时等待的最长时间(以毫秒为单位)。
例如,如果队列通道已达到其最大容量,则队列通道可能会阻塞,直到空间可用。
在内部,send() timeout 在MessagingTemplate 并最终在调用 send作时应用MessageChannel .
默认情况下,send() timeout is set to '-1', which can cause the send operation on the `MessageChannel ,根据实现,无限期阻止。
自选。 |
10 | 布尔值,指示是否实现Cloneable 应在将消息发送到请求通道以获取扩充数据之前进行克隆。
克隆版本将用作最终回复的目标有效负载。
默认值为false .
自选。 |
11 | 如果此端点是轮询使用者,则允许您配置消息轮询器。 自选。 |
12 | 每property 子元素提供属性的名称(通过强制的name 属性)。
该属性应在目标有效负载实例上设置。
恰好是其中之一value 或expression 还必须提供属性 - 前者用于设置要设置的文字值,后者用于要计算的 SpEL 表达式。
评估上下文的根对象是从此扩充器启动的流返回的消息 - 如果没有请求通道或应用程序上下文,则为输入消息(使用@<beanName>.<beanProperty> SpEL 语法)。
从 4.0 版开始,当指定value 属性,您还可以指定可选的type 属性。
当目标是类型化 setter 方法时,框架会适当地强制该值(只要PropertyEditor ) 来处理转换。
但是,如果目标有效负载是Map ,则条目将填充值而不进行转换。
这type 属性允许您,例如,将String 将数字包含在Integer 目标有效负载中的值。
从 4.1 版开始,您还可以指定可选的null-result-expression 属性。
当enricher 返回 null,则对其进行评估,并返回评估的输出。 |
13 | 每header sub元素提供消息头的名称(通过强制的name 属性)。
恰好是其中之一value 或expression 还必须提供属性——前者用于设置要设置的文字值,后者用于要计算的 SpEL 表达式。
评估上下文的根对象是从此扩充器启动的流返回的消息 - 如果没有请求通道或应用程序上下文(使用 '@<beanName>.<beanProperty>' SpEL 语法,则为输入消息)。
请注意,与<header-enricher> 这<enricher> 元素的header 元素有type 和overwrite 属性。
然而,一个关键的区别是,使用<enricher> 这overwrite 属性为true 默认情况下,要与<enricher> 元素的<property> 子元素。
从 4.1 版开始,您还可以指定可选的null-result-expression 属性。
当enricher 返回 null,则对其进行评估,并返回评估的输出。 |
例子
本节包含在各种情况下使用有效负载扩充器的几个示例。
此处显示的代码示例是 Spring Integration Samples 项目的一部分。 请参阅 Spring Integration 示例。 |
在以下示例中,User
对象作为Message
:
<int:enricher id="findUserEnricher"
input-channel="findUserEnricherChannel"
request-channel="findUserServiceChannel">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
这User
有多个属性,但只有username
初始设置。
富集者的request-channel
属性配置为传递User
到findUserServiceChannel
.
通过隐式设置reply-channel
一个User
对象,并且通过使用property
子元素,则从回复中提取属性并用于丰富原始有效负载。
如何仅将数据子集传递给请求通道?
使用request-payload-expression
属性,则可以将有效负载的单个属性而不是完整消息传递到请求通道。
在以下示例中,username 属性被传递给请求通道:
<int:enricher id="findUserByUsernameEnricher"
input-channel="findUserByUsernameEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
请记住,尽管只传递了用户名,但发送到请求通道的生成消息包含完整的MessageHeaders
.
如何扩充由收集数据组成的有效负载?
在下面的示例中,而不是User
对象,一个Map
传入:
<int:enricher id="findUserWithMapEnricher"
input-channel="findUserWithMapEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="user" expression="payload"/>
</int:enricher>
这Map
包含用户名,位于username
map 键。
只有username
传递给请求通道。
回复包含完整的User
对象,最终添加到Map
在user
钥匙。
如何在不使用请求通道的情况下使用静态信息丰富有效负载?
以下示例根本不使用请求通道,而仅使用静态值丰富消息的有效负载:
<int:enricher id="userEnricher"
input-channel="input">
<int:property name="user.updateDate" expression="new java.util.Date()"/>
<int:property name="user.firstName" value="William"/>
<int:property name="user.lastName" value="Shakespeare"/>
<int:property name="user.age" value="42"/>
</int:enricher>
请注意,“静态”一词在这里使用得很松散。 您仍然可以使用 SpEL 表达式来设置这些值。
索赔检查
在前面的部分中,我们介绍了几个内容丰富组件,它们可以帮助您处理消息缺少一段数据的情况。 我们还讨论了内容过滤,它允许您从消息中删除数据项。 但是,有时我们想暂时隐藏数据。 例如,在分布式系统中,我们可能会收到一条有效负载非常大的消息。 某些间歇性消息处理步骤可能不需要访问此有效负载,而某些步骤可能只需要访问某些标头,因此通过每个处理步骤携带大型消息有效负载可能会导致性能下降,可能产生安全风险,并可能使调试更加困难。
存储在库中(或声明检查)模式描述了一种机制,该机制允许您将数据存储在已知位置,同时仅维护指向该数据所在位置的指针(声明检查)。 您可以将该指针作为新消息的有效负载传递,从而让消息流中的任何组件在需要时立即获取实际数据。 这种方法与挂号信流程非常相似,您在邮箱中收到索赔支票,然后必须去邮局领取您的实际包裹。 这也与飞行后或酒店的行李领取理念相同。
Spring Integration 提供了两种类型的声明检查转换器:
-
传入索赔检查转换器
-
传出索赔检查转换器
可以使用方便的基于命名空间的机制来配置它们。
传入索赔检查转换器
传入声明检查转换器通过将传入消息存储在其标识的消息存储中来转换传入消息message-store
属性。 以下示例定义传入声明检查转换器:
<int:claim-check-in id="checkin"
input-channel="checkinChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在input-channel
会持久化到用message-store
属性并使用生成的 ID 进行索引。
该 ID 是该邮件的声明检查。
声明检查也成为发送到output-channel
.
现在,假设在某个时候您确实需要访问实际消息。 可以手动访问消息存储并获取消息的内容,也可以使用相同的方法(创建转换器),只是现在使用传出声明检查转换器将声明检查转换为实际消息。
以下列表概述了传入声明检查转换器的所有可用参数:
<int:claim-check-in auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
send-timeout=""> (7)
<int:poller></int:poller> (8)
</int:claim-check-in>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为true .
此属性在Chain 元素。
自选。 |
2 | 标识基础 Bean 定义的 ID (MessageTransformingHandler ).
此属性在Chain 元素。
自选。 |
3 | 此端点的接收消息通道。
此属性在Chain 元素。
自选。 |
4 | 引用MessageStore 由此索赔检查转换器使用。
如果未指定,则默认引用为名为messageStore .
自选。 |
5 | 指定此端点作为通道的订阅者连接到时的调用顺序。
当该通道使用failover 调度策略。
当此端点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在Chain 元素。
自选。 |
6 | 标识消息在此终结点处理后发送的消息通道。
此属性在Chain 元素。
自选。 |
7 | 指定将应答消息发送到输出通道时等待的最长时间(以毫秒为单位)。
默认为-1 ——无限期封锁。
此属性在Chain 元素。
自选。 |
8 | 定义轮询器。
此元素在Chain 元素。
自选。 |
传出索赔检查转换器
通过传出声明检查转换器,您可以将具有声明检查有效负载的邮件转换为以原始内容作为有效负载的邮件。
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在input-channel
应将声明检查作为其有效负载。传出声明检查转换器通过查询消息存储中提供的声明检查标识的消息,将其转换为具有原始有效负载的消息。然后,它将新签出的消息发送到output-channel
.
以下列表概述了传出索赔检查转换器的所有可用参数:
<int:claim-check-out auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
remove-message="false" (7)
send-timeout=""> (8)
<int:poller></int:poller> (9)
</int:claim-check-out>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为true .
此属性在Chain 元素。
自选。 |
2 | 标识基础 Bean 定义的 ID (MessageTransformingHandler ).
此属性在Chain 元素。
自选。 |
3 | 此端点的接收消息通道。
此属性在Chain 元素。
自选。 |
4 | 引用MessageStore 由此索赔检查转换器使用。
如果未指定,则默认引用为名为messageStore .
自选。 |
5 | 指定此端点作为通道的订阅者连接到时的调用顺序。
当该通道使用failover 调度策略。
当此端点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在Chain 元素。
自选。 |
6 | 标识消息在此终结点处理后发送的消息通道。
此属性在Chain 元素。
自选。 |
7 | 如果设置为true ,则该消息将从MessageStore 通过这个转换器。当 Message 只能“声明”一次时,此设置很有用。它默认为false .
自选。 |
8 | 指定向输出通道发送回复消息时等待的最长时间(以毫秒为单位)。它默认为-1 ——无限期封锁。
此属性在Chain 元素。
自选。 |
9 | 定义轮询器。
此元素在Chain 元素。
自选。 |
领取一次
有时,特定消息只能领取一次。以此类推,考虑处理飞机行李的流程。您在出发时托运行李,并在抵达时领取行李。行李一旦被认领,如果不先重新托运,就无法再次领取行李。为了适应这种情况,我们引入了remove-message
boolean 属性claim-check-out
转换器。 此属性设置为false
默认情况下。但是,如果设置为true
,则已声明的邮件将从MessageStore
这样就不能再次认领了。
此功能对存储空间有影响,尤其是在内存中Map
-基于SimpleMessageStore
,其中未能删除消息最终可能导致OutOfMemoryException
. 因此,如果您预计不会提出多个声明,我们建议您将remove-message
属性的值设置为true
.
以下示例展示了如何使用remove-message
属性:
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"
remove-message="true"/>
Codec
Spring Integration 的 4.2 版引入了Codec
抽象化。
编解码器对对象进行编码和解码byte[]
.
它们提供了 Java 序列化的替代方案。
一个优点是,通常,对象不需要实现Serializable
.
我们提供了一种使用 Kryo 进行序列化的实现,但您可以提供自己的实现以用于以下任何组件:
-
EncodingPayloadTransformer
-
DecodingTransformer
-
CodecMessageConverter
DecodingTransformer
该转换器解码byte[]
通过使用编解码器。它需要配置Class
对象应解码到的表达式(或解析为Class
). 如果生成的对象是Message<?>
,则不会保留入站标头。
有关更多信息,请参阅 Javadoc。
CodecMessageConverter
某些端点(例如 TCP 和 Redis)没有消息头的概念。它们支持使用MessageConverter
和CodecMessageConverter
可用于将消息转换为byte[]
用于传输。
有关更多信息,请参阅 Javadoc。
克里奥
目前,这是唯一的实现Codec
,它提供了两种Codec
:
-
PojoCodec
:用于转换器 -
MessageCodec
:用于CodecMessageConverter
该框架提供了几个自定义序列化程序:
-
FileSerializer
-
MessageHeadersSerializer
-
MutableMessageHeadersSerializer
第一个可以与PojoCodec
通过使用FileKryoRegistrar
.
第二个和第三个与MessageCodec
,它使用MessageKryoRegistrar
.
自定义 Kryo
默认情况下,Kryo 将未知的 Java 类型委托给其FieldSerializer
.
Kryo 还为每个基元类型注册默认序列化器,以及String
,Collection
和Map
.FieldSerializer
使用反射来导航对象图表。
一种更有效的方法是实现一个自定义序列化器,该序列化器知道对象的结构,并且可以直接序列化选定的基元字段。
以下示例显示了这样的序列化程序:
public class AddressSerializer extends Serializer<Address> {
@Override
public void write(Kryo kryo, Output output, Address address) {
output.writeString(address.getStreet());
output.writeString(address.getCity());
output.writeString(address.getCountry());
}
@Override
public Address read(Kryo kryo, Input input, Class<Address> type) {
return new Address(input.readString(), input.readString(), input.readString());
}
}
这Serializer
接口公开Kryo
,Input
和Output
,提供对包含哪些字段和其他内部设置的完全控制,如 Kryo 文档中所述。
注册自定义序列化程序时,需要注册 ID。 注册 ID 是任意的。 但是,在我们的例子中,必须显式定义 ID,因为分布式应用程序中的每个 Kryo 实例都必须使用相同的 ID。 Kryo 推荐小的正整数并保留几个 id(值< 10)。 Spring Integration 目前默认使用 40、41 和 42(对于前面提到的文件和消息头序列化程序)。 我们建议您从 60 开始,以便在框架中进行扩展。 您可以通过配置前面提到的注册商来覆盖这些框架默认值。 |
使用自定义 Kryo 序列化器
如果您需要自定义序列化,请参阅 Kryo 文档,因为您需要使用本机 API 来进行自定义。
有关示例,请参阅MessageCodec
实现。
实现 KryoSerializable
如果您有write
访问域对象源代码,您可以实现KryoSerializable
如此处所述。
在这种情况下,类本身提供序列化方法,不需要进一步配置。
但是,基准测试表明,这不如显式注册自定义序列化程序那么有效。
以下示例显示了一个自定义的 Kryo 序列化器:
public class Address implements KryoSerializable {
...
@Override
public void write(Kryo kryo, Output output) {
output.writeString(this.street);
output.writeString(this.city);
output.writeString(this.country);
}
@Override
public void read(Kryo kryo, Input input) {
this.street = input.readString();
this.city = input.readString();
this.country = input.readString();
}
}
您还可以使用此技术包装 Kryo 以外的序列化库。
使用@DefaultSerializer
注解
Kryo 还提供了一个@DefaultSerializer
注释,如此处所述。
@DefaultSerializer(SomeClassSerializer.class)
public class SomeClass {
// ...
}
如果您有write
访问域对象,这可能是指定自定义序列化程序的更简单方法。
请注意,这不会向 ID 注册类,这可能会使该技术在某些情况下无济于事。