此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

消息

Spring 集成Message是数据的通用容器。 任何对象都可以作为有效负载提供,并且每个Message实例包括包含用户可扩展属性作为键值对的标头。spring-doc.cadn.net.cn

Message接口

以下列表显示了Message接口:spring-doc.cadn.net.cn

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message接口是 API 的核心部分。 通过将数据封装在通用包装器中,消息传递系统可以在不了解数据类型的情况下传递数据。 当应用程序发展为支持新类型时,或者当类型本身被修改或扩展时,消息传递系统不会受到影响。 另一方面,当消息传递系统中的某些组件确实需要访问有关Message,此类元数据通常可以存储到消息标头中的元数据中并从中检索。spring-doc.cadn.net.cn

邮件头

就像 Spring Integration 允许任何Object用作Message,它还支持任何Object类型作为标头值。 事实上,MessageHeaders类实现了java.util.Map_ interface,如以下类定义所示:spring-doc.cadn.net.cn

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
尽管MessageHeaders类实现Map,它实际上是一个只读实现。 任何尝试putMap 中的值会导致UnsupportedOperationException. 这同样适用于removeclear. 由于消息可以传递给多个消费者,因此Map无法修改。 同样,消息的有效负载Object不能set在初始创建之后。 但是,标头值本身(或有效负载对象)的可变性被有意保留为框架用户的决策。

作为Map,可以通过调用get(..)替换为标头的名称。 或者,您可以提供预期的Class作为附加参数。 更好的是,在检索预定义值之一时,可以使用方便的 getter。 以下示例显示了这三个选项中的每一个:spring-doc.cadn.net.cn

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的邮件头:spring-doc.cadn.net.cn

表 1.预定义的邮件头
标头名称 标题类型 用法
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。 每次消息发生变化时都会发生变化。spring-doc.cadn.net.cn

 MessageHeaders.
TIMESTAMP
 java.lang.Long

消息的创建时间。 每次消息发生变化时都会发生变化。spring-doc.cadn.net.cn

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当未配置显式输出通道且没有显式输出通道时,向其发送回复(如果有)的通道ROUTING_SLIPROUTING_SLIP筋疲力尽。 如果值是String,它必须表示 Bean 名称或由ChannelRegistry.spring-doc.cadn.net.cn

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

发送错误的通道。 如果值是String,它必须表示 Bean 名称或由ChannelRegistry.spring-doc.cadn.net.cn

许多入站和出站适配器实现还提供或期望某些标头,您可以配置其他用户定义的标头。 例如,可以在存在此类标头的模块中找到这些标头的常量。AmqpHeaders,JmsHeaders,依此类推。spring-doc.cadn.net.cn

MessageHeaderAccessor应用程序接口

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至spring-messaging模块和MessageHeaderAccessor引入了 API 来提供对消息传递实现的额外抽象。 全部 (core) 特定于 Spring Integration 的消息头常量现在在IntegrationMessageHeaderAccessor类。 下表描述了预定义的邮件头:spring-doc.cadn.net.cn

表 2.预定义的邮件头
标头名称 标题类型 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两个或多个消息。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常,带有一组消息的序列号,带有SEQUENCE_SIZE但也可以用于<resequencer/>重新排序一组无界消息。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组相关消息中的消息数。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示消息何时过期。 框架不直接使用,但可以使用标头扩充器进行设置,并在<filter/>配置了UnexpiredMessageSelector.spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级 — 例如,在PriorityChannel.spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果幂等接收方拦截器将消息检测为重复项,则为 true。 请参阅幂等接收方企业集成模式。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果邮件与Closeable应在消息处理完成后关闭。 一个例子是Session与使用 FTP、SFTP 等的流式文件传输相关联。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动的通道适配器支持RetryTemplate,则此标头包含当前投放尝试。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站终结点支持,则回调以接受、拒绝或重新排队消息。 请参阅延迟确认可轮询消息源MQTT 手动确认spring-doc.cadn.net.cn

其中一些标头的方便类型化 getter 在IntegrationMessageHeaderAccessor类,如以下示例所示:spring-doc.cadn.net.cn

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在IntegrationMessageHeaderAccessor但通常不被用户代码使用(也就是说,它们通常由 Spring Integration 的内部部分使用 - 在这里包含它们是为了完整):spring-doc.cadn.net.cn

