|
对于最新稳定版本,请使用 Spring Framework 7.0.6! |
使用 Spring JMS
本节介绍如何使用 Spring 的 JMS 组件。
使用JmsTemplate
JmsTemplate 类是 JMS 核心包中的中心类。它简化了 JMS 的使用,因为它在发送消息或同步接收消息时会自动处理资源的创建和释放。
使用 JmsTemplate 的代码只需实现回调接口,这些接口为其提供了清晰定义的高层契约。MessageCreator 回调接口在接收到 Session 中调用代码所提供的 JmsTemplate 时创建一条消息。为了支持对 JMS API 更复杂的使用,SessionCallback 提供了 JMS 会话,而 ProducerCallback 则暴露了一对 Session 和 MessageProducer。
JMS API 提供了两类发送方法:一类接收传递模式(delivery mode)、优先级(priority)和生存时间(time-to-live)作为服务质量(QoS)参数;另一类不接收 QoS 参数,而是使用默认值。由于 JmsTemplate 提供了多种发送方法,为了避免发送方法数量的重复膨胀,QoS 参数的设置已作为 bean 属性公开。同样地,同步接收调用的超时值通过 setReceiveTimeout 属性进行设置。
某些 JMS 提供商允许通过配置 ConnectionFactory 在管理层面设置默认的 QOS(服务质量)值。这样会导致对 MessageProducer 实例的 send 方法(send(Destination destination, Message message))的调用所使用的 QOS 默认值与 JMS 规范中指定的值不同。为了实现对 QOS 值的一致性管理,必须显式启用 JmsTemplate 使用其自身的 QOS 值,即将布尔属性 isExplicitQosEnabled 设置为 true。
为方便起见,JmsTemplate 还提供了一个基本的请求-回复操作,该操作允许发送一条消息,并在作为操作一部分而创建的临时队列上等待回复。
JmsTemplate 类的实例在配置完成后是线程安全的。这一点非常重要,因为这意味着你可以配置一个 JmsTemplate 的单一实例,然后安全地将这个共享引用注入到多个协作者中。需要明确的是,JmsTemplate 是有状态的,因为它持有一个对 ConnectionFactory 的引用,但这种状态并非会话状态。 |
从 Spring Framework 4.1 起,JmsMessagingTemplate 基于 JmsTemplate 构建,
并提供了与消息抽象(即 org.springframework.messaging.Message)的集成。这使您可以以通用的方式创建要发送的消息。
连接
JmsTemplate 需要一个对 ConnectionFactory 的引用。ConnectionFactory 是 JMS 规范的一部分,作为使用 JMS 的入口点。客户端应用程序使用它作为工厂来创建与 JMS 提供者的连接,并封装了各种配置参数,其中许多是提供商特定的,例如 SSL 配置选项。
在 EJB 内部使用 JMS 时,提供商会提供 JMS 接口的实现,以便参与声明式事务管理,并对连接和会话进行池化。为了在 EJB 中配合 resource-ref 使用这些功能,客户端应用程序应确保引用的是由容器管理的 JmsTemplate 实现。通常,Jakarta EE 容器要求您在 EJB 或 Servlet 的部署描述符中将 JMS 连接工厂声明为 ConnectionFactory。
缓存消息资源
标准 API 涉及创建许多中间对象。要发送一条消息,需要执行以下“API”调用流程:
ConnectionFactory->Connection->Session->MessageProducer->send
在 ConnectionFactory 和 Send 操作之间,会创建并销毁三个中间对象。为了优化资源使用并提升性能,Spring 提供了两种 ConnectionFactory 的实现。
使用SingleConnectionFactory
Spring 提供了 ConnectionFactory 接口的一个实现类 SingleConnectionFactory,该实现对所有 Connection 调用均返回同一个 createConnection(),并忽略对 close() 的调用。这在测试和独立环境中非常有用,使得同一个连接可用于多次 JmsTemplate 调用,而这些调用可能跨越任意数量的事务。SingleConnectionFactory 需要引用一个标准的 ConnectionFactory,该引用通常来自 JNDI。
使用CachingConnectionFactory
CachingConnectionFactory扩展了SingleConnectionFactory的功能,并添加了对Session、MessageProducer和MessageConsumer实例的缓存。
初始缓存大小设置为1。您可以使用sessionCacheSize属性来增加缓存会话的数量。请注意,实际缓存的会话数量会多于该数值,因为会话是根据其确认模式进行缓存的,因此当sessionCacheSize设置为 1 时,最多可以缓存四个会话实例(每种确认模式各一个)。MessageProducer和MessageConsumer实例在其所属会话内进行缓存,同时在缓存时也会考虑生产者和消费者的独特属性。MessageProducer 根据其目标地址进行缓存。MessageConsumer 则根据由目标地址、选择器、noLocal 交付标志以及持久订阅名称(如果创建的是持久消费者)组成的键进行缓存。
|
临时队列和主题(TemporaryQueue/TemporaryTopic)的消息生产者(MessageProducer)和消息消费者(MessageConsumer)永远不会被缓存。不幸的是,WebLogic JMS 在其实现常规目的地(destination)时,恰好也实现了临时队列/主题的接口,从而错误地表明其所有目的地都无法被缓存。请在 WebLogic 上使用不同的连接池/缓存,或者针对 WebLogic 的用途对 |
目的地管理
目的地(Destinations)作为 ConnectionFactory 实例,是 JMS 管理的对象,您可以将其存储在 JNDI 中并从中检索。在配置Spring应用程序上下文时,您可以使用JNDI JndiObjectFactoryBean 工厂类或<jee:jndi-lookup>来对对象引用的JMS目的地进行依赖注入。然而,如果应用程序中存在大量目的地,或者JMS提供商具有独特的高级目的地管理功能,这种策略通常会显得笨拙繁琐。此类高级目标管理的示例包括动态目标的创建,或对目标的分层命名空间的支持。The JmsTemplate 代理将
解析一个目标名称并将其转换为实现DestinationResolver接口的JMS目标对象。DynamicDestinationResolver 是默认实现,用于 JmsTemplate 并能够解决动态目标。A
JndiDestinationResolver 还可用作服务定位器,用于查找包含在 JNDI 中的目标,并可选择性地回退到 DynamicDestinationResolver 中包含的行为。
在JMS应用程序中,目标(destinations)通常仅在运行时才确定,因此无法在应用程序部署时通过管理方式创建。这通常是因为在相互交互的系统组件之间存在共享的应用逻辑,这些组件会根据一个众所周知的命名约定在运行时创建目标。尽管动态目的地的创建不属于JMS规范的一部分,但大多数提供商都提供了此功能。动态目的地是使用用户定义的名称创建的,
这使其区别于临时目的地,并且通常
不会在 JNDI 中注册。用于创建动态目的地的 API 因提供商而异,因为与目的地相关的属性是厂商特定的。然而,提供商有时会采用一种简单的实现方式,即忽略 JMS 规范中的警告,而使用 TopicSession 的 createTopic(String topicName) 方法或 QueueSession 的 createQueue(String
queueName) 方法来创建一个具有默认目标属性的新目的地。取决于提供商的实现,DynamicDestinationResolver 可以创建一个物理目标,而不仅仅是解析一个。
布尔属性 pubSubDomain 用于配置 JmsTemplate,使其了解当前使用的是哪种 JMS 域。默认情况下,该属性的值为 false,表示将使用点对点(point-to-point)域,即 Queues(队列)。此属性(由 JmsTemplate 使用)通过 DestinationResolver 接口的实现来决定动态目的地解析的行为。
您还可以通过 JmsTemplate 属性为 defaultDestination 配置一个默认目的地。该默认目的地用于那些未指定具体目的地的发送和接收操作。
消息监听器容器
在EJB领域中,JMS消息最常见的用途之一是驱动消息驱动Bean(MDB)。Spring提供了一种创建消息驱动POJO(MDP)的解决方案,这种方式不会将用户绑定到EJB容器。(有关Spring对MDP支持的详细内容,请参见异步接收:消息驱动POJO。)从Spring Framework 4.1开始,端点方法可以使用@JmsListener注解——更多详情请参见基于注解的监听器端点。
消息监听器容器用于从 JMS 消息队列接收消息,并驱动注入其中的 MessageListener。该监听器容器负责处理消息接收的所有线程操作,并将消息分发给监听器进行处理。消息监听器容器是消息驱动 POJO(MDP)与消息服务提供者之间的中介,负责注册以接收消息、参与事务、获取和释放资源、异常转换等任务。这使得你可以专注于编写(可能较为复杂的)与接收消息(并可能作出响应)相关的业务逻辑,而将样板化的 JMS 基础设施相关问题委托给框架处理。
Spring 自带了两种标准的 JMS 消息监听器容器,每种都具有其特有的功能集。
使用SimpleMessageListenerContainer
该消息监听器容器是两种标准类型中较为简单的一种。它在启动时创建固定数量的 JMS 会话和消费者,通过标准的 JMS MessageConsumer.setMessageListener() 方法注册监听器,并交由 JMS 提供商执行监听器回调。此变体不支持根据运行时需求动态调整,也不支持参与外部管理的事务。
从兼容性角度看,它非常贴近独立 JMS 规范的设计理念,但通常不符合 Jakarta EE 对 JMS 的限制要求。
虽然 SimpleMessageListenerContainer 不允许参与外部管理的事务,但它支持原生 JMS 事务。要启用此功能,您可以将 sessionTransacted 标志设置为 true,或者在 XML 命名空间中将 acknowledge 属性设为 transacted。此时,如果您的监听器抛出异常,将触发回滚,并重新投递消息。另外,也可以考虑使用 CLIENT_ACKNOWLEDGE 模式,该模式在发生异常时同样会重新投递消息,但不会使用事务型的 Session 实例,因此不会将其他 Session 操作(例如发送响应消息)纳入事务协议中。 |
默认的 AUTO_ACKNOWLEDGE 模式无法提供适当的可靠性保证。
当监听器执行失败时(因为消息提供商会自动在监听器调用后确认每条消息,且不会将任何异常传播回提供商),或者当监听器容器关闭时(可通过设置 acceptMessagesWhileStopping 标志进行配置),消息可能会丢失。在需要可靠性保障的情况下(例如,用于可靠的队列处理和持久化主题订阅),请务必使用事务型会话。 |
使用DefaultMessageListenerContainer
该消息监听器容器在大多数情况下使用。与SimpleMessageListenerContainer相比,此容器变体允许动态适应运行时需求,并能够参与外部管理的事务。
当配置了JtaTransactionManager时,每条接收到的消息都会注册到一个XA事务中。
因此,处理过程可以利用XA事务语义。
该监听器容器在对JMS提供者的低要求、高级功能(例如参与外部管理的事务)以及与Jakarta EE环境的兼容性之间取得了良好的平衡。
您可以自定义容器的缓存级别。请注意,当未启用缓存时,每次接收消息都会创建一个新的连接和一个新的会话。在高负载情况下,如果同时使用非持久订阅,可能会导致消息丢失。在这种情况下,请务必使用适当的缓存级别。
此容器在代理宕机时也具备可恢复能力。默认情况下,一个简单的 BackOff 实现会每五秒重试一次。您可以指定自定义的 BackOff 实现以获得更细粒度的恢复选项。请参阅 ExponentialBackOff 获取示例。
与其兄弟框架(SimpleMessageListenerContainer)类似,
DefaultMessageListenerContainer 支持原生 JMS 事务,并允许自定义确认模式。如果您的场景可行,强烈建议优先采用此方式,而非外部管理的事务——也就是说,如果您能够接受在 JVM 崩溃时偶尔出现重复消息的情况。您可以在业务逻辑中实施自定义的重复消息检测步骤来应对此类情况——例如,通过检查业务实体是否存在或查询协议表来实现。
任何此类安排都显著优于替代方案:即使用 XA 事务包裹整个处理流程(通过将您的 DefaultMessageListenerContainer 配置为使用 JtaTransactionManager),以同时涵盖 JMS 消息的接收以及消息监听器中业务逻辑的执行(包括数据库操作等)。 |
默认的 AUTO_ACKNOWLEDGE 模式无法提供适当的可靠性保证。
当监听器执行失败时(因为消息提供商会自动在监听器调用后确认每条消息,且不会将任何异常传播回提供商),或者当监听器容器关闭时(可通过设置 acceptMessagesWhileStopping 标志进行配置),消息可能会丢失。在需要可靠性保障的情况下(例如,用于可靠的队列处理和持久化主题订阅),请务必使用事务型会话。 |
事务管理
Spring 提供了一个 JmsTransactionManager,用于管理单个 JMS
ConnectionFactory 的事务。这使得 JMS 应用程序能够利用 Spring 的托管事务功能,如
数据访问章节中的事务管理部分 所述。
JmsTransactionManager 执行本地资源事务,将来自指定 ConnectionFactory 的 JMS
连接(Connection)/会话(Session)对绑定到当前线程。
JmsTemplate 会自动检测此类事务性资源,并相应地对其进行操作。
在 Jakarta EE 环境中,ConnectionFactory 会对 Connection 和 Session 实例进行池化,从而在事务之间高效地重用这些资源。在独立(standalone)环境中,使用 Spring 的 SingleConnectionFactory 会共享一个 JMS Connection,而每个事务则拥有自己独立的 Session。或者,也可以考虑使用特定于 JMS 提供商的连接池适配器,例如 ActiveMQ 的 PooledConnectionFactory 类。
您还可以将 JmsTemplate 与 JtaTransactionManager 以及支持 XA 的 JMS
ConnectionFactory 结合使用,以执行分布式事务。请注意,这需要使用 JTA 事务管理器以及正确配置为 XA 模式的 ConnectionFactory。
(请查阅您的 Jakarta EE 服务器或 JMS 提供商的文档。)
在使用 JMS API 从 Connection 创建 Session 时,如果在受管和非受管的事务环境中复用代码,可能会令人困惑。这是因为 JMS API 仅提供一个用于创建 Session 的工厂方法,且该方法需要指定事务模式和确认模式的值。在受管环境中,设置这些值是环境事务基础设施的职责,因此厂商对 JMS Connection 的包装器会忽略这些值。当您在非受管环境中使用 JmsTemplate 时,可以通过属性 sessionTransacted 和 sessionAcknowledgeMode 来指定这些值。当您将 PlatformTransactionManager 与 JmsTemplate 一起使用时,该模板始终会被赋予一个事务性的 JMS Session。