对于最新稳定版本,请使用 Spring Framework 7.0.6spring-doc.cadn.net.cn

接收消息

这描述了如何使用Spring接收JMS消息。spring-doc.cadn.net.cn

同步接收

虽然 JMS 通常与异步处理相关联,但你也可以同步地消费消息。重载的 receive(..) 方法提供了此功能。在同步接收过程中,调用线程会一直阻塞,直到有消息可用为止。这可能是一项危险的操作,因为调用线程可能会被无限期地阻塞。receiveTimeout 属性用于指定接收器在放弃等待消息之前应等待多长时间。spring-doc.cadn.net.cn

异步接收:消息驱动的 POJO

Spring 还通过使用 @JmsListener 注解支持带注解的监听器端点,并提供了一个开放的基础设施,用于以编程方式注册端点。 这是迄今为止设置异步接收器最便捷的方式。 更多详情请参见启用监听器端点注解

与 EJB 世界中的消息驱动 Bean (MDB) 类似,消息驱动 POJO (MDP) 充当 JMS 消息的接收器。MDP 的一个限制(但请参阅 使用 MessageListenerAdapter)是它必须实现 jakarta.jms.MessageListener 接口。请注意,如果您的 POJO 在多个线程上接收消息,确保您的实现是线程安全的非常重要。spring-doc.cadn.net.cn

以下示例展示了一个 MDP 的简单实现:spring-doc.cadn.net.cn

import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}

一旦你实现了自己的 MessageListener,就该创建一个消息监听器容器了。spring-doc.cadn.net.cn

以下示例展示了如何定义和配置 Spring 自带的某个消息监听器容器(在本例中为 DefaultMessageListenerContainer):spring-doc.cadn.net.cn

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

有关各个消息监听器容器(全部实现了MessageListenerContainer)所支持功能的完整描述,请参阅 Spring 的相关 JavaDoc 文档。spring-doc.cadn.net.cn

使用SessionAwareMessageListener接口

SessionAwareMessageListener 接口是 Spring 特有的一个接口,它提供的契约与 JMS 的 MessageListener 接口类似,但同时还允许消息处理方法访问接收该 Session 所用的 JMS Message。 以下代码清单展示了 SessionAwareMessageListener 接口的定义:spring-doc.cadn.net.cn

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的消息驱动 POJO(MDP)能够响应接收到的任何消息(通过在 MessageListener 方法中使用所提供的 Session),你可以选择让你的 MDP 实现此接口(优先于标准的 JMS onMessage(Message, Session) 接口)。Spring 自带的所有消息监听器容器实现均支持实现了 MessageListenerSessionAwareMessageListener 接口的 MDP。需要注意的是,实现 SessionAwareMessageListener 接口的类会因此与 Spring 框架产生耦合。是否使用该接口完全由你作为应用程序开发者或架构师自行决定。spring-doc.cadn.net.cn

请注意,onMessage(..) 接口的 SessionAwareMessageListener 方法会抛出 JMSException。与标准的 JMS MessageListener 接口不同,当使用 SessionAwareMessageListener 接口时,处理所抛出异常的责任在于客户端代码。spring-doc.cadn.net.cn

使用MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许你将几乎任意类暴露为 MDP(消息驱动 POJO,Message-Driven POJO),尽管存在一些限制条件。spring-doc.cadn.net.cn

考虑以下接口定义:spring-doc.cadn.net.cn

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}

请注意,尽管该接口既未继承 MessageListener 接口,也未继承 SessionAwareMessageListener 接口,但你仍然可以通过使用 MessageListenerAdapter 类将其作为消息驱动 POJO(MDP)来使用。同时请注意,各个消息处理方法是如何根据它们可以接收和处理的不同 Message 类型的内容进行强类型定义的。spring-doc.cadn.net.cn

现在考虑以下 MessageDelegate 接口的实现:spring-doc.cadn.net.cn

public class DefaultMessageDelegate implements MessageDelegate {
	// implementation elided for clarity...
}

特别要注意的是,前面所示的 MessageDelegate 接口实现(即 DefaultMessageDelegate 类)完全没有任何 JMS 依赖。它确实是一个普通的 Java 对象(POJO),我们可以通过以下配置将其转变为消息驱动 POJO(MDP):spring-doc.cadn.net.cn

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一个示例展示了另一个消息驱动 POJO(MDP),它只能处理接收 JMS TextMessage 消息。请注意,此处的消息处理方法实际名为 receive(在 MessageListenerAdapter 中,消息处理方法的默认名称为 handleMessage),但该名称是可配置的(如本节稍后所示)。同时请注意,receive(..) 方法是强类型的,仅用于接收和响应 JMS TextMessage 消息。 以下代码清单展示了 TextMessageDelegate 接口的定义:spring-doc.cadn.net.cn

public interface TextMessageDelegate {

	void receive(TextMessage message);
}

以下代码清单展示了一个实现了 TextMessageDelegate 接口的类:spring-doc.cadn.net.cn

public class DefaultTextMessageDelegate implements TextMessageDelegate {
	// implementation elided for clarity...
}

相应的 MessageListenerAdapter 的配置如下所示:spring-doc.cadn.net.cn

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果 messageListener 接收到的 JMS Message 类型不是 TextMessage,则会抛出一个 IllegalStateException(随后被忽略)。此外,MessageListenerAdapter 类的另一项功能是:如果处理方法返回一个非 void 值,它能够自动发送一个响应 Message。请考虑以下接口和类:spring-doc.cadn.net.cn

public interface ResponsiveTextMessageDelegate {

	// notice the return type...
	String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
	// implementation elided for clarity...
}

如果您将 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 结合使用,则从 'receive(..)' 方法执行返回的任何非空值(在默认配置下)都会被转换为 TextMessage。生成的 TextMessage 随后会被发送到原始 Message 的 JMS Reply-To 属性中定义的 Destination(如果存在),或者发送到在 MessageListenerAdapter 上设置的默认 Destination(如果已配置)。如果未找到 Destination,则会抛出 InvalidDestinationException(请注意,此异常不会被吞没,而是会向上传播到调用栈)。spring-doc.cadn.net.cn

在事务中处理消息

在事务中调用消息监听器仅需重新配置监听器容器。spring-doc.cadn.net.cn

您可以通过监听器容器定义中的 sessionTransacted 标志来启用本地资源事务。这样,每次消息监听器的调用都会在一个活跃的 JMS 事务中执行,如果监听器执行失败,消息接收操作将被回滚。发送响应消息(通过 SessionAwareMessageListener)属于同一个本地事务,但其他任何资源操作(例如数据库访问)则独立进行。这通常要求在监听器实现中包含重复消息检测机制,以处理数据库处理已提交但消息处理未能成功提交的情况。spring-doc.cadn.net.cn

请考虑以下 bean 定义:spring-doc.cadn.net.cn

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理的事务,您需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常为 DefaultMessageListenerContainer)。spring-doc.cadn.net.cn

要为 XA 事务参与配置消息监听器容器,您需要配置一个 JtaTransactionManager(默认情况下,它会委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory 必须支持 XA,并且已正确注册到您的 JTA 事务协调器中。(请检查您的 Jakarta EE 服务器对 JNDI 资源的配置。)这样,消息接收以及(例如)数据库访问就可以作为同一个事务的一部分(具有统一的提交语义,但会带来 XA 事务日志的额外开销)。spring-doc.cadn.net.cn

以下的 bean 定义创建了一个事务管理器:spring-doc.cadn.net.cn

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后我们需要将其添加到我们之前的容器配置中。容器会处理其余的工作。以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 我们的事务管理器。