此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
WebSockets 支持
从版本 4.1 开始,Spring Integration 支持 WebSocket。
它基于 Spring Framework 的web-socket
模块。
因此,Spring WebSocket 的许多组件(例如SubProtocolHandler
或WebSocketClient
)和配置选项(例如@EnableWebSocketMessageBroker
)可以在 Spring Integration 中重用。
有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持一章。
项目需要此依赖项:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>7.0.0-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:7.0.0-SNAPSHOT"
对于服务器端,org.springframework:spring-webmvc
必须显式包含依赖项。
Spring Framework WebSocket 基础设施基于 Spring 消息传递基础,并提供了一个基于相同的基本消息传递框架MessageChannel
implementations 和MessageHandler
Spring Integration使用的实现(以及一些POJO方法注释映射)。
因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。
为此,您可以配置 Spring Integration@MessagingGateway
使用适当的注释,如以下示例所示:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议在定义上是流式的,我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的WebSocketSession
,无论是在客户端还是服务器端。
封装连接管理和WebSocketSession
注册表,则IntegrationWebSocketContainer
提供ClientWebSocketContainer
和ServerWebSocketContainer
实现。
由于 WebSocket API 及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用相同的类(当然,从 Java 的角度来看)。
因此,大多数连接和WebSocketSession
注册表选项在两端是相同的。
这使我们能够重用许多配置项和基础设施挂钩,在服务器端和客户端构建 WebSocket 应用程序。
以下示例显示了组件如何同时实现这两个目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
这IntegrationWebSocketContainer
旨在实现双向消息传递,并且可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从其中一个引用。
它可以在没有任何通道适配器的情况下使用,但是,在这种情况下,IntegrationWebSocketContainer
仅起到作用,作为WebSocketSession
注册表。
这ServerWebSocketContainer 实现WebSocketConfigurer 注册内部IntegrationWebSocketContainer.IntegrationWebSocketHandler 作为Endpoint .
它根据提供的paths 和其他服务器 WebSocket 选项(例如HandshakeHandler 或SockJS fallback ) 在ServletWebSocketHandlerRegistry 对于目标提供商 WebSocket 容器。
该注册是通过基础设施实现的WebSocketIntegrationConfigurationInitializer 组件,其作与@EnableWebSocket 注解。
这意味着,通过使用@EnableIntegration (或应用程序上下文中的任何 Spring Integration 命名空间),您可以省略@EnableWebSocket 声明,因为 Spring Integration 基础设施会检测所有 WebSocket 端点。 |
从 6.1 版开始,ClientWebSocketContainer
可以使用提供的URI
而不是uriTemplate
和uriVariables
组合。
这在 uri 的某些部分需要自定义编码的情况下非常有用。
请参阅UriComponentsBuilder
为了方便起见。
WebSocket 入站通道适配器
这WebSocketInboundChannelAdapter
实现接收部分WebSocketSession
互动。
您必须为其提供IntegrationWebSocketContainer
,适配器将自己注册为WebSocketListener
处理传入消息和WebSocketSession
事件。
只有一个WebSocketListener 可以在IntegrationWebSocketContainer . |
对于 WebSocket 子协议,该WebSocketInboundChannelAdapter
可以配置为SubProtocolHandlerRegistry
作为第二个构造函数参数。
适配器委托给SubProtocolHandlerRegistry
确定适当的SubProtocolHandler
对于被接受的WebSocketSession
并将WebSocketMessage
设置为Message
根据子协议实现。
默认情况下,WebSocketInboundChannelAdapter 仅依赖原始PassThruSubProtocolHandler 实现,它将WebSocketMessage 设置为Message . |
这WebSocketInboundChannelAdapter
仅接受并发送到底层集成流Message
具有SimpMessageType.MESSAGE
或空的simpMessageType
页眉。
所有其他Message
类型通过ApplicationEvent
从SubProtocolHandler
实现(例如StompSubProtocolHandler
).
在服务器端,如果@EnableWebSocketMessageBroker
配置存在,您可以配置WebSocketInboundChannelAdapter
使用useBroker = true
选择。
在这种情况下,所有non-MESSAGE
Message
类型被委托给提供的AbstractBrokerMessageHandler
.
此外,如果代理中继配置了目标前缀,则与代理目标匹配的消息将路由到AbstractBrokerMessageHandler
而不是outputChannel
的WebSocketInboundChannelAdapter
.
如果useBroker = false
并且收到的消息是SimpMessageType.CONNECT
type,则WebSocketInboundChannelAdapter
立即发送一个SimpMessageType.CONNECT_ACK
消息发送给WebSocketSession
而不将其发送到通道。
Spring 的 WebSocket 支持只允许配置一个代理中继。
因此,我们不需要AbstractBrokerMessageHandler 参考。
它在应用程序上下文中检测到。 |
有关更多配置选项,请参阅 WebSockets 命名空间支持。
WebSocket 出站通道适配器
这WebSocketOutboundChannelAdapter
:
-
接受来自其
MessageChannel
-
确定
WebSocketSession
id
从MessageHeaders
-
检索
WebSocketSession
从提供的IntegrationWebSocketContainer
-
委托转换和发送
WebSocketMessage
工作到适当的地方SubProtocolHandler
从提供的SubProtocolHandlerRegistry
.
在客户端,WebSocketSession
id
message header 不是必需的,因为ClientWebSocketContainer
仅处理单个连接及其WebSocketSession
分别。
要使用 STOMP 子协议,您应该使用StompSubProtocolHandler
.
然后,您可以使用以下命令将任何 STOMP 消息类型发送到此适配器,使用StompHeaderAccessor.create(StompCommand…)
和MessageBuilder
,或者只是使用HeaderEnricher
(参见标题扩充器)。
本章的其余部分主要介绍其他配置选项。
WebSockets 命名空间支持
Spring Integration WebSocket 命名空间包括本章其余部分中描述的几个组件。 要将其包含在配置中,请在应用程序上下文配置文件中使用以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
属性
以下列表显示了可用于<int-websocket:client-container>
元素:
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
1 | 组件 bean 名称。 |
2 | 这WebSocketClient bean 引用。 |
3 | 这uri 或uriTemplate 到目标 WebSocket 服务。
如果将其用作uriTemplate 使用 URI 变量占位符时,uri-variables 属性是必需的。 |
4 | URI 变量占位符的逗号分隔值uri 属性值。
这些值根据其在uri .
看UriComponents.expand(Object…uriVariableValues) . |
5 | 这Origin 握手 HTTP 标头值。 |
6 | WebSocket 会话send 超时限制。
默认为10000 . |
7 | WebSocket 会话send 消息大小限制。
默认为524288 . |
8 | WebSocket 会话发送缓冲区溢出策略
确定会话的出站消息缓冲区达到send-buffer-size-limit .
看ConcurrentWebSocketSessionDecorator.OverflowStrategy 获取可能的值和更多详细信息。 |
9 | 布尔值,指示此终结点是否应自动启动。
默认为false ,假设此容器是从 WebSocket 入站适配器启动的。 |
10 | 此终结点应启动和停止的生命周期阶段。
值越低,此终结点开始的时间越早,停止的时间越晚。
默认值为Integer.MAX_VALUE .
值可以为负数。
看SmartLifeCycle . |
11 | 一个Map 之HttpHeaders 与握手请求一起使用。 |
<int-websocket:server-container>
属性
以下列表显示了可用于<int-websocket:server-container>
元素:
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
1 | 组件 bean 名称。 |
2 | 将特定请求映射到WebSocketHandler .
支持精确路径映射 URI(例如/myPath )和Ant式路径模式(例如/myPath/** ). |
3 | 这HandshakeHandler bean 引用。
默认为DefaultHandshakeHandler . |
4 | 列表HandshakeInterceptor bean 引用。 |
5 | 一个或多个工厂的列表 (WebSocketHandlerDecoratorFactory )来装饰用于处理 WebSocket 消息的处理程序。
这对于某些高级用例可能很有用(例如,允许 Spring Security 强制关闭
WebSocket 会话)。
有关更多信息,请参阅Spring Session项目。 |
6 | 请参阅<int-websocket:client-container> . |
7 | 请参阅<int-websocket:client-container> . |
8 | WebSocket 会话发送缓冲区溢出策略
确定会话的出站消息缓冲区达到send-buffer-size-limit .
看ConcurrentWebSocketSessionDecorator.OverflowStrategy 获取可能的值和更多详细信息。 |
9 | 允许的源标头值。
您可以将多个来源指定为逗号分隔的列表。
此检查主要针对浏览器客户端设计。
没有什么可以阻止其他类型的客户端修改源标头值。
当启用 SockJS 并限制允许的源时,不对跨源请求使用源头的传输类型(jsonp-polling ,iframe-xhr-polling ,iframe-eventsource 和iframe-htmlfile ) 被禁用。
因此,不支持 IE6 和 IE7,并且仅支持 IE8 和 IE9,没有 cookie。
默认情况下,允许所有源。 |
10 | 没有本机跨域通信的传输(例如eventsource 和htmlfile )必须从不可见的 iframe 中的“外部”域获取一个简单的页面,以便 iframe 中的代码可以从本地域运行到 SockJS 服务器。
由于 iframe 需要加载 SockJS JavaScript 客户端库,因此此属性允许您指定从中加载它的位置。
默认情况下,它指向d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js .
但是,您也可以将其设置为指向应用程序提供的 URL。
请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。
例如,假设 SockJS 端点映射到/sockjs 生成的 iframe URL 为/sockjs/iframe.html ,则相对 URL 必须以../../ 以遍历到 SockJS 映射上方的位置。
对于基于前缀的 servlet 映射,您可能需要再一次遍历。 |
11 | 在关闭单个 HTTP 流式处理请求之前可以通过单个 HTTP 流式处理请求发送的最小字节数。
默认为128K (即 128*1024 或 131072 字节)。 |
12 | 这cookie_needed SockJs 响应中的值/info 端点。
此属性指示JSESSIONID 应用程序正常运行需要 cookie(例如,用于负载平衡或在 Java Servlet 容器中使用 HTTP 会话)。 |
13 | 服务器未发送任何消息的时间量(以毫秒为单位),之后服务器应发送任何消息
向客户端发送检测信号帧以防止连接中断。
默认值为25,000 (25 秒)。 |
14 | 客户端在没有接收连接(即服务器可以通过该连接向客户端发送数据的活动连接)后被视为断开连接之前的时间量(以毫秒为单位)。
默认值为5000 . |
15 | 会话在等待来自客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息数。
默认大小为100 . |
16 | 某些负载均衡器不支持 WebSocket。
将此选项设置为false 以禁用服务器端的 WebSocket 传输。
默认值为true . |
17 | 这TaskScheduler bean 引用。
一个新的ThreadPoolTaskScheduler 如果未提供值,则创建实例。
此调度程序实例用于调度心跳消息。 |
18 | 这SockJsMessageCodec 用于编码和解码 SockJS 消息的 bean 引用。
默认情况下,Jackson2SockJsMessageCodec ,这要求 Jackson 库存在于类路径上。 |
19 | 列表TransportHandler bean 引用。 |
20 | 是否禁用 SockJS 请求自动添加 CORS 标头。
默认值为false . |
<int-websocket:outbound-channel-adapter>
属性
以下列表显示了可用于<int-websocket:outbound-channel-adapter>
元素:
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 bean 名称。
如果您不提供channel 属性,一个DirectChannel 在应用程序上下文中创建和注册,并使用此id 属性作为 bean 名称。
在这种情况下,端点将使用 Bean 名称注册id 加.adapter .
以及MessageHandler 使用 bean 别名注册id 加.handler . |
2 | 标识连接到此适配器的通道。 |
3 | 对IntegrationWebSocketContainer bean,它封装了低级连接和WebSocketSession 处理作。
必填。 |
4 | 对SubProtocolHandler 实例。
当客户端未请求子协议或它是单个协议处理程序时,使用它。
如果此引用或protocol-handlers list 时,则PassThruSubProtocolHandler 默认使用。 |
5 | 列表SubProtocolHandler 此通道适配器的 bean 引用。
如果您只提供单个 bean 引用,并且没有提供default-protocol-handler ,那单SubProtocolHandler 用作default-protocol-handler .
如果您未设置此属性或default-protocol-handler 这PassThruSubProtocolHandler 默认使用。 |
6 | 列表MessageConverter 此通道适配器的 bean 引用。 |
7 | 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。
仅当message-converters 被提供。
否则,将注册所有默认转换器。
默认为false .
默认转换器是(按顺序):StringMessageConverter ,ByteArrayMessageConverter 和MappingJackson2MessageConverter (如果 Jackson 库存在于类路径上)。 |
8 | 布尔值,指示此终结点是否应自动启动。
默认为true . |
9 | 此终结点应启动和停止的生命周期阶段。
值越低,此终结点开始的时间越早,停止的时间越晚。
默认值为Integer.MIN_VALUE .
值可以为负数。
看SmartLifeCycle . |
<int-websocket:inbound-channel-adapter>
属性
以下列表显示了可用于<int-websocket:outbound-channel-adapter>
元素:
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
1 | 组件 bean 名称。
如果您未将channel 属性,一个DirectChannel 在应用程序上下文中创建和注册,并使用此id 属性作为 bean 名称。
在这种情况下,端点将使用 Bean 名称注册id 加.adapter . |
2 | 标识连接到此适配器的通道。 |
3 | 这MessageChannel bean 引用,其中ErrorMessage 应发送实例。 |
4 | 请参阅<int-websocket:outbound-channel-adapter> . |
5 | 请参阅<int-websocket:outbound-channel-adapter> . |
6 | 请参阅<int-websocket:outbound-channel-adapter> . |
7 | 请参阅<int-websocket:outbound-channel-adapter> . |
8 | 请参阅<int-websocket:outbound-channel-adapter> . |
9 | 如果通道可以阻塞,则向通道发送消息时等待的最大时间(以毫秒为单位)。
例如,一个QueueChannel 如果已达到最大容量,则可以阻止,直到空间可用。 |
10 | 目标的 Java 类型的完全限定名称payload 从传入的WebSocketMessage .
默认为java.lang.String . |
11 | 指示此适配器是否发送non-MESSAGE WebSocketMessage 实例和消息,并将代理目标发送到AbstractBrokerMessageHandler 从应用程序上下文中。
当此属性为true 这Broker Relay 配置是必需的。
此属性仅在服务器端使用。
在客户端,它被忽略。
默认为false . |
12 | 请参阅<int-websocket:outbound-channel-adapter> . |
13 | 请参阅<int-websocket:outbound-channel-adapter> . |
用ClientStompEncoder
从版本 4.3.13 开始,Spring Integration 提供了ClientStompEncoder
(作为标准的扩展StompEncoder
) 用于 WebSocket 通道适配器的客户端。
为了正确准备客户端消息,您必须注入ClientStompEncoder
进入StompSubProtocolHandler
.
默认值的一个问题StompSubProtocolHandler
是它是为服务器端设计的,所以它更新了SEND
stompCommand
header 到MESSAGE
(根据服务器端的 STOMP 协议的要求)。
如果客户端没有以正确的方式发送其消息SEND
Web 套接字框架,一些 STOMP 代理不接受它们。
的目的ClientStompEncoder
,在这种情况下,是覆盖stompCommand
标头,并将其设置为SEND
值,然后将消息编码为byte[]
.
动态 WebSocket 端点注册
从版本 5.5 开始,WebSocket 服务器端点(基于ServerWebSocketContainer
) 现在可以在运行时注册(和删除)-paths
一个ServerWebSocketContainer
映射通过HandlerMapping
变成一个DispatcherServlet
并可供 WebSocket 客户端访问。
动态和运行时集成流支持有助于以透明的方式注册这些终结点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
打电话很重要.addBean(serverWebSocketContainer) 在动态流注册上添加ServerWebSocketContainer 变成一个ApplicationContext 用于终结点注册。
销毁动态流注册时,关联的ServerWebSocketContainer 实例以及相应的端点注册(包括 URL 路径映射)也会被销毁。 |
动态 Websocket 端点只能通过 Spring Integration 机制注册:当常规 Spring@EnableWebsocket ,Spring Integration 配置将退出,并且不会注册动态端点的基础设施。 |