消息
消息
The Spring Integration Message 是一个通用的数据容器。
任何对象都可以作为载荷提供,并且每个 Message 实例包含标头,其中包含以键值对形式提供的可扩展属性。
这Message接口
以下代码清单展示了 Message 接口的定义:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message接口是API的核心部分。
通过使用通用包装器封装数据,消息系统可以在不理会数据类型的情况下传递这些数据。
随着应用程序发展以支持新类型或当类型本身被修改或扩展时,消息系统不会受到影响。
另一方面,当消息系统的某些组件需要访问关于Message的信息(如元数据)时,这类信息通常可以存储在消息头的元数据中并从中检索。
消息头
正如 Spring Integration 允许任何 Object 作为 Message 的载荷一样,它还支持任何 Object 类型作为头值。
事实上,MessageHeaders 类实现了 java.util.Map_ interface,如下类定义所示:
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
尽管 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。
任何尝试在 Map 中 put 值的操作都会导致 UnsupportedOperationException。
同样适用于 remove 和 clear。
由于消息可能被传递给多个消费者,Map 的结构不能被修改。
同样,消息的有效负载 Object 在初始创建后也不能被 set。
然而,标头值本身(或有效负载对象)的可变性被有意留给框架用户自行决定。 |
作为Map的实现,可以通过调用get(..)并传入头部名称来获取头部信息。
还可以将预期的Class作为一个额外参数提供。
甚至更好的是,在获取预定义值时,方便的方法可以直接使用。
以下示例展示了这三种选项:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
以下表格描述了预定义的消息头:
| Header Name | header type | 用法 |
|---|---|---|
MessageHeaders.ID |
java.util.UUID |
此消息实例的标识符。 每次消息发生变更时都会改变。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息创建的时间。 每次消息发生变更时都会改变。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
一个通道,当没有配置显式的输出通道且不存在 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
一个错误发送的通道。
如果该值是 |
许多内出站适配器实现也提供或期望某些标头,并且你可以配置额外的用户定义标头。
这些标头的常量可以在存在此类标头的模块中找到——例如。AmqpHeaders,JmsHeaders,等等。
MessageHeaderAccessorAPI
自 Spring Framework 4.0 和 Spring Integration 4.0 起,核心消息抽象已移至 spring-messaging 模块,并引入了 MessageHeaderAccessor API,以在消息实现之上提供额外的抽象。
所有(核心)Spring Integration 特定的消息头常量现在均在 IntegrationMessageHeaderAccessor 类中声明。
下表描述了预定义的消息头:
| Header Name | header type | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用于关联两个或多个消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常,一个序列号可以与一组带有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
消息组中相关消息的数量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
表示消息过期的时间。
不是框架直接使用的,但可以通过头部丰富器设置,并在配置了 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果消息被幂等接收器拦截器识别为重复,则为 true。 参见 幂等接收器企业集成模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
此头部仅在消息与应在消息处理完成后关闭的 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动的通道适配器支持配置 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
这些头部的一些方便的类型化获取器在IntegrationMessageHeaderAccessor类上提供,如下例所示:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
以下表格描述了在 IntegrationMessageHeaderAccessor 中出现但在用户代码中通常不使用的标头(即,它们通常被 Spring Integration 的内部部分使用——在此处包括它们是为了完整性):
| Header Name | header type | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
当需要嵌套相关性时(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
查看 路由单。 |
消息 ID 生成
当消息在应用程序中转换时,每次对其进行修改(例如通过转换器),都会分配一个新的消息 ID。
消息 ID 是一个 UUID。
从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比之前的 java.util.UUID.randomUUID() 实现更高效。
它使用基于安全随机种子的简单随机数,而不是每次都创建一个新的安全随机数。
可以通过在应用程序上下文中声明一个实现UUID生成策略接口的bean来选择不同的UUID生成策略。
一个类加载器中只能使用一种 UUID 生成策略。
这意味着,如果两个或更多应用程序上下文在同一个类加载器中运行,它们将共享相同的策略。
如果一个上下文更改了该策略,所有上下文都将使用该策略。
如果在同一个类加载器中的两个或更多上下文声明了类型为 org.springframework.util.IdGenerator 的 Bean,则它们都必须是同一类的实例。
否则,尝试替换自定义策略的上下文将无法初始化。
如果策略相同但进行了参数化,则将使用第一个被初始化的上下文中定义的策略。 |
除了默认策略外,还提供了两个额外的IdGenerators。
org.springframework.util.JdkIdGenerator使用了之前的UUID.randomUUID()机制。
当你不需要UUID而只需要一个简单的递增值时,可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator。
只读标头
MessageHeaders.ID 和 MessageHeaders.TIMESTAMP 是只读标头,无法被覆盖。
自 4.3.2 版本起,MessageBuilder 提供了 readOnlyHeaders(String… readOnlyHeaders) API,用于自定义不应从上游 Message 复制的标头列表。
默认情况下,仅 MessageHeaders.ID 和 MessageHeaders.TIMESTAMP 为只读。
全局 spring.integration.readOnly.headers 属性(参见 全局属性)可用于自定义框架组件的 DefaultMessageBuilderFactory。
当您希望不自动填充某些开箱即用的标头(例如由 ObjectToJsonTransformer 提供的 contentType)时,此功能非常有用(参见 JSON 转换器)。
当您尝试使用MessageBuilder构建新消息时,这种头会被忽略,并且会向日志中发出一个特定的INFO消息。
标题传播
当消息由消息生成端点(例如 服务激活器)处理(和修改)时,通常入站头会传播到出站消息。 一个例外是 转换器,当它向框架返回完整消息时。 在这种情况下,用户代码负责整个出站消息。 当转换器仅返回负载(payload)时,入站头会被传播。 此外,只有当出站消息中尚不存在该头时,才会进行传播,从而允许您根据需要修改变量值。
从版本 4.3.10 开始,您可以配置消息处理器(用于修改消息并产生输出)以抑制特定头的传播。
要配置不希望被复制的头,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders() 或 addNotPropagatedHeaders() 方法。
您也可以通过将readOnlyHeaders属性在META-INF/spring.integration.properties中设置为逗号分隔的头部列表,来全局抑制特定消息头的传播。
从版本 5.0 开始,AbstractMessageProducingHandler上的setNotPropagatedHeaders()实现应用简单模式(xxx*、xxx、*xxx或xxx*yyy),以便过滤具有公共后缀或前缀的头部。
有关更多信息,请参阅PatternMatchUtilsJavadoc。
当其中一个模式为*(星号)时,不传播任何头部。
所有其他模式将被忽略。
在这种情况下,服务激活器的行为与转换器相同,任何必需的头部必须在服务方法返回的Message中提供。notPropagatedHeaders()选项在 Java DSL 的ConsumerEndpointSpec中可用。
它也可作为<service-activator>组件的 XML 配置中的not-propagated-headers属性使用。
消息实现
The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors, shown in the following listing:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
当创建一个 Message 时,会生成一个随机唯一ID。
接受一个 Map 头部的构造函数会将提供的头部复制到新创建的 Message 中。
也有一个方便的实现 Message,用于通信错误条件。
这个实现将一个 Throwable 对象作为其负载,如下例所示:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
请注意,此实现利用了GenericMessage基类泛型化的事实。因此,正如两个示例所示,在获取Message负载Object时无需进行类型转换。
这MessageBuilder辅助类
您可能会注意到Message接口定义了用于获取其负载和标头的方法,但没有提供设置器。这是原因在于,一旦Message被创建,就无法对其进行修改。因此,当一个 Message 实例被发送给多个消费者(例如,
通过发布 - 订阅 Channel),如果其中一个消费者需要使用不同的负载类型发送回复,它必须创建一个新的 Message。因此,其他消费者不会受到这些变化的影响。请记住,多个消费者可能会访问同一个负载实例或头信息值,至于该实例是否本身是不可变的,则由您自行决定。换句话说,`Message` 实例的合约类似于不可修改的 `Collection`,而 `MessageHeaders` 映射进一步举例说明了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但在 MessageHeaders 实例上尝试调用 put 操作(或 'remove' 或 'clear')会导致 UnsupportedOperationException。
而不是要求创建并填充一个Map来传递给GenericMessage构造函数,Spring Integration确实提供了一种更方便的方式来构建消息:MessageBuilder。
MessageBuilder提供了两个工厂方法,可以从现有的Message或带有负载Object来创建Message实例。
当从现有Message构建时,该Message的头信息和负载将被复制到新Message中,如以下示例所示:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果需要创建一个带有新负载的Message,但仍希望从现有的Message复制头信息,可以使用其中一个'copy'方法,如下例所示:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
注意,copyHeadersIfAbsent方法不会覆盖现有值。
此外,在前面的示例中,您可以看到如何使用setHeader设置任何自定义头。
最后,还有set个方法可供设置预定义头,并且有一个非破坏性的方法用于设置任何头(MessageHeaders还为预定义头名称定义了常量)。
您也可以使用MessageBuilder来设置消息的优先级,如下例所示:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
The priority 头部仅在使用 PriorityChannel 时考虑(如下一章所述)。
它被定义为一个 java.lang.Integer。