对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
ZeroMQ 支持
Spring Integration 提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库的良好支持的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.11"
ZeroMQ 代理
这ZeroMqProxy
是内置的 Spring 友好包装器ZMQ.proxy()
功能。
它封装了套接字生命周期和线程管理。
该代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。
与标准一起ZContext
它需要一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。
这样,代理的前端和后端就使用了一对适当的 ZeroMQ 套接字类型。
看ZeroMqProxy.Type
了解详情。
这ZeroMqProxy
实现SmartLifecycle
创建、绑定和配置套接字并启动ZMQ.proxy()
在专用线程中Executor
(如果有)。
前端和后端套接字的绑定是通过tcp://
协议连接到所有可用网络接口上,并使用提供的端口。
否则,它们将绑定到随机端口,这些端口稍后可以通过相应的getFrontendPort()
和getBackendPort()
API 方法。
控制套接字公开为SocketType.PAIR
在"inproc://" + beanName + ".control"
地址;可以通过getControlAddress()
.
它应该与另一个应用程序的同一应用程序一起使用SocketType.PAIR
要发送的套接字ZMQ.PROXY_TERMINATE
,ZMQ.PROXY_PAUSE
和/或ZMQ.PROXY_RESUME
命令。
这ZeroMqProxy
执行ZMQ.PROXY_TERMINATE
命令stop()
调用其生命周期以终止ZMQ.proxy()
循环并优雅地关闭所有绑定的套接字。
这setExposeCaptureSocket(boolean)
选项会导致此组件将附加线程间套接字与SocketType.PUB
捕获并发布前端和后端套接字之间的所有通信,如其ZMQ.proxy()
实现。
此套接字绑定到"inproc://" + beanName + ".capture"
地址,并且不需要任何特定的订阅进行筛选。
可以使用其他属性(例如读/写超时或安全性)自定义前端和后端套接字。
此自定义可通过以下方式获得setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调。
这ZeroMqProxy
可以像这样提供简单的 bean:
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过以下方式连接到此代理的主机tcp://
并使用各自感兴趣的端口。
ZeroMQ 消息通道
这ZeroMqChannel
是一个SubscribableChannel
它使用一对 ZeroMQ 套接字连接发布者和订阅者进行消息传递交互。
它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用PAIR
sockets) -connectUrl
在这种情况下不提供。
在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。
连接 url 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对冒号上的端口,用于 ZeroMQ 代理的前端和后端套接字。
为方便起见,通道可以配备ZeroMqProxy
instance 而不是连接字符串,如果它与代理配置在同一应用程序中。
发送和接收套接字都在自己的专用线程中进行管理,使该通道对并发友好。
这样我们就可以发布和消费到ZeroMqChannel
来自不同的线程而不同步。
默认情况下,ZeroMqChannel
使用EmbeddedJsonHeadersMessageMapper
(反)序列化Message
(包括标头)from/tobyte[]
使用 Jackson JSON 处理器。
可以通过以下方式配置此逻辑setMessageMapper(BytesMessageMapper)
.
发送和接收套接字可以通过各自的任何选项(读/写超时、安全性等)进行自定义setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调。
的内部逻辑ZeroMqChannel
基于通过 Project Reactor 的反应流Flux
和Mono
运营商。 这提供了更轻松的线程控制,并允许无锁并发发布和与通道之间的使用。本地 PUB/SUB 逻辑实现为Flux.publish()
运算符以允许此通道的所有本地订阅者接收相同的已发布消息,作为PUB
插座。
下面是一个简单的示例ZeroMqChannel
配置:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
这ZeroMqMessageProducer
是一个MessageProducerSupport
使用响应式语义实现。
它以非阻塞的方式不断从 ZeroMQ 套接字读取数据,并将消息发布到无限Flux
由FluxMessageChannel
或显式地在start()
方法,如果输出通道不是无功的。
当套接字上没有收到任何数据时,一个consumeDelay
(默认为 1 秒)在下一次读取尝试之前应用。
只SocketType.PAIR
,SocketType.PULL
和SocketType.SUB
由ZeroMqMessageProducer
.
此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。
实际端口可以通过以下方式获得getBoundPort()
此组件启动并绑定 ZeroMQ 套接字后。
套接字选项(例如安全或写入超时)可以通过以下方式配置setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调。
如果receiveRaw
选项设置为true
一个ZMsg
,从套接字消耗,在生成的Message
:由下游流解析和转换ZMsg
.
否则,一个InboundMessageMapper
用于将消耗的数据转换为Message
.
如果收到的ZMsg
是多帧,则第一帧被视为ZeroMqHeaders.TOPIC
此 ZeroMQ 消息发布到的标头。
如果unwrapTopic
选项设置为false
,则传入消息被认为由两个帧组成:主题和 ZeroMQ 消息。
否则,默认情况下,ZMsg
被认为由三个帧组成:第一个包含主题,最后一个包含消息的帧,中间有一个空帧。
跟SocketType.SUB
这ZeroMqMessageProducer
使用提供的topics
订阅选项;默认订阅全部。
订阅可以在运行时使用subscribeToTopics()
和unsubscribeFromTopics()
@ManagedOperation
s.
这是一个示例ZeroMqMessageProducer
配置:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
这ZeroMqMessageHandler
是一个ReactiveMessageHandler
实现将发布消息生成到 ZeroMQ 套接字中。
只SocketType.PAIR
,SocketType.PUSH
和SocketType.PUB
被支持。
这ZeroMqMessageHandler
仅支持连接 ZeroMQ 套接字;不支持绑定。
当SocketType.PUB
使用时,则topicExpression
根据请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不是 null)。
订阅者端 (SocketType.SUB
)必须先接收主题帧,然后才能解析实际数据。
如果wrapTopic
选项设置为false
,ZeroMQ 消息帧在注入的主题之后发送(如果存在)。
默认情况下,在主题和消息之间发送一个额外的空帧。
当请求消息的有效负载是ZMsg
,则不执行转换或主题提取:该ZMsg
按原样发送到套接字中,并且不会被销毁以供进一步重用。
否则,一个OutboundMessageMapper<byte[]>
用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以进行发布。
默认情况下,一个ConvertingBytesMessageMapper
使用时提供ConfigurableCompositeMessageConverter
.
套接字选项(例如安全或写入超时)可以通过以下方式配置setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调。
这是一个示例ZeroMqMessageHandler
配置:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
这spring-integration-zeromq
通过以下方式提供方便的 Java DSL 流畅 APIZeroMq
工厂和IntegrationComponentSpec
上述组件的实现。
这是 Java DSL 的示例,用于ZeroMqChannel
:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}