此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

WebSockets 支持

从版本 4.1 开始,Spring Integration 支持 WebSocket。 它基于 Spring Framework 的web-socket模块。 因此,Spring WebSocket 的许多组件(例如SubProtocolHandlerWebSocketClient)和配置选项(例如@EnableWebSocketMessageBroker)可以在 Spring Integration 中重用。 有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持一章。spring-doc.cadn.net.cn

项目需要此依赖项:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>7.0.0-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:7.0.0-SNAPSHOT"

对于服务器端,org.springframework:spring-webmvc必须显式包含依赖项。spring-doc.cadn.net.cn

Spring Framework WebSocket 基础设施基于 Spring 消息传递基础,并提供了一个基于相同的基本消息传递框架MessageChannelimplementations 和MessageHandlerSpring Integration使用的实现(以及一些POJO方法注释映射)。 因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。 为此,您可以配置 Spring Integration@MessagingGateway使用适当的注释,如以下示例所示:spring-doc.cadn.net.cn

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概述

由于 WebSocket 协议在定义上是流式的,我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的WebSocketSession,无论是在客户端还是服务器端。 封装连接管理和WebSocketSession注册表,则IntegrationWebSocketContainer提供ClientWebSocketContainerServerWebSocketContainer实现。 由于 WebSocket API 及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用相同的类(当然,从 Java 的角度来看)。 因此,大多数连接和WebSocketSession注册表选项在两端是相同的。 这使我们能够重用许多配置项和基础设施挂钩,在服务器端和客户端构建 WebSocket 应用程序。 以下示例显示了组件如何同时实现这两个目的:spring-doc.cadn.net.cn

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer旨在实现双向消息传递,并且可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从其中一个引用。 它可以在没有任何通道适配器的情况下使用,但是,在这种情况下,IntegrationWebSocketContainer仅起到作用,作为WebSocketSession注册表。spring-doc.cadn.net.cn

ServerWebSocketContainer实现WebSocketConfigurer注册内部IntegrationWebSocketContainer.IntegrationWebSocketHandler作为Endpoint. 它根据提供的paths和其他服务器 WebSocket 选项(例如HandshakeHandlerSockJS fallback) 在ServletWebSocketHandlerRegistry对于目标提供商 WebSocket 容器。 该注册是通过基础设施实现的WebSocketIntegrationConfigurationInitializer组件,其作与@EnableWebSocket注解。 这意味着,通过使用@EnableIntegration(或应用程序上下文中的任何 Spring Integration 命名空间),您可以省略@EnableWebSocket声明,因为 Spring Integration 基础设施会检测所有 WebSocket 端点。

从 6.1 版开始,ClientWebSocketContainer可以使用提供的URI而不是uriTemplateuriVariables组合。 这在 uri 的某些部分需要自定义编码的情况下非常有用。 请参阅UriComponentsBuilder为了方便起见。spring-doc.cadn.net.cn

WebSocket 入站通道适配器

WebSocketInboundChannelAdapter实现接收部分WebSocketSession互动。 您必须为其提供IntegrationWebSocketContainer,适配器将自己注册为WebSocketListener处理传入消息和WebSocketSession事件。spring-doc.cadn.net.cn

只有一个WebSocketListener可以在IntegrationWebSocketContainer.

对于 WebSocket 子协议,该WebSocketInboundChannelAdapter可以配置为SubProtocolHandlerRegistry作为第二个构造函数参数。 适配器委托给SubProtocolHandlerRegistry确定适当的SubProtocolHandler对于被接受的WebSocketSession并将WebSocketMessage设置为Message根据子协议实现。spring-doc.cadn.net.cn

默认情况下,WebSocketInboundChannelAdapter仅依赖原始PassThruSubProtocolHandler实现,它将WebSocketMessage设置为Message.

WebSocketInboundChannelAdapter仅接受并发送到底层集成流Message具有SimpMessageType.MESSAGE或空的simpMessageType页眉。 所有其他Message类型通过ApplicationEventSubProtocolHandler实现(例如StompSubProtocolHandler).spring-doc.cadn.net.cn

在服务器端,如果@EnableWebSocketMessageBroker配置存在,您可以配置WebSocketInboundChannelAdapter使用useBroker = true选择。 在这种情况下,所有non-MESSAGE Message类型被委托给提供的AbstractBrokerMessageHandler. 此外,如果代理中继配置了目标前缀,则与代理目标匹配的消息将路由到AbstractBrokerMessageHandler而不是outputChannelWebSocketInboundChannelAdapter.spring-doc.cadn.net.cn

