STOMP 支持

STOMP 支持

Spring Integration 4.2 版引入了 STOMP(面向简单文本的消息传递协议)客户端支持。 它基于 Spring Framework 的消息传递模块 stomp 包中的架构、基础设施和 API。 Spring Integration 使用许多 Spring STOMP 组件(例如StompSessionStompClientSupport). 有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持一章。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

专家
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-stomp:6.0.9"

对于服务器端组件,您需要添加一个org.springframework:spring-websocket和/或io.projectreactor.netty:reactor-netty依赖。spring-doc.cadn.net.cn

概述

要配置 STOMP,您应该从 STOMP 客户端对象开始。 Spring 框架提供了以下实现:spring-doc.cadn.net.cn

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于使用 SockJS 客户端进行基于 HTTP 的 WebSocket 仿真。spring-doc.cadn.net.cn

  • ReactorNettyTcpStompClient:建立在ReactorNettyTcpClientreactor-netty项目。spring-doc.cadn.net.cn

您可以提供任何其他StompClientSupport实现。 请参阅这些类的 Javadocspring-doc.cadn.net.cn

StompClientSupport类被设计为工厂生产StompSession对于提供的StompSessionHandler所有剩余的工作都是通过回完成的StompSessionHandlerStompSession抽象化。 使用 Spring Integration 适配器抽象,我们需要提供一些托管共享对象来将我们的应用程序表示为具有唯一会话的 STOMP 客户端。 为此,Spring Integration 提供了StompSessionManager抽象来管理单个 StompSession在任何提供的StompSessionHandler. 这允许为特定的 STOMP 代理使用入站出站通道适配器(或两者)。 看StompSessionManager(及其实现)JavaDocs 了解更多信息。spring-doc.cadn.net.cn

STOMP 入站通道适配器

StompInboundChannelAdapter是一站式MessageProducer组件,该组件将 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(使用提供的 STOMP 帧从 STOMP 帧转换MessageConverter在连接的StompSession). 您可以使用适当的@ManagedOperation注释StompInboundChannelAdapter.spring-doc.cadn.net.cn

有关更多配置选项,请参阅 STOMP 命名空间支持StompInboundChannelAdapter Javadoc 的 Javadoc 中。spring-doc.cadn.net.cn

STOMP 出站通道适配器

StompMessageHandlerMessageHandler对于<int-stomp:outbound-channel-adapter>并用于发送传出Message<?>实例添加到 STOMPdestination(预先配置或在运行时使用 SpEL 表达式确定)通过StompSession(由共享的StompSessionManager).spring-doc.cadn.net.cn

有关更多配置选项,请参阅 STOMP 命名空间支持StompMessageHandler Javadoc 的 Javadoc 中。spring-doc.cadn.net.cn

STOMP 标头映射

STOMP 协议提供标头作为其帧的一部分。 STOMP 框架的整个结构具有以下格式:spring-doc.cadn.net.cn

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了StompHeaders来表示这些标头。 有关更多详细信息,请参阅 Javadoc。 STOMP 帧与 STOMP 帧进行转换和从Message<?>实例,这些标头映射到MessageHeaders实例。 Spring Integration 提供默认的HeaderMapperSTOMP 适配器的实现。 实现是StompHeaderMapper. 它提供了fromHeaders()toHeaders()分别是入站和出站适配器的作。spring-doc.cadn.net.cn

与许多其他 Spring Integration 模块一样,IntegrationStompHeaders引入类以将标准 STOMP 标头映射到MessageHeadersstomp_作为标头名称前缀。 此外,所有MessageHeaders具有该前缀的实例将映射到StompHeaders发送到目的地时。spring-doc.cadn.net.cn

有关更多信息,请参阅这些类的 Javadocmapped-headers属性描述。spring-doc.cadn.net.cn

STOMP 集成事件

许多 STOMP作都是异步的,包括错误处理。 例如,STOMP 有一个RECEIPT服务器帧,当客户端帧请求一个时,它通过添加RECEIPT页眉。 为了提供对这些异步事件的访问,Spring Integration 发出StompIntegrationEvent实例,您可以通过实现ApplicationListener或通过使用<int-event:inbound-channel-adapter>(请参阅接收 Spring 应用程序事件)。spring-doc.cadn.net.cn