表 3.预定义的邮件头
标头名称 标题类型 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

需要嵌套关联时使用的相关数据堆栈(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator).spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

请参阅布线单spring-doc.cadn.net.cn

消息 ID 生成

当消息通过应用程序转换时,每次更改消息时(例如,通过转换器),都会分配一个新的消息标识。 消息 ID 是UUID. 从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的更有效java.util.UUID.randomUUID()实现。 它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。spring-doc.cadn.net.cn

可以通过声明实现org.springframework.util.IdGenerator在应用程序上下文中。spring-doc.cadn.net.cn

一个类加载器中只能使用一个 UUID 生成策略。这意味着,如果两个或多个应用程序上下文在同一类加载器中运行,它们将共享相同的策略。如果其中一个上下文更改了策略,则所有上下文都会使用它。如果同一类加载器中的两个或多个上下文声明了类型为org.springframework.util.IdGenerator,它们必须都是同一类的实例。否则,尝试替换自定义策略的上下文无法初始化。如果策略相同但参数化,则使用要初始化的第一个上下文中的策略。

除了默认策略外,另外两个IdGenerators被提供。org.springframework.util.JdkIdGenerator使用前面的UUID.randomUUID()机制。 您可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator当 UUID 并不真正需要并且简单的递增值就足够时。spring-doc.cadn.net.cn

只读标头

MessageHeaders.IDMessageHeaders.TIMESTAMP是只读标头,无法覆盖。spring-doc.cadn.net.cn