如果useBroker = false并且收到的消息是SimpMessageType.CONNECTtype,则WebSocketInboundChannelAdapter立即发送一个SimpMessageType.CONNECT_ACK消息发送给WebSocketSession而不将其发送到通道。spring-doc.cadn.net.cn

Spring 的 WebSocket 支持只允许配置一个代理中继。 因此,我们不需要AbstractBrokerMessageHandler参考。 它在应用程序上下文中检测到。

有关更多配置选项,请参阅 WebSockets 命名空间支持spring-doc.cadn.net.cn

WebSocket 出站通道适配器

WebSocketOutboundChannelAdapter:spring-doc.cadn.net.cn

  1. 接受来自其MessageChannelspring-doc.cadn.net.cn

  2. 确定WebSocketSession idMessageHeadersspring-doc.cadn.net.cn

  3. 检索WebSocketSession从提供的IntegrationWebSocketContainerspring-doc.cadn.net.cn

  4. 委托转换和发送WebSocketMessage工作到适当的地方SubProtocolHandler从提供的SubProtocolHandlerRegistry.spring-doc.cadn.net.cn

在客户端,WebSocketSession idmessage header 不是必需的,因为ClientWebSocketContainer仅处理单个连接及其WebSocketSession分别。spring-doc.cadn.net.cn

要使用 STOMP 子协议,您应该使用StompSubProtocolHandler. 然后,您可以使用以下命令将任何 STOMP 消息类型发送到此适配器,使用StompHeaderAccessor.create(StompCommand…​)MessageBuilder,或者只是使用HeaderEnricher(参见标题扩充器)。spring-doc.cadn.net.cn

本章的其余部分主要介绍其他配置选项。spring-doc.cadn.net.cn

WebSockets 命名空间支持

Spring Integration WebSocket 命名空间包括本章其余部分中描述的几个组件。 要将其包含在配置中,请在应用程序上下文配置文件中使用以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container>属性

以下列表显示了可用于<int-websocket:client-container>元素:spring-doc.cadn.net.cn

<int-websocket:client-container
                  id=""                             (1)
                  client=""                         (2)
                  uri=""                            (3)
                  uri-variables=""                  (4)
                  origin=""                         (5)
                  send-time-limit=""                (6)
                  send-buffer-size-limit=""         (7)
                  send-buffer-overflow-strategy=""  (8)
                  auto-startup=""                   (9)
                  phase="">                        (10)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      (11)
</int-websocket:client-container>
1 组件 bean 名称。
2 WebSocketClientbean 引用。
3 uriuriTemplate到目标 WebSocket 服务。 如果将其用作uriTemplate使用 URI 变量占位符时,uri-variables属性是必需的。
4 URI 变量占位符的逗号分隔值uri属性值。 这些值根据其在uri. 看UriComponents.expand(Object…​uriVariableValues).
5 Origin握手 HTTP 标头值。
6 WebSocket 会话send超时限制。 默认为10000.
7 WebSocket 会话send消息大小限制。 默认为524288.
8 WebSocket 会话发送缓冲区溢出策略 确定会话的出站消息缓冲区达到send-buffer-size-limit. 看ConcurrentWebSocketSessionDecorator.OverflowStrategy获取可能的值和更多详细信息。
9 布尔值,指示此终结点是否应自动启动。 默认为false,假设此容器是从 WebSocket 入站适配器启动的。
10 此终结点应启动和停止的生命周期阶段。 值越低,此终结点开始的时间越早,停止的时间越晚。 默认值为Integer.MAX_VALUE. 值可以为负数。 看SmartLifeCycle.
11 一个MapHttpHeaders与握手请求一起使用。

<int-websocket:server-container>属性

以下列表显示了可用于<int-websocket:server-container>元素:spring-doc.cadn.net.cn

<int-websocket:server-container
          id=""                             (1)
          path=""                           (2)
          handshake-handler=""              (3)
          handshake-interceptors=""         (4)
          decorator-factories=""            (5)
          send-time-limit=""                (6)
          send-buffer-size-limit=""         (7)
          send-buffer-overflow-strategy=""  (8)
          allowed-origins="">               (9)
          <int-websocket:sockjs
            client-library-url=""          (10)
            stream-bytes-limit=""          (11)
            session-cookie-needed=""       (12)
            heartbeat-time=""              (13)
            disconnect-delay=""            (14)
            message-cache-size=""          (15)
            websocket-enabled=""           (16)
            scheduler=""                   (17)
            message-codec=""               (18)
            transport-handlers=""          (19)
            suppress-cors="true" />        (20)
