此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
ZeroMQ 支持
Spring Integration 提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库的良好支持的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.5.2-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.5.2-SNAPSHOT"
ZeroMQ 代理
这ZeroMqProxy
是内置的 Spring 友好包装器ZMQ.proxy()
功能。
它封装了套接字生命周期和线程管理。
该代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。
与标准一起ZContext
它需要一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。
这样,代理的前端和后端就使用了一对适当的 ZeroMQ 套接字类型。
看ZeroMqProxy.Type
了解详情。
这ZeroMqProxy
实现SmartLifecycle
创建、绑定和配置套接字并启动ZMQ.proxy()
在专用线程中Executor
(如果有)。
前端和后端套接字的绑定是通过tcp://
协议连接到所有可用网络接口上,并使用提供的端口。
否则,它们将绑定到随机端口,这些端口稍后可以通过相应的getFrontendPort()
和getBackendPort()
API 方法。
控制套接字公开为SocketType.PAIR
在"inproc://" + beanName + ".control"
地址;可以通过getControlAddress()
.
它应该与另一个应用程序的同一应用程序一起使用SocketType.PAIR
要发送的套接字ZMQ.PROXY_TERMINATE
,ZMQ.PROXY_PAUSE
和/或ZMQ.PROXY_RESUME
命令。
这ZeroMqProxy
执行ZMQ.PROXY_TERMINATE
命令stop()
调用其生命周期以终止ZMQ.proxy()
循环并优雅地关闭所有绑定的套接字。
这setExposeCaptureSocket(boolean)
选项会导致此组件将附加线程间套接字与SocketType.PUB
捕获并发布前端和后端套接字之间的所有通信,如其ZMQ.proxy()
实现。
此套接字绑定到"inproc://" + beanName + ".capture"
地址,并且不需要任何特定的订阅进行筛选。
可以使用其他属性(例如读/写超时或安全性)自定义前端和后端套接字。
此自定义可通过以下方式获得setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调。
这ZeroMqProxy
可以像这样提供简单的 bean:
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过以下方式连接到此代理的主机tcp://
并使用各自感兴趣的端口。
ZeroMQ 消息通道
这ZeroMqChannel
是一个SubscribableChannel
它使用一对 ZeroMQ 套接字连接发布者和订阅者进行消息传递交互。
它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用PAIR
sockets) -connectUrl
在这种情况下不提供。
在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。
连接 url 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对冒号上的端口,用于 ZeroMQ 代理的前端和后端套接字。
为方便起见,通道可以配备ZeroMqProxy
instance 而不是连接字符串,如果它与代理配置在同一应用程序中。
发送和接收套接字都在自己的专用线程中进行管理,使该通道对并发友好。
这样我们就可以发布和消费到ZeroMqChannel
来自不同的线程而不同步。
默认情况下,ZeroMqChannel
使用EmbeddedJsonHeadersMessageMapper
(反)序列化Message
(包括标头)from/tobyte[]
使用 Jackson JSON 处理器。
可以通过以下方式配置此逻辑setMessageMapper(BytesMessageMapper)
.
发送和接收套接字可以通过各自的任何选项(读/写超时、安全性等)进行自定义setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调。
的内部逻辑ZeroMqChannel
基于通过 Project Reactor 的反应流Flux
和Mono
运营商。
这提供了更轻松的线程控制,并允许无锁并发发布和与通道之间的使用。
本地 PUB/SUB 逻辑实现为Flux.publish()
运算符以允许此通道的所有本地订阅者接收相同的已发布消息,作为PUB
插座。
下面是一个简单的示例ZeroMqChannel
配置:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
这ZeroMqMessageProducer
是一个MessageProducerSupport
使用响应式语义实现。
它以非阻塞的方式不断从 ZeroMQ 套接字读取数据,并将消息发布到无限Flux
由FluxMessageChannel
或显式地在start()
方法,如果输出通道不是无功的。
当套接字上没有收到任何数据时,一个consumeDelay
(默认为 1 秒)在下一次读取尝试之前应用。
只SocketType.PAIR
,SocketType.PULL
和SocketType.SUB
由ZeroMqMessageProducer
.
此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。
实际端口可以通过以下方式获得getBoundPort()
此组件启动并绑定 ZeroMQ 套接字后。
套接字选项(例如安全或写入超时)可以通过以下方式配置setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调。
如果receiveRaw
选项设置为true
一个ZMsg
,从套接字消耗,在生成的Message
:由下游流解析和转换ZMsg
.
否则,一个InboundMessageMapper
用于将消耗的数据转换为Message
.
如果收到的ZMsg
是多帧,则第一帧被视为ZeroMqHeaders.TOPIC
此 ZeroMQ 消息发布到的标头。
如果unwrapTopic
选项设置为false
,则传入消息被认为由两个帧组成:主题和 ZeroMQ 消息。
否则,默认情况下,ZMsg
被认为由三个帧组成:第一个包含主题,最后一个包含消息的帧,中间有一个空帧。
跟SocketType.SUB
这ZeroMqMessageProducer
使用提供的topics
订阅选项;默认订阅全部。
订阅可以在运行时使用subscribeToTopics()
和unsubscribeFromTopics()
@ManagedOperation
s.
这是一个示例ZeroMqMessageProducer
配置:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
这ZeroMqMessageHandler
是一个ReactiveMessageHandler
实现将发布消息生成到 ZeroMQ 套接字中。
只SocketType.PAIR
,SocketType.PUSH
和SocketType.PUB
被支持。
此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。
实际端口可以通过以下方式获得getBoundPort()
此组件启动并绑定 ZeroMQ 套接字后。
当SocketType.PUB
使用时,则topicExpression
根据请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不是 null)。
订阅者端 (SocketType.SUB
)必须先接收主题帧,然后才能解析实际数据。
如果wrapTopic
选项设置为false
,ZeroMQ 消息帧在注入的主题之后发送(如果存在)。
默认情况下,在主题和消息之间发送一个额外的空帧。
当请求消息的有效负载是ZMsg
,则不执行转换或主题提取:该ZMsg
按原样发送到套接字中,并且不会被销毁以供进一步重用。
否则,一个OutboundMessageMapper<byte[]>
用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以进行发布。
默认情况下,一个ConvertingBytesMessageMapper
使用时提供ConfigurableCompositeMessageConverter
.
套接字选项(例如安全或写入超时)可以通过以下方式配置setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调。
这是一个示例ZeroMqMessageHandler
连接到套接字的配置:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
这是一个示例ZeroMqMessageHandler
绑定到提供的端口的配置:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
这spring-integration-zeromq
通过以下方式提供方便的 Java DSL 流畅 APIZeroMq
工厂和IntegrationComponentSpec
上述组件的实现。
这是 Java DSL 的示例,用于ZeroMqChannel
:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}