对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

JMS 支持

Spring Integration提供了用于接收和发送JMS消息的通道适配器。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.3.11"

jakarta.jms:jakarta.jms-api必须通过某些特定于 JMS 提供商的实现(例如 Apache ActiveMQ)显式添加。spring-doc.cadn.net.cn

实际上有两个基于 JMS 的入站通道适配器。 第一个使用 Spring 的JmsTemplate根据轮询周期接收。 第二种是“消息驱动”,依赖于 SpringMessageListener容器。 出站通道适配器使用JmsTemplate按需转换和发送 JMS 消息。spring-doc.cadn.net.cn

通过使用JmsTemplateMessageListener容器中,Spring Integration 依赖于 Spring 的 JMS 支持。 了解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层JmsTemplateMessageListener容器。 有关JmsTemplateMessageListener容器,请参阅 Spring JMS 文档spring-doc.cadn.net.cn

虽然 JMS 通道适配器用于单向消息传递(仅发送或仅接收),但 Spring Integration 还为请求和回复作提供入站和出站 JMS 网关。 入站网关依赖于 Spring 的MessageListener用于消息驱动接收的容器实现。 它还能够将返回值发送到reply-todestination,由收到的消息提供。 出站网关将 JMS 消息发送到request-destination(或request-destination-namerequest-destination-expression),然后接收回复消息。 您可以显式配置reply-destination引用(或reply-destination-namereply-destination-expression). 否则,出站网关将使用 JMS TemporaryQueuespring-doc.cadn.net.cn

在 Spring Integration 2.2 之前,如有必要,一个TemporaryQueue为每个请求或回复创建(和删除)。 从 Spring Integration 2.2 开始,您可以将出站网关配置为使用MessageListener容器来接收回复,而不是直接使用新的(或缓存的)Consumer接收每个请求的回复。 如果这样配置,并且没有提供显式回复目标,则单个TemporaryQueue用于每个网关,而不是每个请求一个。spring-doc.cadn.net.cn

从 6.0 版开始,出站网关会创建一个TemporaryTopic而不是TemporaryQueue如果replyPubSubDomain选项设置为true. 一些 JMS 提供商以不同的方式处理这些目的地。spring-doc.cadn.net.cn

入站通道适配器

入站通道适配器需要对单个JmsTemplate实例或两者ConnectionFactoryDestination(您可以提供“destinationName”来代替“destination”引用)。 以下示例定义了具有Destination参考:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
请注意,从前面的配置中,inbound-channel-adapter是轮询消费者。 这意味着它调用receive()触发时。 您应该仅在轮询相对不频繁且及时性不重要的情况下使用它。 对于所有其他情况(绝大多数基于 JMS 的用例),将message-driven-channel-adapter (稍后描述)是更好的选择。
默认情况下,所有需要引用ConnectionFactory自动查找名为jmsConnectionFactory. 这就是为什么您看不到connection-factory属性。 但是,如果您的 JMSConnectionFactory具有不同的 bean 名称,则需要提供该属性。

如果extract-payload设置为true(默认值),接收到的 JMS 消息将通过MessageConverter. 当依赖默认值时SimpleMessageConverter,这意味着生成的 Spring Integration Message 将 JMS 消息的正文作为其有效负载。 一个 JMSTextMessage生成基于字符串的有效负载,即 JMSBytesMessage生成字节数组有效负载,以及 JMS 的可序列化实例ObjectMessage成为 Spring Integration 消息的有效负载。 如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效负载,请将extractPayload选项设置为false.spring-doc.cadn.net.cn

从版本 5.0.8 开始,默认值receive-timeout-1(无需等待)org.springframework.jms.connection.CachingConnectionFactorycacheConsumers,否则为 1 秒。 JMS 入站通道适配器将DynamicJmsTemplate基于提供的ConnectionFactory和选项。 如果外部JmsTemplate是必需的(例如在 Spring Boot 环境中),或者ConnectionFactory不缓存,或否cacheConsumers,建议设置jmsTemplate.receiveTimeout(-1)如果预期非阻塞消耗:spring-doc.cadn.net.cn

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

交易

从 V4.0 开始,入站通道适配器支持session-transacted属性。 在早期版本中,您必须注入一个JmsTemplatesessionTransacted设置为true. (适配器确实允许您将acknowledge属性设置为transacted,但这是不正确的并且不起作用)。spring-doc.cadn.net.cn

但是请注意,该设置session-transactedtrue价值不大,因为事务已提交 紧接在receive()作,然后再将消息发送到channel.spring-doc.cadn.net.cn

如果您希望整个流是事务性的(例如,如果存在下游出站通道适配器),则必须使用transactional轮询器与JmsTransactionManager. 或者,考虑使用jms-message-driven-channel-adapteracknowledge设置为transacted(默认值)。spring-doc.cadn.net.cn

消息驱动通道适配器