</int-websocket:server-container>
1 组件 bean 名称。
2 将特定请求映射到WebSocketHandler. 支持精确路径映射 URI(例如/myPath)和Ant式路径模式(例如/myPath/**).
3 HandshakeHandlerbean 引用。 默认为DefaultHandshakeHandler.
4 列表HandshakeInterceptorbean 引用。
5 一个或多个工厂的列表 (WebSocketHandlerDecoratorFactory)来装饰用于处理 WebSocket 消息的处理程序。 这对于某些高级用例可能很有用(例如,允许 Spring Security 强制关闭 WebSocket 会话)。 有关更多信息,请参阅Spring Session项目
6 请参阅<int-websocket:client-container>.
7 请参阅<int-websocket:client-container>.
8 WebSocket 会话发送缓冲区溢出策略 确定会话的出站消息缓冲区达到send-buffer-size-limit. 看ConcurrentWebSocketSessionDecorator.OverflowStrategy获取可能的值和更多详细信息。
9 允许的源标头值。 您可以将多个来源指定为逗号分隔的列表。 此检查主要针对浏览器客户端设计。 没有什么可以阻止其他类型的客户端修改源标头值。 当启用 SockJS 并限制允许的源时,不对跨源请求使用源头的传输类型(jsonp-polling,iframe-xhr-polling,iframe-eventsourceiframe-htmlfile) 被禁用。 因此,不支持 IE6 和 IE7,并且仅支持 IE8 和 IE9,没有 cookie。 默认情况下,允许所有源。
10 没有本机跨域通信的传输(例如eventsourcehtmlfile)必须从不可见的 iframe 中的“外部”域获取一个简单的页面,以便 iframe 中的代码可以从本地域运行到 SockJS 服务器。 由于 iframe 需要加载 SockJS JavaScript 客户端库,因此此属性允许您指定从中加载它的位置。 默认情况下,它指向d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js. 但是,您也可以将其设置为指向应用程序提供的 URL。 请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。 例如,假设 SockJS 端点映射到/sockjs生成的 iframe URL 为/sockjs/iframe.html,则相对 URL 必须以../../以遍历到 SockJS 映射上方的位置。 对于基于前缀的 servlet 映射,您可能需要再一次遍历。
11 在关闭单个 HTTP 流式处理请求之前可以通过单个 HTTP 流式处理请求发送的最小字节数。 默认为128K(即 128*1024 或 131072 字节)。
12 cookie_neededSockJs 响应中的值/info端点。 此属性指示JSESSIONID应用程序正常运行需要 cookie(例如,用于负载平衡或在 Java Servlet 容器中使用 HTTP 会话)。
13 服务器未发送任何消息的时间量(以毫秒为单位),之后服务器应发送任何消息 向客户端发送检测信号帧以防止连接中断。 默认值为25,000(25 秒)。
14 客户端在没有接收连接(即服务器可以通过该连接向客户端发送数据的活动连接)后被视为断开连接之前的时间量(以毫秒为单位)。 默认值为5000.
15 会话在等待来自客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息数。 默认大小为100.
16 某些负载均衡器不支持 WebSocket。 将此选项设置为false以禁用服务器端的 WebSocket 传输。 默认值为true.
17 TaskSchedulerbean 引用。 一个新的ThreadPoolTaskScheduler如果未提供值,则创建实例。 此调度程序实例用于调度心跳消息。
18 SockJsMessageCodec用于编码和解码 SockJS 消息的 bean 引用。 默认情况下,Jackson2SockJsMessageCodec,这要求 Jackson 库存在于类路径上。
19 列表TransportHandlerbean 引用。
20 是否禁用 SockJS 请求自动添加 CORS 标头。 默认值为false.

<int-websocket:outbound-channel-adapter>属性

以下列表显示了可用于<int-websocket:outbound-channel-adapter>元素:spring-doc.cadn.net.cn

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 组件 bean 名称。 如果您不提供channel属性,一个DirectChannel在应用程序上下文中创建和注册,并使用此id属性作为 bean 名称。 在这种情况下,端点将使用 Bean 名称注册id.adapter. 以及MessageHandler使用 bean 别名注册id.handler.
2 标识连接到此适配器的通道。
3 IntegrationWebSocketContainerbean,它封装了低级连接和WebSocketSession处理作。 必填。
4 SubProtocolHandler实例。 当客户端未请求子协议或它是单个协议处理程序时,使用它。 如果此引用或protocol-handlerslist 时,则PassThruSubProtocolHandler默认使用。
5 列表SubProtocolHandler此通道适配器的 bean 引用。 如果您只提供单个 bean 引用,并且没有提供default-protocol-handler,那单SubProtocolHandler用作default-protocol-handler. 如果您未设置此属性或default-protocol-handlerPassThruSubProtocolHandler默认使用。
6 列表MessageConverter此通道适配器的 bean 引用。
7 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。 仅当message-converters被提供。 否则,将注册所有默认转换器。 默认为false. 默认转换器是(按顺序):StringMessageConverter,ByteArrayMessageConverterMappingJackson2MessageConverter(如果 Jackson 库存在于类路径上)。
8 布尔值,指示此终结点是否应自动启动。 默认为true.
9 此终结点应启动和停止的生命周期阶段。 值越低,此终结点开始的时间越早,停止的时间越晚。 默认值为Integer.MIN_VALUE. 值可以为负数。 看SmartLifeCycle.

<int-websocket:inbound-channel-adapter>属性

以下列表显示了可用于<int-websocket:outbound-channel-adapter>元素:spring-doc.cadn.net.cn

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 组件 bean 名称。 如果您未将channel属性,一个DirectChannel在应用程序上下文中创建和注册,并使用此id属性作为 bean 名称。 在这种情况下,端点将使用 Bean 名称注册id.adapter.
2 标识连接到此适配器的通道。
3 MessageChannelbean 引用,其中ErrorMessage应发送实例。
4 请参阅<int-websocket:outbound-channel-adapter>.
5 请参阅<int-websocket:outbound-channel-adapter>.
6 请参阅<int-websocket:outbound-channel-adapter>.
7 请参阅<int-websocket:outbound-channel-adapter>.
8 请参阅<int-websocket:outbound-channel-adapter>.
9 如果通道可以阻塞,则向通道发送消息时等待的最大时间(以毫秒为单位)。 例如,一个QueueChannel如果已达到最大容量,则可以阻止,直到空间可用。
10 目标的 Java 类型的完全限定名称payload从传入的WebSocketMessage. 默认为java.lang.String.
11 指示此适配器是否发送non-MESSAGE WebSocketMessage实例和消息,并将代理目标发送到AbstractBrokerMessageHandler从应用程序上下文中。 当此属性为trueBroker Relay配置是必需的。 此属性仅在服务器端使用。 在客户端,它被忽略。 默认为false.
12 请参阅<int-websocket:outbound-channel-adapter>.
13 请参阅<int-websocket:outbound-channel-adapter>.

ClientStompEncoder

从版本 4.3.13 开始,Spring Integration 提供了ClientStompEncoder(作为标准的扩展StompEncoder) 用于 WebSocket 通道适配器的客户端。 为了正确准备客户端消息,您必须注入ClientStompEncoder进入StompSubProtocolHandler. 默认值的一个问题StompSubProtocolHandler是它是为服务器端设计的,所以它更新了SEND stompCommandheader 到MESSAGE(根据服务器端的 STOMP 协议的要求)。 如果客户端没有以正确的方式发送其消息SENDWeb 套接字框架,一些 STOMP 代理不接受它们。 的目的ClientStompEncoder,在这种情况下,是覆盖stompCommand标头,并将其设置为SEND值,然后将消息编码为byte[].spring-doc.cadn.net.cn

动态 WebSocket 端点注册

从版本 5.5 开始,WebSocket 服务器端点(基于ServerWebSocketContainer) 现在可以在运行时注册(和删除)-paths一个ServerWebSocketContainer映射通过HandlerMapping变成一个DispatcherServlet并可供 WebSocket 客户端访问。 动态和运行时集成流支持有助于以透明的方式注册这些终结点:spring-doc.cadn.net.cn

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
打电话很重要.addBean(serverWebSocketContainer)在动态流注册上添加ServerWebSocketContainer变成一个ApplicationContext用于终结点注册。 销毁动态流注册时,关联的ServerWebSocketContainer实例以及相应的端点注册(包括 URL 路径映射)也会被销毁。
动态 Websocket 端点只能通过 Spring Integration 机制注册:当常规 Spring@EnableWebsocket,Spring Integration 配置将退出,并且不会注册动态端点的基础设施。