从 4.3.2 版本开始,MessageBuilder提供readOnlyHeaders(String…​ readOnlyHeaders)用于自定义不应从上游复制的标头列表的 APIMessage. 只有MessageHeaders.IDMessageHeaders.TIMESTAMP默认情况下是只读的。全局spring.integration.readOnly.headers属性(参见全局属性)来自定义DefaultMessageBuilderFactory对于框架组件。当您不想填充一些现成的标头(例如contentType通过ObjectToJsonTransformer(参见 JSON 转换器)。spring-doc.cadn.net.cn

当您尝试使用MessageBuilder,这种标头将被忽略,并且特定的INFO消息发送到日志。spring-doc.cadn.net.cn

从版本 5.0 开始,Messaging GatewayHeader EnricherContent EnricherHeader Filter 不允许您配置MessageHeaders.IDMessageHeaders.TIMESTAMP标头名称DefaultMessageBuilderFactory被使用,他们扔BeanInitializationException.spring-doc.cadn.net.cn

标头传播

当消息生成端点(如服务激活器)处理(和修改)消息时,通常,入站标头会传播到出站消息。一个例外是转换器,当将完整的消息返回到框架时。在这种情况下,用户代码负责整个出站消息。当转换器仅返回有效负载时,入站标头会传播。此外,仅当标头在出站消息中尚不存在时才会传播,从而允许您根据需要更改标头值。spring-doc.cadn.net.cn

从版本 4.3.10 开始,您可以配置消息处理程序(用于修改消息并生成输出)以抑制特定标头的传播。要配置标头,您不希望被复制,请调用setNotPropagatedHeaders()addNotPropagatedHeaders()方法MessageProducingMessageHandlerabstract 类。spring-doc.cadn.net.cn

您还可以通过设置readOnlyHeaders属性META-INF/spring.integration.properties设置为以逗号分隔的标头列表。spring-doc.cadn.net.cn

从 5.0 版开始,setNotPropagatedHeaders()AbstractMessageProducingHandler应用简单模式 (xxx*,*xxx,*xxx*xxx*yyy) 以允许过滤具有通用后缀或前缀的标头。 看PatternMatchUtilsJavadoc了解更多信息。 当其中一种模式为(星号)时,不会传播标头。 所有其他模式都将被忽略。 在这种情况下,服务激活器的行为方式与转换器相同,并且必须在*Message从 service 方法返回。 这notPropagatedHeaders()选项在ConsumerEndpointSpec对于 Java DSL 它也可用于<service-activator>组件作为not-propagated-headers属性。spring-doc.cadn.net.cn

标头传播抑制不适用于不修改消息的终端,例如网桥路由器

消息实现

的基本实现Message接口是GenericMessage<T>,它提供了两个构造函数,如以下列表所示:spring-doc.cadn.net.cn

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

Message创建时,将生成一个随机的唯一 ID。 接受Mapof headers 将提供的标头复制到新创建的Message.spring-doc.cadn.net.cn

还有一个方便的实现Message旨在传达错误条件。 此实现采用Throwableobject 作为其有效负载,如以下示例所示:spring-doc.cadn.net.cn

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了以下事实:GenericMessage基类是参数化的。 因此,如两个示例所示,在检索Message有效载荷Object.spring-doc.cadn.net.cn

提到的Message类实现是不可变的。 在某些情况下,当可变性不是问题,并且应用程序的逻辑经过精心设计以避免并发修改时,一个MutableMessage可以使用。spring-doc.cadn.net.cn

MessageBuilder辅助类

您可能会注意到Message接口定义其有效负载和标头的检索方法,但不提供 setter。 这样做的原因是Message初始创建后无法修改。 因此,当Message实例发送给多个消费者(例如,通过发布-订阅通道),如果其中一个消费者需要发送具有不同有效负载类型的回复,则必须创建一个新的Message. 因此,其他消费者不会受到这些更改的影响。 请记住,多个使用者可能会访问相同的有效负载实例或标头值,而此类实例本身是否不可变是留给您的决定。 换句话说,合同Message实例类似于不可修改的实例CollectionMessageHeaders地图进一步举例说明了这一点。 尽管MessageHeaders类实现java.util.Map,任何调用put作(或“删除”或“清除”)对MessageHeaders实例结果为UnsupportedOperationException.spring-doc.cadn.net.cn

Spring Integration 不需要创建和填充 Map 以传递给 GenericMessage 构造函数,而是提供了一种更方便的方式来构造 Messages:MessageBuilder. 这MessageBuilder提供了两种工厂方法来创建Message实例来自现有的Message或带有有效载荷Object. 从现有Message,标头和有效负载Message被复制到新的Message,如以下示例所示:spring-doc.cadn.net.cn

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建Message具有新有效负载,但仍希望从现有有效负载中复制标头Message,您可以使用其中一种“copy”方法,如以下示例所示:spring-doc.cadn.net.cn

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent方法不会覆盖现有值。 此外,在前面的示例中,您可以看到如何使用setHeader. 最后,还有set可用于预定义标头的方法以及用于设置任何标头(MessageHeaders还定义了预定义标头名称的常量)。spring-doc.cadn.net.cn

您还可以使用MessageBuilder设置消息的优先级,如以下示例所示:spring-doc.cadn.net.cn

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priorityheader 仅在使用PriorityChannel(如下一章所述)。 它被定义为java.lang.Integer.spring-doc.cadn.net.cn

MutableMessageBuilder提供处理MutableMessage实例。 此类的逻辑是创建一个MutableMessage或者保持原样并通过构建器方法更改其内容。 这样,当不变性不是消息交换的问题时,正在运行的应用程序的性能会略有提高。spring-doc.cadn.net.cn

从 6.4 版开始,一个BaseMessageBuilder类从MessageBuilder以简化默认消息构建逻辑的扩展。 例如,与自定义MessageBuilderFactory,一个自定义BaseMessageBuilder实现可以在应用程序上下文中全局使用,以提供自定义Message实例。 特别是,GenericMessage.toString()方法可以覆盖,以便在记录此类消息时从有效负载和标头中隐藏敏感信息。

MessageBuilderFactory抽象化

MessageBuilderFactorybean 与IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME全局注册到应用程序上下文中,并在框架中的任何地方使用,以创建Message实例。 默认情况下,它是DefaultMessageBuilderFactory. 开箱即用,该框架还提供了一个MutableMessageBuilderFactory创建MutableMessage实例。 自定义Message实例创建,一个MessageBuilderFactorybean 与IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME必须在目标应用程序上下文中提供才能覆盖默认上下文。 例如,自定义MessageBuilderFactory可以注册以实现BaseMessageBuilder我们想在其中提供GenericMessage扩展名,重写toString()在记录此类消息时从有效负载和标头中隐藏敏感信息。spring-doc.cadn.net.cn

这些类的一些快速实现以演示个人身份信息缓解措施可以如下所示:spring-doc.cadn.net.cn

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

然后这个PiiMessageBuilderFactory可以注册为 bean,并且每当框架记录消息时(例如,在errorChannel)、password标头将被屏蔽。spring-doc.cadn.net.cn