message-driven-channel-adapter需要对 Spring 实例的引用MessageListener容器(任何AbstractMessageListenerContainer) 或两者兼而有之ConnectionFactoryDestination(可以提供“destinationName”来代替“destination”引用)。 以下示例定义了一个消息驱动的通道适配器,其中包含Destination参考:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

消息驱动适配器还接受与MessageListener容器。 仅当您未提供container参考。 在这种情况下,实例DefaultMessageListenerContainer根据这些属性创建和配置。 例如,您可以指定transaction-managerreference,concurrent-consumersvalue 以及其他几个属性引用和值。 请参阅 Javadoc 和 Spring Integration 的 JMS 模式 (spring-integration-jms.xsd)了解更多详情。spring-doc.cadn.net.cn

如果你有一个自定义监听器容器实现(通常是DefaultMessageListenerContainer),您可以使用container属性或使用container-class属性。 在这种情况下,适配器上的属性将传输到自定义容器的实例。spring-doc.cadn.net.cn

不能使用 Spring JMS 命名空间元素<jms:listener-container/>要为<int-jms:message-driven-channel-adapter>因为该元素实际上并不引用容器。 每<jms:listener/>子元素获得自己的DefaultMessageListenerContainer(在父级<jms:listener-container/>元素)。 您可以为每个监听器子元素提供一个id,并使用它注入到通道适配器中,但是,<jms:/>命名空间需要一个真正的侦听器。spring-doc.cadn.net.cn

建议配置常规<bean>对于DefaultMessageListenerContainer并将其用作通道适配器中的参考。spring-doc.cadn.net.cn

从 4.2 版开始,默认的acknowledgemode 是transacted,除非您提供外部容器。 在这种情况下,应根据需要配置容器。 我们建议使用transacted使用DefaultMessageListenerContainer以避免消息丢失。

'extract-payload' 属性具有相同的效果,其默认值为 'true'。 这poller元素不适用于消息驱动的通道适配器,因为它是主动调用的。 对于大多数方案,消息驱动的方法更好,因为消息会传递给MessageChannel一旦从底层 JMS 使用者处收到它们。spring-doc.cadn.net.cn

最后,<message-driven-channel-adapter>元素还接受 'error-channel' 属性。 这提供了相同的基本功能,如输入GatewayProxyFactoryBean. 以下示例演示如何在消息驱动通道适配器上设置错误通道:spring-doc.cadn.net.cn

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

将前面的示例与我们稍后讨论的通用网关配置或 JMS 'inbound-gateway' 进行比较时,关键区别在于我们处于单向流中,因为这是一个 'channel-adapter',而不是网关。因此,'error-channel' 的下游流也应该是单向的。例如,它可以发送到日志记录处理程序,也可以连接到不同的 JMS<outbound-channel-adapter>元素。spring-doc.cadn.net.cn

从 Topic 使用时,将pub-sub-domain属性设置为 true。 设置subscription-durabletrue用于持久订阅或subscription-shared对于共享订阅(需要 JMS 2.0 代理,并且从 V4.2 开始可用)。 用subscription-name以命名订阅。spring-doc.cadn.net.cn

从版本 5.1 开始,当端点停止而应用程序保持运行时,底层侦听器容器将关闭,从而关闭其共享连接和使用者。 此前,连接和消费者保持开放。 要恢复到以前的行为,请将shutdownContainerOnStopJmsMessageDrivenEndpointfalse.spring-doc.cadn.net.cn

从 6.3 版开始,ChannelPublishingJmsMessageListener现在可以提供RetryTemplateRecoveryCallback<Message<?>>用于重试下游发送和发送和接收作。 这些选项也公开到JmsMessageDrivenChannelAdapterSpec用于 Java DSL。spring-doc.cadn.net.cn

入站转化错误

从 4.2 版开始,“错误通道”也用于转换错误。 以前,如果 JMS<message-driven-channel-adapter/><inbound-gateway/>由于转换错误而无法传递消息,则异常将抛回容器。 如果容器配置为使用事务,则消息将回滚并重复重新传递。 转换过程发生在消息构造之前和期间,因此此类错误不会发送到“错误通道”。 现在,此类转换异常会导致ErrorMessage被发送到 'error-channel',但例外情况为payload. 如果您希望事务回滚,并且您定义了“错误通道”,则“错误通道”上的集成流必须重新抛出异常(或其他异常)。 如果错误流未抛出异常,则提交事务并删除消息。 如果未定义“错误通道”,则像以前一样将异常抛回容器。spring-doc.cadn.net.cn

出站通道适配器

JmsSendingMessageHandler实现MessageHandler接口,并且能够转换 Spring IntegrationMessages发送到 JMS 消息,然后发送到 JMS 目标。 它需要jmsTemplate参考或两者兼而有之jmsConnectionFactorydestination参考资料 (destinationName可以提供代替destination). 与入站通道适配器一样,配置此适配器的最简单方法是使用命名空间支持。 以下配置生成一个适配器,该适配器从exampleChannel,将它们转换为 JMS 消息,并将它们发送到 Bean 名称为outQueue:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
                .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
        integrationFlow("exampleChannel") {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }
