STOMP 支持

STOMP 支持

Spring Integration 4.2 版本引入了 STOMP(面向简单文本的消息协议)客户端支持。 它基于 Spring Framework 消息模块中 stomp 包的架构、基础设施和 API。 Spring Integration 使用了大量 Spring STOMP 组件(例如 StompSessionStompClientSupport)。 更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持 章节。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.1.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-stomp:6.1.9"

对于服务器端组件,您需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。spring-doc.cadn.net.cn

概述

要配置 STOMP,您应从 STOMP 客户端对象开始。 Spring Framework 提供以下实现:spring-doc.cadn.net.cn

  • WebSocketStompClient: 基于 Spring WebSocket API 构建,支持标准的 JSR-356 WebSocket、Jetty 9,以及使用 SockJS Client 进行基于 HTTP 的 WebSocket 模拟(SockJS)。spring-doc.cadn.net.cn

  • ReactorNettyTcpStompClient: 基于 ReactorNettyTcpClient 构建,源自 reactor-netty 项目。spring-doc.cadn.net.cn

您可以提供任何其他 StompClientSupport 实现。 请参阅这些类的 Javadocspring-doc.cadn.net.cn

The StompClientSupport 类被设计为一个工厂,用于为提供的StompSessionHandler生成一个StompSession,所有其余工作均通过对该StompSessionHandlerStompSession抽象的回调完成。 借助 Spring Integration 的适配器抽象,我们需要提供一个受管理的共享对象,以将我们的应用程序表示为具有唯一会话的 STOMP 客户端。 为此,Spring Integration 提供了StompSessionManager抽象,用于管理任何提供的StompSessionHandler之间的单个StompSession。 这使得可以为特定的 STOMP 代理使用入站出站通道适配器(或两者)。 有关更多信息,请参阅StompSessionManager(及其实现)的 Javadocs。spring-doc.cadn.net.cn

STOMP 入站通道适配器

The StompInboundChannelAdapter 是一个一站式 MessageProducer 组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目的地,并从这些目的地接收消息(通过连接到 StompSession 上提供的 MessageConverter 将 STOMP 帧转换为消息)。 您可以使用 StompInboundChannelAdapter 上的适当 @ManagedOperation 注解在运行时更改目的地(从而更改 STOMP 订阅)。spring-doc.cadn.net.cn

有关更多配置选项,请参见 STOMP 命名空间支持 以及 StompInboundChannelAdapter Javadocspring-doc.cadn.net.cn

STOMP 出站通道适配器

The StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将出站的 Message<?> 实例发送到 STOMP destination(预配置或在运行时通过 SpEL 表达式确定)。spring-doc.cadn.net.cn

有关更多配置选项,请参阅 STOMP 命名空间支持 以及 StompMessageHandler Javadocspring-doc.cadn.net.cn

STOMP 头映射

STOMP 协议在其帧中提供头部信息。 STOMP 帧的整个结构格式如下:spring-doc.cadn.net.cn

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了 StompHeaders 来表示这些头信息。 有关更多详细信息,请参阅 Javadoc。 STOMP 帧被转换为 Message<?> 实例并从其转换,这些头信息被映射到 MessageHeaders 实例并从中映射。 Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper 实现。 该实现是 StompHeaderMapper。 它分别为入站和出站适配器提供 fromHeaders()toHeaders() 操作。spring-doc.cadn.net.cn

与许多其他 Spring Integration 模块一样,IntegrationStompHeaders 类用于将标准 STOMP 标头映射到 MessageHeaders,其中 stomp_ 作为标头名称前缀。 此外,所有带有该前缀的 MessageHeaders 实例在发送到目标地址时都会被映射到 StompHeadersspring-doc.cadn.net.cn

有关更多信息,请参阅这些类的 Javadoc 以及 STOMP 命名空间支持 中的 mapped-headers 属性描述。spring-doc.cadn.net.cn

STOMP 集成事件

许多 STOMP 操作是异步的,包括错误处理。 例如,当客户端通过在帧中添加 RECEIPT 头请求时,STOMP 会返回一个 RECEIPT 服务器帧。 为了访问这些异步事件,Spring Integration 会发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 来获取它们(请参阅 接收 Spring 应用程序事件)。spring-doc.cadn.net.cn

具体而言,当 StompExceptionEvent 因无法连接到 STOMP 代理而失败时,AbstractStompSessionManager 会发出一个 StompExceptionEvent。 另一个例子是 StompMessageHandler。 它处理来自 ERROR 的 STOMP 帧,这些帧是服务器对由该 StompMessageHandler 发送的不正确(未被接受)消息的响应。spring-doc.cadn.net.cn

