此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
消息
Spring 集成Message
是数据的通用容器。
任何对象都可以作为有效负载提供,并且每个Message
实例包括包含用户可扩展属性作为键值对的标头。
这Message
接口
以下列表显示了Message
接口:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
这Message
接口是 API 的核心部分。
通过将数据封装在通用包装器中,消息传递系统可以在不了解数据类型的情况下传递数据。
当应用程序发展为支持新类型时,或者当类型本身被修改或扩展时,消息传递系统不会受到影响。
另一方面,当消息传递系统中的某些组件确实需要访问有关Message
,此类元数据通常可以存储到消息标头中的元数据中并从中检索。
邮件头
就像 Spring Integration 允许任何Object
用作Message
,它还支持任何Object
类型作为标头值。
事实上,MessageHeaders
类实现了java.util.Map_ interface
,如以下类定义所示:
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
尽管MessageHeaders 类实现Map ,它实际上是一个只读实现。
任何尝试put Map 中的值会导致UnsupportedOperationException .
这同样适用于remove 和clear .
由于消息可以传递给多个消费者,因此Map 无法修改。
同样,消息的有效负载Object 不能set 在初始创建之后。
但是,标头值本身(或有效负载对象)的可变性被有意保留为框架用户的决策。 |
作为Map
,可以通过调用get(..)
替换为标头的名称。
或者,您可以提供预期的Class
作为附加参数。
更好的是,在检索预定义值之一时,可以使用方便的 getter。
以下示例显示了这三个选项中的每一个:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了预定义的邮件头:
标头名称 | 标题类型 | 用法 |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
此消息实例的标识符。 每次消息发生变化时都会发生变化。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息的创建时间。 每次消息发生变化时都会发生变化。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
当未配置显式输出通道且没有显式输出通道时,向其发送回复(如果有)的通道 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
发送错误的通道。
如果值是 |
许多入站和出站适配器实现还提供或期望某些标头,您可以配置其他用户定义的标头。
例如,可以在存在此类标头的模块中找到这些标头的常量。AmqpHeaders
,JmsHeaders
,依此类推。
MessageHeaderAccessor
应用程序接口
从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至spring-messaging
模块和MessageHeaderAccessor
引入了 API 来提供对消息传递实现的额外抽象。
所有(核心)特定于 Spring Integration 的消息头常量现在都在IntegrationMessageHeaderAccessor
类。
下表描述了预定义的邮件头:
标头名称 | 标题类型 | 用法 |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用于关联两个或多个消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常,带有一组消息的序列号,带有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一组相关消息中的消息数。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示消息何时过期。
框架不直接使用,但可以使用标头扩充器进行设置,并在 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级 — 例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果幂等接收方拦截器将消息检测为重复项,则为 true。 请参阅幂等接收方企业集成模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果邮件与 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动的通道适配器支持 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
其中一些标头的方便类型化 getter 在IntegrationMessageHeaderAccessor
类,如以下示例所示:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出现在IntegrationMessageHeaderAccessor
但通常不被用户代码使用(也就是说,它们通常由 Spring Integration 的内部部分使用 - 在这里包含它们是为了完整):
标头名称 | 标题类型 | 用法 |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
需要嵌套关联时使用的相关数据堆栈(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
请参阅布线单。 |
消息 ID 生成
当消息在应用程序中转换时,每次更改时(例如
通过转换器)分配新的消息 ID。
消息 ID 是UUID
.
从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的更有效java.util.UUID.randomUUID()
实现。
它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。
可以通过声明实现org.springframework.util.IdGenerator
在应用程序上下文中。
类加载器中只能使用一种 UUID 生成策略。
这意味着,如果两个或多个应用程序上下文在同一类加载器中运行,它们将共享相同的策略。
如果其中一个上下文更改了策略,则所有上下文都会使用它。
如果同一类加载器中的两个或多个上下文声明了类型为org.springframework.util.IdGenerator ,它们都必须是同一类的实例。
否则,尝试替换自定义策略的上下文将无法初始化。
如果策略相同但已参数化,则使用要初始化的第一个上下文中的策略。 |
除了默认策略外,另外两个IdGenerators
被提供。org.springframework.util.JdkIdGenerator
使用前面的UUID.randomUUID()
机制。
您可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
当 UUID 并不真正需要并且简单的递增值就足够时。
只读标头
这MessageHeaders.ID
和MessageHeaders.TIMESTAMP
是只读标头,无法覆盖。
从 4.3.2 版本开始,MessageBuilder
提供readOnlyHeaders(String… readOnlyHeaders)
用于自定义不应从上游复制的标头列表的 APIMessage
.
只有MessageHeaders.ID
和MessageHeaders.TIMESTAMP
默认情况下为只读。
全局spring.integration.readOnly.headers
属性(参见全局属性)来自定义DefaultMessageBuilderFactory
对于框架组件。
当您不想填充一些现成的标头(例如contentType
通过ObjectToJsonTransformer
(参见 JSON 转换器)。
当您尝试使用MessageBuilder
,这种标头将被忽略,并且特定的INFO
消息发送到日志。
从版本 5.0 开始,Messaging Gateway、Header Enricher、Content Enricher 和 Header Filter 不允许您配置MessageHeaders.ID
和MessageHeaders.TIMESTAMP
标头名称DefaultMessageBuilderFactory
被使用,他们扔BeanInitializationException
.
标头传播
当消息生成终结点(如服务激活器)处理(和修改)消息时,通常,入站标头会传播到出站消息。 一个例外是转换器,当一个完整的消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当转换器仅返回有效负载时,入站标头会传播。 此外,仅当标头在出站消息中尚不存在时才会传播,允许您根据需要更改标头值。
从 V4.3.10 开始,您可以配置消息处理程序(用于修改消息并生成输出)以抑制特定标头的传播。
要配置您不想复制的标头,请调用setNotPropagatedHeaders()
或addNotPropagatedHeaders()
方法MessageProducingMessageHandler
abstract 类。
您还可以通过设置readOnlyHeaders
属性META-INF/spring.integration.properties
设置为以逗号分隔的标头列表。
从 5.0 版开始,setNotPropagatedHeaders()
在AbstractMessageProducingHandler
应用简单模式 (xxx*
,xxx
,*xxx
或xxx*yyy
) 以允许过滤具有通用后缀或前缀的标头。
看PatternMatchUtils
Javadoc了解更多信息。
当其中一种模式为(星号)时,不会传播标头。
所有其他模式都将被忽略。
在这种情况下,服务激活器的行为方式与转换器相同,并且必须在*
Message
从 service 方法返回。
这notPropagatedHeaders()
选项在ConsumerEndpointSpec
对于 Java DSL
它也可用于<service-activator>
组件作为not-propagated-headers
属性。
消息实现
的基本实现Message
接口是GenericMessage<T>
,它提供了两个构造函数,如以下列表所示:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
当Message
创建时,将生成一个随机的唯一 ID。
接受Map
of headers 将提供的标头复制到新创建的Message
.
还有一个方便的实现Message
旨在传达错误条件。
此实现采用Throwable
object 作为其有效负载,如以下示例所示:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
请注意,此实现利用了以下事实:GenericMessage
基类是参数化的。
因此,如两个示例所示,在检索Message
有效载荷Object
.
这MessageBuilder
辅助类
您可能会注意到Message
接口定义其有效负载和标头的检索方法,但不提供 setter。
这样做的原因是Message
初始创建后无法修改。
因此,当Message
实例被发送给多个消费者(例如,
通过发布-订阅通道),如果其中一个消费者需要发送具有不同有效负载类型的回复,则必须创建一个新的Message
.
因此,其他消费者不会受到这些更改的影响。
请记住,多个使用者可能会访问相同的有效负载实例或标头值,而此类实例本身是否不可变是留给您的决定。
换句话说,合同Message
实例类似于不可修改的实例Collection
和MessageHeaders
地图进一步举例说明了这一点。
尽管MessageHeaders
类实现java.util.Map
,任何调用put
作(或“删除”或“清除”)对MessageHeaders
实例结果为UnsupportedOperationException
.
Spring Integration 不需要创建和填充 Map 以传递给 GenericMessage 构造函数,而是提供了一种更方便的方式来构造 Messages:MessageBuilder
.
这MessageBuilder
提供了两种工厂方法来创建Message
实例来自现有的Message
或带有有效载荷Object
.
从现有Message
,标头和有效负载Message
被复制到新的Message
,如以下示例所示:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要创建Message
具有新有效负载,但仍希望从现有有效负载中复制标头Message
,您可以使用其中一种“copy”方法,如以下示例所示:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
请注意,copyHeadersIfAbsent
方法不会覆盖现有值。
此外,在前面的示例中,您可以看到如何使用setHeader
.
最后,还有set
可用于预定义标头的方法以及用于设置任何标头(MessageHeaders
还定义了预定义标头名称的常量)。
您还可以使用MessageBuilder
设置消息的优先级,如以下示例所示:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
这priority
header 仅在使用PriorityChannel
(如下一章所述)。
它被定义为java.lang.Integer
.