@Bean
jmsOutboundFlow() {
    integrationFlow('exampleChannel') {
        handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
                .with {
                    destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
                    deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
                    timeToLiveExpression '10000'
                    configureJmsTemplate {
                        it.explicitQosEnabled true
                    }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

与入站通道适配器一样,有一个“extract-payload”属性。 但是,对于出站适配器,其含义相反。 布尔属性不是应用于 JMS 消息,而是应用于 Spring Integration 消息有效负载。 换句话说,决定是将 Spring Integration 消息本身作为 JMS 消息正文传递,还是将 Spring Integration 消息有效负载作为 JMS 消息正文传递。 默认值为“true”。 因此,如果您传递一个 Spring Integration 消息,其有效负载是String, 一个 JMSTextMessage被创建。 另一方面,如果您想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为 'false'。spring-doc.cadn.net.cn

无论有效负载提取的布尔值如何,Spring IntegrationMessageHeaders映射到 JMS 属性,只要您依赖于默认转换器或提供对另一个实例的引用MessageConverter. (对于“入站”适配器也是如此,只是在这些情况下,JMS 属性映射到 Spring IntegrationMessageHeaders).

从 5.1 版本开始,<int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) 可以使用deliveryModeExpressiontimeToLiveExpression属性来评估 JMS 消息的适当 QoS 值,以便在运行时根据请求 Spring 发送Message. 新的setMapInboundDeliveryMode(true)setMapInboundExpiration(true)选项的DefaultJmsHeaderMapper可以促进作为动态信息的来源deliveryModetimeToLive从消息头:spring-doc.cadn.net.cn

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

交易

从 V4.0 开始,出站通道适配器支持session-transacted属性。 在早期版本中,您必须注入一个JmsTemplatesessionTransacted设置为true. 该属性现在将属性设置为内置默认值JmsTemplate. 如果存在交易(可能来自上游message-driven-channel-adapter),发送作在同一事务中执行。 否则,将启动新事务。spring-doc.cadn.net.cn

入站网关

Spring Integration 的消息驱动 JMS 入站网关委托给MessageListener容器,支持动态调整并发消费者,还可以处理回复。 入站网关需要引用ConnectionFactory和请求Destination(或 'requestDestinationName')。 以下示例定义了 JMSinbound-gateway从 bean id 引用的 JMS 队列接收,inQueue,并发送到名为exampleChannel:spring-doc.cadn.net.cn

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们还具有两个不同的“有效负载提取”属性(如前面对通道适配器的“提取-有效负载”设置所述)。 对于入站网关,“extract-request-payload”属性确定是否提取接收到的 JMS 消息正文。 如果为“false”,则 JMS 消息本身将成为 Spring Integration 消息有效负载。 默认值为 'true'。spring-doc.cadn.net.cn

同样,对于入站网关,“extract-reply-payload”属性适用于要转换为回复JMS消息的Spring Integration消息。 如果要传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的主体),请将值 this 设置为 'false'。 默认情况下,将 Spring Integration 消息有效负载转换为 JMS 消息也是“true”(例如,一个String有效负载成为 JMS TextMessage)。spring-doc.cadn.net.cn

与其他任何作一样,网关调用可能会导致错误。 默认情况下,生产者不会收到使用者端可能发生的错误的通知,并且在等待回复时超时。 但是,有时您可能希望将错误条件传达回使用者(换句话说,您可能希望通过将异常映射到消息来将异常视为有效回复)。 为了实现这一点,JMS 入站网关提供了对消息通道的支持,可以向该消息通道发送错误进行处理,这可能会导致响应消息有效负载符合某些协定,该协定定义了调用方可能期望的“错误”回复。 您可以使用 error-channel 属性来配置此类通道,如以下示例所示:spring-doc.cadn.net.cn

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

您可能会注意到,此示例看起来与输入GatewayProxyFactoryBean. 同样的想法也适用于此:该exceptionTransformer可以是创建错误响应对象的 POJO,您可以引用nullChannel来抑制错误,或者您可以保留 'error-channel' 以让异常传播。spring-doc.cadn.net.cn

从 Topic 使用时,将pub-sub-domain属性设置为 true。 设置subscription-durabletrue用于持久订阅或subscription-shared对于共享订阅(需要 JMS 2.0 代理,并且从 V4.2 开始可用)。 用subscription-name以命名订阅。spring-doc.cadn.net.cn

从 4.2 版开始,默认的acknowledgemode 是transacted,除非提供外部容器。 在这种情况下,应根据需要配置容器。 我们建议您使用transacted使用DefaultMessageListenerContainer以避免消息丢失。

从版本 5.1 开始,当端点停止而应用程序保持运行时,底层侦听器容器将关闭,从而关闭其共享连接和使用者。 此前,连接和消费者保持开放。 要恢复到以前的行为,请将shutdownContainerOnStopJmsInboundGatewayfalse.spring-doc.cadn.net.cn

默认情况下,JmsInboundGateway寻找一个jakarta.jms.Message.getJMSReplyTo()属性来确定将回复发送到何处。 否则,可以使用静态defaultReplyDestinationdefaultReplyQueueNamedefaultReplyTopicName. 此外,从 6.1 版开始,一个replyToExpression可以在提供的ChannelPublishingJmsMessageListener动态确定回复目的地,如果标准JMSReplyTo属性是null根据请求。 收到的jakarta.jms.Message用于根评估上下文对象。 以下示例演示如何使用 Java DSL API 配置入站 JMS 网关,其中包含从请求消息解析的自定义回复目标:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundGateway(connectionFactory)
                            .requestDestination("requestDestination")
                            .replyToFunction(message -> message.getStringProperty("myReplyTo")))
            .<String, String>transform(String::toUpperCase)
            .get();
}