具体来说,一个StompExceptionEventAbstractStompSessionManagerstompSessionListenableFuture接收onFailure()由于无法连接到 STOMP 代理。 另一个例子是StompMessageHandler. 它处理ERRORSTOMP 帧,这是服务器对StompMessageHandler.spring-doc.cadn.net.cn

StompMessageHandler发出StompReceiptEvent作为StompSession.Receiptable发送到StompSession. 这StompReceiptEvent可以是正的,也可以是负的,具体取决于RECEIPTframe 是从receiptTimeLimit句点,您可以在StompClientSupport实例。 它默认为15 * 1000(以毫秒为单位,所以 15 秒)。spring-doc.cadn.net.cn

StompSession.Receiptable仅当RECEIPT要发送的消息的 STOMP 标头不是null. 您可以启用自动RECEIPT标头生成StompSession通过其autoReceipt选项,并在StompSessionManager分别。

有关如何配置 Spring Integration 以接受这些内容的更多信息,请参阅 STOMP 适配器 Java 配置ApplicationEvent实例。spring-doc.cadn.net.cn

STOMP 适配器 Java 配置

以下示例显示了 STOMP 适配器的全面 Java 配置:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现入站和出站通道适配器组件。 要将其包含在配置中,请在应用程序上下文配置文件中提供以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

了解<int-stomp:outbound-channel-adapter>元素

以下列表显示了 STOMP 出站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 名称。 这MessageHandler使用 Bean 别名id.handler. 如果您未将channel属性,一个DirectChannel在应用程序上下文中创建和注册,其值为id属性作为 bean 名称。 在这种情况下,端点注册了一个 bean 名称id.adapter.
2 标识连接到此适配器的通道,如果id存在。 看id. 自选。
3 引用StompSessionManagerbean,它封装了低级连接和StompSession处理作。 必填。
4 对实现HeaderMapper<StompHeaders>,它映射了 Spring IntegrationMessageHeaders往返 STOMP 帧标头。 它与mapped-headers. 它默认为StompHeaderMapper.
5 要映射到 STOMP 帧标头的 STOMP 标头名称的逗号分隔列表。 只有在header-mapper未设置引用。 此列表中的值也可以是与标头名称匹配的简单模式(例如myheader**myheader). 特殊Tokens (STOMP_OUTBOUND_HEADERS)表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望标准标头也被映射,则必须包含此Tokens或提供自己的TokensHeaderMapper使用header-mapper.
6 STOMP 消息发送到的目标的名称。 它与destination-expression.
7 在运行时针对每个 Spring Integration 进行评估的 SpEL 表达式Message作为根对象。 它与destination.
8 布尔值,指示此终结点是否应自动启动。 它默认为true.
9 此终结点应启动和停止的生命周期阶段。 值越低,此终结点开始的时间越早,停止的时间越晚。 默认值为Integer.MIN_VALUE. 值可以为负数。 看SmartLifeCycle.

了解<int-stomp:inbound-channel-adapter>元素

以下列表显示了 STOMP 入站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 bean 名称。 如果您未将channel属性,一个DirectChannel在应用程序上下文中创建和注册,其值为id属性作为 bean 名称。 在这种情况下,端点将使用 Bean 名称注册id.adapter.
2 标识连接到此适配器的通道。
3 MessageChannelbean 引用,其中ErrorMessage应发送实例。
4 请参阅<int-stomp:outbound-channel-adapter>.
5 要从 STOMP 帧报头映射的 STOMP 报头名称的逗号分隔列表。 只有在header-mapper未设置引用。 此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader**myheader). 特殊Tokens (STOMP_INBOUND_HEADERS)表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望映射标准标头,则还必须包含此Tokens或提供自己的TokensHeaderMapper使用header-mapper.
6 请参阅<int-stomp:outbound-channel-adapter>.
7 要订阅的 STOMP 目标名称的逗号分隔列表。目标列表(以及订阅)可以在运行时通过addDestination()removeDestination() @ManagedOperation附注。
8 如果通道可以阻塞,则向通道发送消息时等待的最大时间(以毫秒为单位)。例如,一个QueueChannel如果已达到最大容量,则可以阻止,直到空间可用。
9 目标的 Java 类型的完全限定名称payload从传入的 STOMP 帧进行转换。 它默认为String.class.
10 请参阅<int-stomp:outbound-channel-adapter>.
11 请参阅<int-stomp:outbound-channel-adapter>.