The StompMessageHandler 作为异步响应的一部分,向 StompSession 发送消息时,会在 StompSession.Receiptable 回调中发出 StompReceiptEvent。 The StompReceiptEvent 可以是正值或负值,取决于是否在 receiptTimeLimit 周期内从服务器收到了 RECEIPT 帧,该周期可在 StompClientSupport 实例上进行配置。 默认值为 15 * 1000(单位为毫秒,即 15 秒)。spring-doc.cadn.net.cn

只有当要发送的消息的 RECEIPT STOMP 头不为 null 时,才会添加 StompSession.Receiptable 回调。 您可以在 StompSession 上通过其 autoReceipt 选项以及在 StompSessionManager 上分别启用自动 RECEIPT 头的生成。

有关如何配置 Spring Integration 以接受这些 ApplicationEvent 实例的更多信息,请参阅 STOMP 适配器 Java 配置spring-doc.cadn.net.cn

STOMP 适配器 Java 配置

以下示例展示了 STOMP 适配器的全面 Java 配置:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。 要将其包含在配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

理解<int-stomp:outbound-channel-adapter>元素

以下列表显示了 STOMP 出站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 的名称。 The MessageHandler 已使用别名 id 加上 .handler 注册为 bean。 如果您未设置 channel 属性,则会创建一个 DirectChannel,并将其注册到应用程序上下文中,该属性的值将作为 bean 名称。id。 在这种情况下,端点使用 bean 名称 id 加上 .adapter 进行注册。
2 如果存在id,则标识与此适配器关联的通道。 请参见id。 可选。
3 StompSessionManager Bean 的引用,该 Bean 封装了底层连接和 StompSession 处理操作。 必需。
4 引用一个实现了HeaderMapper<StompHeaders>的 Bean,该 Bean 负责将 Spring Integration MessageHeaders与 STOMP 帧头之间进行映射。 它与mapped-headers互斥。 其默认值为StompHeaderMapper
5 以逗号分隔的 STOMP 标头名称列表,这些标头将被映射到 STOMP 帧标头。 仅当未设置 header-mapper 引用时方可提供此内容。 该列表中的值也可以是简单模式,用于与标头名称进行匹配(例如 myheader**myheader)。 特殊标记(STOMP_OUTBOUND_HEADERS)代表所有标准 STOMP 标头(如 content-length、receipt、heart-beat 等)。 默认情况下会包含这些标头。 如果您希望添加自定义标头,同时要求标准标头也被映射,则必须包含此标记,或者通过使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 发送 STOMP 消息的目标名称。 它与 destination-expression 互斥。
7 一个在运行时针对每个 Spring Integration Message(作为根对象)求值的 SpEL 表达式。 它与 destination 互斥。
8 Boolean 值,指示此端点是否应自动启动。 它默认为 true
9 该端点应启动和停止的生命周期阶段。 值越小,该端点启动越早,停止越晚。 默认值为 Integer.MIN_VALUE。 值可以为负数。 参见 SmartLifeCycle

理解<int-stomp:inbound-channel-adapter>元素

以下代码片段显示了 STOMP 入站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 bean 的名称。 如果您未设置 channel 属性,系统将创建一个 DirectChannel 并将其注册到应用程序上下文中,该 bean 的名称为此 id 属性的值。 在此情况下,端点将以 bean 名称 id 加上 .adapter 的形式进行注册。
2 标识与此适配器关联的通道。
3 应发送 ErrorMessage 个实例的目标 MessageChannel bean 引用。
4 <int-stomp:outbound-channel-adapter> 上查看相同选项。
5 以逗号分隔的 STOMP 标头名称列表,用于从 STOMP 帧标头进行映射。 仅当未设置 header-mapper 引用时,才能提供此内容。 此列表中的值也可以是简单模式,用于匹配标头名称(例如,myheader**myheader)。 特殊标记 (STOMP_INBOUND_HEADERS) 代表所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。 默认情况下会包含它们。 如果您想添加自己的标头并希望同时映射标准标头,则必须包含此标记,或使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 <int-stomp:outbound-channel-adapter> 上查看相同选项。
7 订阅的 STOMP 目标名称的逗号分隔列表。 目标列表(因此也是订阅列表)可以通过 addDestination()removeDestination() @ManagedOperation 注解在运行时进行修改。
8 向通道发送消息时,如果通道可能阻塞,则等待的最大时间(以毫秒为单位)。 例如,如果已达到最大容量,QueueChannel 可能会一直阻塞直到有可用空间。
9 目标 Java 类型的完全限定名,用于将传入的 STOMP 帧转换为payload。 其默认值为String.class
10 <int-stomp:outbound-channel-adapter> 上查看相同选项。
11 <int-stomp:outbound-channel-adapter> 上查看相同选项。