从 6.3 版开始,Jms.inboundGateway()API 公开了一个retryTemplate()recoveryCallback()用于重试内部发送和接收作的选项。spring-doc.cadn.net.cn

出站网关

出站网关从 Spring Integration 消息创建 JMS 消息,并将它们发送到request-destination. 然后,它通过使用选择器从reply-destination配置的,或者,如果没有reply-destination通过创建 JMSTemporaryQueue(或TemporaryTopic如果replyPubSubDomain= true) 实例。spring-doc.cadn.net.cn

使用reply-destination(或reply-destination-name)以及CachingConnectionFactory将 cacheConsumers 设置为true可能会导致内存不足的情况。 这是因为每个请求都会获得一个带有新选择器的新消费者(在correlation-key值或当没有correlation-key,在发送的 JMSMessageID 上)。 鉴于这些选择器是唯一的,因此在当前请求完成后,它们仍保留在缓存中(未使用)。spring-doc.cadn.net.cn

如果指定回复目标,建议不要使用缓存的使用者。 或者,考虑使用<reply-listener/>如下所述spring-doc.cadn.net.cn

以下示例演示如何配置出站网关:spring-doc.cadn.net.cn

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

“outbound-gateway”有效负载提取属性与“inbound-gateway”的属性成反比(请参阅前面的讨论)。 这意味着 'extract-request-payload'属性值适用于将 Spring Integration 消息转换为 JMS 消息以作为请求发送。 “extract-reply-payload”属性值适用于作为回复接收的 JMS 消息,然后转换为 Spring Integration 消息,随后发送到 'reply-channel',如前面的配置示例所示。spring-doc.cadn.net.cn

使用<reply-listener/>

Spring Integration 2.2 引入了一种处理回复的替代技术。 如果您添加<reply-listener/>child 元素添加到网关,而不是为每个回复创建一个消费者,而是MessageListenercontainer 用于接收回复并将其交给请求线程。 这提供了许多性能优势,并缓解了前面警告中描述的缓存使用者内存利用率问题。spring-doc.cadn.net.cn

使用<reply-listener/>使用没有reply-destination,而不是创建TemporaryQueue对于每个请求,单个TemporaryQueue被使用。 (网关会创建额外的TemporaryQueue,如果与代理的连接丢失并恢复,则视需要)。 如果replyPubSubDomain设置为true,从 6.0 版开始,一个TemporaryTopic而是创建。spring-doc.cadn.net.cn

使用correlation-key,多个网关可以共享相同的应答目的地,因为侦听器容器使用每个网关唯一的选择器。spring-doc.cadn.net.cn

如果指定应答侦听器并指定应答目标(或应答目标名称),但未提供相关键,则网关将记录警告并回退到 2.2 版之前的行为。 这是因为在这种情况下无法配置选择器。 因此,无法避免应答转到可能配置了相同应答目标的不同网关。spring-doc.cadn.net.cn

请注意,在这种情况下,每个请求都会使用一个新的使用者,并且使用者可以按照上述警告中所述在内存中构建;因此,在这种情况下不应使用缓存的消费者。spring-doc.cadn.net.cn

以下示例显示了具有默认属性的应答侦听器:spring-doc.cadn.net.cn

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

侦听器非常轻量级,我们预计在大多数情况下,您只需要一个消费者。 但是,您可以添加诸如concurrent-consumers,max-concurrent-consumers,等。 有关受支持属性的完整列表,请参阅模式以及 Spring JMS 文档了解其含义。spring-doc.cadn.net.cn

空闲回复侦听器

从 4.2 版开始,您可以根据需要启动回复侦听器(并在空闲时间后停止它),而不是在网关的生命周期内运行。 如果应用程序上下文中有许多网关,而这些网关大部分处于空闲状态,则此功能非常有用。 其中一种情况是具有许多(非活动)分区 Spring Batch 作业的上下文,使用 Spring Integration 和 JMS 进行分区分发。 如果所有应答侦听器都处于活动状态,那么 JMS 代理为每个网关都有一个活动使用者。 通过启用空闲超时,每个使用者仅在相应的批处理作业运行时存在(以及完成后的短时间内)。spring-doc.cadn.net.cn

