STOMP 支持
STOMP 支持
Spring Integration 4.2 版引入了 STOMP(面向简单文本的消息传递协议)客户端支持。
它基于 Spring Framework 的消息传递模块 stomp 包中的架构、基础设施和 API。
Spring Integration 使用许多 Spring STOMP 组件(例如StompSession
和StompClientSupport
).
有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持一章。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.0.9"
对于服务器端组件,您需要添加一个org.springframework:spring-websocket
和/或io.projectreactor.netty:reactor-netty
依赖。
概述
要配置 STOMP,您应该从 STOMP 客户端对象开始。 Spring 框架提供了以下实现:
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于使用 SockJS 客户端进行基于 HTTP 的 WebSocket 仿真。 -
ReactorNettyTcpStompClient
:建立在ReactorNettyTcpClient
从reactor-netty
项目。
您可以提供任何其他StompClientSupport
实现。
请参阅这些类的 Javadoc。
这StompClientSupport
类被设计为工厂生产StompSession
对于提供的StompSessionHandler
所有剩余的工作都是通过回调完成的StompSessionHandler
和StompSession
抽象化。
使用 Spring Integration 适配器抽象,我们需要提供一些托管共享对象来将我们的应用程序表示为具有唯一会话的 STOMP 客户端。
为此,Spring Integration 提供了StompSessionManager
抽象来管理单个 StompSession
在任何提供的StompSessionHandler
.
这允许为特定的 STOMP 代理使用入站或出站通道适配器(或两者)。
看StompSessionManager
(及其实现)JavaDocs 了解更多信息。
STOMP 入站通道适配器
这StompInboundChannelAdapter
是一站式MessageProducer
组件,该组件将 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(使用提供的 STOMP 帧从 STOMP 帧转换MessageConverter
在连接的StompSession
).
您可以使用适当的@ManagedOperation
注释StompInboundChannelAdapter
.
有关更多配置选项,请参阅 STOMP 命名空间支持和StompInboundChannelAdapter
Javadoc 的 Javadoc 中。
STOMP 出站通道适配器
这StompMessageHandler
是MessageHandler
对于<int-stomp:outbound-channel-adapter>
并用于发送传出Message<?>
实例添加到 STOMPdestination
(预先配置或在运行时使用 SpEL 表达式确定)通过StompSession
(由共享的StompSessionManager
).
有关更多配置选项,请参阅 STOMP 命名空间支持和StompMessageHandler
Javadoc 的 Javadoc 中。
STOMP 标头映射
STOMP 协议提供标头作为其帧的一部分。 STOMP 框架的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了StompHeaders
来表示这些标头。
有关更多详细信息,请参阅 Javadoc。
STOMP 帧与 STOMP 帧进行转换和从Message<?>
实例,这些标头映射到MessageHeaders
实例。
Spring Integration 提供默认的HeaderMapper
STOMP 适配器的实现。
实现是StompHeaderMapper
.
它提供了fromHeaders()
和toHeaders()
分别是入站和出站适配器的作。
与许多其他 Spring Integration 模块一样,IntegrationStompHeaders
引入类以将标准 STOMP 标头映射到MessageHeaders
跟stomp_
作为标头名称前缀。
此外,所有MessageHeaders
具有该前缀的实例将映射到StompHeaders
发送到目的地时。
STOMP 集成事件
许多 STOMP作都是异步的,包括错误处理。
例如,STOMP 有一个RECEIPT
服务器帧,当客户端帧请求一个时,它通过添加RECEIPT
页眉。
为了提供对这些异步事件的访问,Spring Integration 发出StompIntegrationEvent
实例,您可以通过实现ApplicationListener
或通过使用<int-event:inbound-channel-adapter>
(请参阅接收 Spring 应用程序事件)。
具体来说,一个StompExceptionEvent
从AbstractStompSessionManager
当stompSessionListenableFuture
接收onFailure()
由于无法连接到 STOMP 代理。
另一个例子是StompMessageHandler
.
它处理ERROR
STOMP 帧,这是服务器对StompMessageHandler
.
这StompMessageHandler
发出StompReceiptEvent
作为StompSession.Receiptable
发送到StompSession
.
这StompReceiptEvent
可以是正的,也可以是负的,具体取决于RECEIPT
frame 是从receiptTimeLimit
句点,您可以在StompClientSupport
实例。
它默认为15 * 1000
(以毫秒为单位,所以 15 秒)。
这StompSession.Receiptable 仅当RECEIPT 要发送的消息的 STOMP 标头不是null .
您可以启用自动RECEIPT 标头生成StompSession 通过其autoReceipt 选项,并在StompSessionManager 分别。 |
有关如何配置 Spring Integration 以接受这些内容的更多信息,请参阅 STOMP 适配器 Java 配置ApplicationEvent
实例。
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring Integration STOMP 命名空间实现入站和出站通道适配器组件。 要将其包含在配置中,请在应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解<int-stomp:outbound-channel-adapter>
元素
以下列表显示了 STOMP 出站通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 bean 名称。
这MessageHandler 使用 Bean 别名id 加.handler .
如果您未将channel 属性,一个DirectChannel 在应用程序上下文中创建和注册,其值为id 属性作为 bean 名称。
在这种情况下,端点注册了一个 bean 名称id 加.adapter . |
2 | 标识连接到此适配器的通道,如果id 存在。
看id .
自选。 |
3 | 引用StompSessionManager bean,它封装了低级连接和StompSession 处理作。
必填。 |
4 | 对实现HeaderMapper<StompHeaders> ,它映射了 Spring IntegrationMessageHeaders 往返
STOMP 帧标头。
它与mapped-headers .
它默认为StompHeaderMapper . |
5 | 要映射到 STOMP 帧标头的 STOMP 标头名称的逗号分隔列表。
只有在header-mapper 未设置引用。
此列表中的值也可以是与标头名称匹配的简单模式(例如myheader* 或*myheader ).
特殊Tokens (STOMP_OUTBOUND_HEADERS )表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们包含在内。
如果要添加自己的标头并希望标准标头也被映射,则必须包含此Tokens或提供自己的TokensHeaderMapper 使用header-mapper . |
6 | STOMP 消息发送到的目标的名称。
它与destination-expression . |
7 | 在运行时针对每个 Spring Integration 进行评估的 SpEL 表达式Message 作为根对象。
它与destination . |
8 | 布尔值,指示此终结点是否应自动启动。
它默认为true . |
9 | 此终结点应启动和停止的生命周期阶段。
值越低,此终结点开始的时间越早,停止的时间越晚。
默认值为Integer.MIN_VALUE .
值可以为负数。
看SmartLifeCycle . |
了解<int-stomp:inbound-channel-adapter>
元素
以下列表显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 组件 bean 名称。
如果您未将channel 属性,一个DirectChannel 在应用程序上下文中创建和注册,其值为id 属性作为 bean 名称。
在这种情况下,端点将使用 Bean 名称注册id 加.adapter . |
2 | 标识连接到此适配器的通道。 |
3 | 这MessageChannel bean 引用,其中ErrorMessage 应发送实例。 |
4 | 请参阅<int-stomp:outbound-channel-adapter> . |
5 | 要从 STOMP 帧报头映射的 STOMP 报头名称的逗号分隔列表。
只有在header-mapper 未设置引用。
此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader* 或*myheader ).
特殊Tokens (STOMP_INBOUND_HEADERS )表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们包含在内。
如果要添加自己的标头并希望映射标准标头,则还必须包含此Tokens或提供自己的TokensHeaderMapper 使用header-mapper . |
6 | 请参阅<int-stomp:outbound-channel-adapter> . |
7 | 要订阅的 STOMP 目标名称的逗号分隔列表。目标列表(以及订阅)可以在运行时通过addDestination() 和removeDestination() @ManagedOperation 附注。 |
8 | 如果通道可以阻塞,则向通道发送消息时等待的最大时间(以毫秒为单位)。例如,一个QueueChannel 如果已达到最大容量,则可以阻止,直到空间可用。 |
9 | 目标的 Java 类型的完全限定名称payload 从传入的 STOMP 帧进行转换。
它默认为String.class . |
10 | 请参阅<int-stomp:outbound-channel-adapter> . |
11 | 请参阅<int-stomp:outbound-channel-adapter> . |