idle-reply-listener-timeout属性引用中spring-doc.cadn.net.cn

网关回复关联

本节介绍用于回复关联的机制(确保始发网关仅接收对其请求的回复),具体取决于网关的配置方式。 有关此处讨论的属性的完整描述,请参阅属性参考spring-doc.cadn.net.cn

以下列表描述了各种场景(数字用于识别 - 顺序无关紧要):spring-doc.cadn.net.cn

  1. reply-destination*properties 和 no<reply-listener>spring-doc.cadn.net.cn

    一个TemporaryQueue为每个请求创建,并在请求完成(成功或其他方式)时删除。correlation-key无关紧要。spring-doc.cadn.net.cn

  2. 一个reply-destination*属性,并且都没有<reply-listener/>也不是correlation-key提供spring-doc.cadn.net.cn

    JMSCorrelationID等于传出消息用作消费者的消息选择器:spring-doc.cadn.net.cn

    messageSelector = "JMSCorrelationID = '" + messageId + "'"spring-doc.cadn.net.cn

    响应系统应返回入站JMSMessageID在回复中JMSCorrelationID. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO。spring-doc.cadn.net.cn

    使用此配置时,不应使用主题进行回复。 回复可能会丢失。
  3. 一个reply-destination*属性,没有<reply-listener/>,并且correlation-key="JMSCorrelationID"spring-doc.cadn.net.cn

    网关生成一个唯一的关联 IS 并将其插入JMSCorrelationID页眉。 消息选择器是:spring-doc.cadn.net.cn

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"spring-doc.cadn.net.cn

    响应系统应返回入站JMSCorrelationID在回复中JMSCorrelationID. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO。spring-doc.cadn.net.cn

  4. 一个reply-destination*属性,没有<reply-listener/>,并且correlation-key="myCorrelationHeader"spring-doc.cadn.net.cn

    网关生成一个唯一的相关 ID,并将其插入myCorrelationHeadermessage 属性。 这correlation-key可以是任何用户定义的值。 消息选择器是:spring-doc.cadn.net.cn

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"spring-doc.cadn.net.cn

    响应系统应返回入站myCorrelationHeader在回复中myCorrelationHeader.spring-doc.cadn.net.cn

  5. 一个reply-destination*属性,没有<reply-listener/>,并且correlation-key="JMSCorrelationID*"(请注意相关键中的 。*spring-doc.cadn.net.cn

    网关使用jms_correlationId标头(如果存在)并将其插入JMSCorrelationID页眉。 消息选择器是:spring-doc.cadn.net.cn

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"spring-doc.cadn.net.cn

    用户必须确保此值是唯一的。spring-doc.cadn.net.cn

    如果标头不存在,则网关的行为类似于3.spring-doc.cadn.net.cn

    响应系统应返回入站JMSCorrelationID在回复中JMSCorrelationID. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO。spring-doc.cadn.net.cn

  6. reply-destination*属性,并且<reply-listener>提供spring-doc.cadn.net.cn

    将创建一个临时队列,并将其用于来自此网关实例的所有回复。 消息中不需要相关数据,但传出JMSMessageID在网关内部使用,以将回复定向到正确的请求线程。spring-doc.cadn.net.cn

  7. 一个reply-destination*属性,则<reply-listener>,并且没有correlation-key提供spring-doc.cadn.net.cn

    <reply-listener/>配置被忽略,网关的行为类似于2. 将写入警告日志消息以指示这种情况。spring-doc.cadn.net.cn

  8. 一个reply-destination*属性,则<reply-listener>,并且correlation-key="JMSCorrelationID"spring-doc.cadn.net.cn

    网关具有唯一的相关 ID,并将其与递增值一起插入JMSCorrelationID标头 (gatewayId + "_" + ++seq). 消息选择器是:spring-doc.cadn.net.cn

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"spring-doc.cadn.net.cn

    响应系统应返回入站JMSCorrelationID在回复中JMSCorrelationID. 这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO。 由于每个网关都有一个唯一的 ID,因此每个实例只获得自己的回复。 完整的关联数据用于将回复路由到正确的请求线程。spring-doc.cadn.net.cn

  9. 一个reply-destination*属性被提供一个<reply-listener/>,并且correlation-key="myCorrelationHeader"spring-doc.cadn.net.cn

    网关具有唯一的相关 ID,并将其与递增值一起插入myCorrelationHeader属性 (gatewayId + "_" + ++seq). 这correlation-key可以是任何用户定义的值。 消息选择器是:spring-doc.cadn.net.cn

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"spring-doc.cadn.net.cn

    响应系统应返回入站myCorrelationHeader在回复中myCorrelationHeader. 由于每个网关都有一个唯一的 ID,因此每个实例只获得自己的回复。 完整的关联数据用于将回复路由到正确的请求线程。spring-doc.cadn.net.cn

  10. 一个reply-destination*属性,则<reply-listener/>,并且correlation-key="JMSCorrelationID*"spring-doc.cadn.net.cn

    (注意相关键中的*spring-doc.cadn.net.cn

    用户提供的关联 ID 不允许用于应答侦听器。 网关不会使用此配置进行初始化。spring-doc.cadn.net.cn

异步网关

从 4.3 版开始,您现在可以指定async="true"(或setAsync(true)在 Java 中)配置出站网关时。spring-doc.cadn.net.cn

默认情况下,当请求发送到网关时,请求线程将挂起,直到收到回复。 然后,该流在该线程上继续。 如果asynctrue,请求线程在send()完成,并在侦听器容器线程上返回回复(并且流继续)。 当在轮询器线程上调用网关时,这可能很有用。 线程被释放,可用于框架内的其他任务。spring-doc.cadn.net.cn

async需要一个<reply-listener/>(或setUseReplyContainer(true)使用 Java 配置时)。 它还需要一个correlationKey(通常JMSCorrelationID) 待指定。 如果不满足这些条件中的任何一个,async被忽略。spring-doc.cadn.net.cn

属性引用

以下列表显示了outbound-gateway:spring-doc.cadn.net.cn

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 引用jakarta.jms.ConnectionFactory. 默认值jmsConnectionFactory.
2 包含相关数据的属性的名称,用于将响应与回复相关联。 如果省略,网关期望响应系统返回出站JMSMessageID标头JMSCorrelationID页眉。 如果指定,网关将生成一个关联 ID,并使用该 ID 填充指定的属性。 响应系统必须在同一属性中回显该值。 它可以设置为JMSCorrelationID,在这种情况下,使用标准标头而不是String属性来保存相关数据。 当您使用<reply-container/>,则必须指定correlation-key如果您提供显式的reply-destination. 从 V4.0.1 开始,此属性还支持JMSCorrelationID*,这意味着如果出站消息已经有JMSCorrelationID(映射自jms_correlationId) 标头,则使用它而不是生成新的标头。 请注意,JMSCorrelationID*键不允许在使用<reply-container/>,因为容器在初始化过程中需要设置消息选择器。
您应该了解,网关无法确保唯一性,如果提供的关联 ID 不是唯一的,则可能会发生意外的副作用。
3 一个布尔值,指示传递模式是否应为DeliveryMode.PERSISTENT (true) 或DeliveryMode.NON_PERSISTENT (false). 此设置仅在以下情况下生效explicit-qos-enabledtrue.
4 一个DestinationResolver. 默认值为DynamicDestinationResolver,它将目标名称映射到该名称的队列或主题。
5 当设置为true,它允许使用服务质量属性:priority,delivery-modetime-to-live.
6 当设置为true(默认值),Spring Integration 回复消息的有效负载是从 JMS 回复消息的正文创建的(通过使用MessageConverter). 当设置为false,整个 JMS 消息成为 Spring Integration 消息的有效负载。
7 当设置为true(默认值),Spring Integration 消息的有效负载将转换为JMSMessage(通过使用MessageConverter). 当设置为false,整个 Spring Integration Message 将转换为JMSMessage. 在这两种情况下,Spring Integration 消息标头都通过使用HeaderMapper.
8 一个HeaderMapper用于将 Spring Integration 消息头映射到 JMS 消息头和属性。
9 MessageConverter用于在 JMS 消息和 Spring Integration 消息有效负载之间进行转换(或消息,如果extract-request-payloadfalse). 默认值为SimpleMessageConverter.
10 请求消息的默认优先级。 由消息优先级标头(如果存在)覆盖。 它的范围是09. 此设置仅在以下情况下生效explicit-qos-enabledtrue.
11 等待回复的时间(以毫秒为单位)。 默认值为5000(五秒)。
12 将回复消息发送到的通道。
13 Destination,设置为JMSReplyTo页眉。 最多只有其中一个reply-destination,reply-destination-expressionreply-destination-name是允许的。 如果未提供任何内容,则TemporaryQueue用于回复此网关。
14 计算结果为Destination,将设置为JMSReplyTo页眉。 该表达式可以导致Destinationobject 或String. 它由DestinationResolver解决实际问题Destination. 最多只有其中一个reply-destination,reply-destination-expressionreply-destination-name是允许的。 如果未提供任何内容,则TemporaryQueue用于回复此网关。
15 设置为 JMSReplyTo 标头的目标的名称。 它由DestinationResolver解决实际问题Destination. 最多只有其中一个reply-destination,reply-destination-expressionreply-destination-name是允许的。 如果未提供任何内容,则TemporaryQueue用于回复此网关。
16 当设置为true,则表示任何回复DestinationDestinationResolver应该是一个Topic而不是Queue.
17 网关在将回复消息发送到reply-channel. 这仅在reply-channel可以阻止 — 例如QueueChannel当前容量限制已满。 默认值为无大。
18 此网关接收请求消息的通道。
19 Destination请求消息发送到的。 其中之一reply-destination,reply-destination-expressionreply-destination-name是必需的。 只能使用这三个属性中的一个。
20 计算结果为Destination请求消息发送到的。 该表达式可以导致Destinationobject 或String. 它由DestinationResolver解决实际问题Destination. 其中之一reply-destination,reply-destination-expressionreply-destination-name是必需的。 只能使用这三个属性中的一个。
21 请求消息发送到的目标的名称。 它由DestinationResolver解决实际问题Destination. 其中之一reply-destination,reply-destination-expressionreply-destination-name是必需的。 只能使用这三个属性中的一个。
22 当设置为true,则表示任何请求DestinationDestinationResolver应该是一个Topic而不是Queue.
23 指定消息的生存时间。 此设置仅在以下情况下生效explicit-qos-enabledtrue.
24 指定此出站网关是否必须返回非空值。 默认情况下,此值为true和一个MessageTimeoutException当底层服务在receive-timeout. 请注意,如果服务从未返回回复,则最好使用<int-jms:outbound-channel-adapter/>而不是<int-jms:outbound-gateway/>requires-reply="false". 对于后者,发送线程被阻止,等待receive-timeout时期。
25 当您使用<reply-listener />,默认情况下,其生命周期(启动和停止)与网关的生命周期匹配。 当此值大于0,则容器按需启动(发送请求时)。 容器将继续运行,直到至少此时间过去,没有收到任何请求(并且直到没有未完成的回复)。 容器将在下一个请求时再次启动。 停止时间是最小值,实际上可能高达该值的 1.5 倍。
26 请参阅异步网关
27 当包含此元素时,异步MessageListenerContainer而不是为每个回复创建一个消费者。 在许多情况下,这可以更有效。

将消息头映射到 JMS 消息或映射 JMS 消息

JMS 消息可以包含元信息,例如 JMS API 标头和简单属性。 您可以使用以下命令将这些消息头映射到 Spring Integration 消息标头。JmsHeaderMapper. JMS API 标头被传递给适当的 setter 方法(例如setJMSReplyTo),而其他标头被复制到 JMS 消息的常规属性中。 JMS 出站网关使用默认实现JmsHeaderMapper,它将映射标准 JMS API 标头以及原始或String消息标头。 您还可以使用header-mapper入站和出站网关的属性。spring-doc.cadn.net.cn

许多特定于 JMS 提供商的客户端不允许将deliveryMode,prioritytimeToLive属性直接添加到已创建的 JMS 消息上。 它们被认为是 QoS 属性,因此必须传播到目标MessageProducer.send(message, deliveryMode, priority, timeToLive)应用程序接口。 因此,DefaultJmsHeaderMapper不会将适当的 Spring Integration 标头(或表达式结果)映射到提到的 JMS 消息属性中。 相反,一个DynamicJmsTemplateJmsSendingMessageHandler将标头值从请求消息传播到MessageProducer.send()应用程序接口。 若要启用此功能,必须使用DynamicJmsTemplate与其explicitQosEnabled属性设置为 true。 Spring Integration Java DSL 配置了一个DynamicJmsTemplate默认情况下,但您仍然必须将explicitQosEnabled财产。
从 4.0 版本开始,JMSPriority标头映射到标准priority入站消息的标头。 以前,priorityheader 仅用于出站邮件。 要恢复到以前的行为(即不映射入站优先级),请将mapInboundPriority属性DefaultJmsHeaderMapperfalse.
从 4.3 版本开始,DefaultJmsHeaderMapper映射标准correlationIdheader 作为消息属性,方法是调用其toString()方法 (correlationId通常是一个UUID,JMS 不支持)。 在入站端,它映射为String. 这独立于jms_correlationId标头,该标头映射到JMSCorrelationID页眉。 这JMSCorrelationID通常用于关联请求和回复,而correlationId通常用于将相关消息组合成一个组(例如使用聚合器或重排序器)。

从 5.1 版本开始,DefaultJmsHeaderMapper可以配置为映射入站JMSDeliveryModeJMSExpiration性能:spring-doc.cadn.net.cn

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

这些 JMS 属性映射到JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATIONSpring Message 标头。spring-doc.cadn.net.cn

消息转换、封送和取消封送

如果您需要转换消息,所有 JMS 适配器和网关都允许您提供MessageConverter通过将message-converter属性。 为此,请提供MessageConverter在同一 ApplicationContext 中可用。此外,为了提供与 marshaller 和 unmarshaller 接口的一些一致性,Spring 提供了MarshallingMessageConverter,您可以使用自己的自定义封送程序和取消封送程序进行配置。 以下示例演示了如何执行此作spring-doc.cadn.net.cn

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
当您提供自己的MessageConverter实例,它仍然包装在HeaderMappingMessageConverter. 这意味着 'extract-request-payload' 和 'extract-reply-payload' 属性可能会影响传递给转换器的实际对象。 这HeaderMappingMessageConverter本身委托给目标MessageConverter同时还映射了 Spring IntegrationMessageHeaders到 JMS 消息属性,然后再返回。

JMS 支持的消息通道

前面介绍的通道适配器和网关都适用于与其他外部系统集成的应用程序。 入站选项假定某个其他系统正在向 JMS 目标发送 JMS 消息,而出站选项假定某个其他系统正在从目标接收消息。 另一个系统可能是也可能不是 Spring Integration 应用程序。 当然,当将 Spring Integration 消息实例作为 JMS 消息本身的正文发送时(将 'extract-payload' 值设置为false),假设另一个系统是基于 Spring Integration 的。 然而,这绝不是必需的。 这种灵活性是使用基于消息的集成选项的好处之一,该选项可以抽象出“通道”(或 JMS 中的目标)。spring-doc.cadn.net.cn

有时,给定 JMS Destination 的生产者和消费者都旨在成为同一应用程序的一部分,在同一进程中运行。 您可以使用一对入站和出站通道适配器来实现此目的。 这种方法的问题在于,您需要两个适配器,尽管从概念上讲,目标是拥有一个消息通道。 从 Spring Integration 2.0 版开始支持更好的选项。 现在,在使用 JMS 命名空间时可以定义单个“通道”,如以下示例所示:spring-doc.cadn.net.cn

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

前面示例中的通道的行为与正常通道非常相似<channel/>元素。 它可以被input-channeloutput-channel任何端点的属性。 不同之处在于,此通道由名为exampleQueue. 这意味着在生产端点和使用端点之间可以进行异步消息传递。 但是,与通过添加<queue/>非 JMS 中的元素<channel/>元素,则消息不存储在内存队列中。 相反,这些消息在 JMS 消息正文中传递,然后底层 JMS 提供程序的全部功能可用于该通道。 使用这种替代方案的最常见理由可能是利用 JMS 消息传递的存储和转发方法提供的持久性。spring-doc.cadn.net.cn

如果配置正确,JMS 支持的消息通道也支持事务。 换句话说,如果生产者的发送作是回滚事务的一部分,则生产者实际上不会写入事务性 JMS 支持的通道。 同样,如果接收 JMS 消息是回滚事务的一部分,则消费者不会从通道中实际删除该消息。 请注意,在这种情况下,生产者事务和使用者事务是分开的。 这与在简单的同步环境中传播事务上下文有很大不同<channel/>没有<queue/>child 元素。spring-doc.cadn.net.cn

由于上面的示例引用了 JMS 队列实例,因此它充当点对点通道。 另一方面,如果您需要发布-订阅行为,则可以使用单独的元素并引用 JMS 主题。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于任一类型的 JMS 支持的通道,都可以提供目标的名称而不是引用,如以下示例所示:spring-doc.cadn.net.cn

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前面的示例中,目标名称由 Spring 的默认值解析DynamicDestinationResolver实现,但你可以提供DestinationResolver接口。 此外,JMSConnectionFactory是通道的必需属性,但默认情况下,预期的 bean 名称为jmsConnectionFactory. 以下示例提供了用于解析 JMS 目标名称的自定义实例和ConnectionFactory:spring-doc.cadn.net.cn

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

对于<publish-subscribe-channel />,将durable属性设置为true用于持久订阅或subscription-shared对于共享订阅(需要 JMS 2.0 代理,并且从 V4.2 开始可用)。 用subscription以命名订阅。spring-doc.cadn.net.cn

使用 JMS 消息选择器

使用 JMS 消息选择器,您可以根据 JMS 标头以及 JMS 属性过滤 JMS 消息。 例如,如果您想侦听其自定义 JMS 标头属性myHeaderProperty等于something,您可以指定以下表达式:spring-doc.cadn.net.cn

myHeaderProperty = 'something'

消息选择器表达式是 SQL-92 条件表达式语法的子集,定义为 Java Message Service 规范的一部分。 您可以指定 JMS 消息selector属性,使用以下 Spring Integration JMS 组件的 XML 命名空间配置:spring-doc.cadn.net.cn

不能使用 JMS 消息选择器引用消息正文值。

JMS 示例

要尝试使用这些 JMS 适配器,请查看 Spring Integration Samples Git 存储库中提供的 JMS 示例,网址为 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jmsspring-doc.cadn.net.cn

该存储库包括两个示例。 一个提供入站和出站通道适配器,另一个提供入站和出站网关。 它们配置为使用嵌入式 ActiveMQ 进程运行,但您可以修改每个示例的common.xml Spring 应用程序上下文文件,以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。spring-doc.cadn.net.cn

换句话说,您可以拆分配置,以便入站和出站适配器在单独的 JVM 中运行。 如果您安装了 ActiveMQ,请修改brokerURL属性中的common.xml要使用的文件tcp://localhost:61616(而不是vm://localhost). 这两个示例都接受来自 stdin 的输入并回显回 stdout。查看配置以了解这些消息是如何通过 JMS 路由的。spring-doc.cadn.net.cn