|
对于最新的稳定版本,请使用 Spring Framework 7.0.6! |
接收消息
这描述了如何在Spring中使用JMS接收消息。
同步接收
虽然JMS通常与异步处理相关联,但你可以同步地接收消息。重载的<code>0</code>方法提供了此功能。在同步接收过程中,调用线程会阻塞直到消息可用。这可能是一个危险的操作,因为调用线程可能会被无限期地阻塞。<code>1</code>属性指定接收器在放弃等待消息之前应该等待的时间。
异步接收:消息驱动的POJO
Spring 还通过使用 @JmsListener 注解支持带注解的监听器端点,并提供了开放的基础设施以编程方式注册端点。
这无疑是设置异步接收器最方便的方式。
有关更多详细信息,请参见 启用监听器端点注解。 |
在类似于EJB世界中的消息驱动Bean(MDB)的方式中,消息驱动的POJO(MDP)作为JMS消息的接收者。MDP的一个限制(但请参见使用MessageListenerAdapter)是它必须实现jakarta.jms.MessageListener接口。请注意,如果您的POJO在多个线程上接收消息,确保您的实现是线程安全的非常重要。
以下示例展示了一个MDP的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
在您实现了您的 MessageListener 之后,就可以创建一个消息监听器容器了。
以下示例显示了如何定义和配置Spring随附的消息监听器容器之一(在此情况下为DefaultMessageListenerContainer):
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
查看各种消息监听器容器的 Spring javadoc(所有容器均实现 MessageListenerContainer) 以获得每个实现支持的功能的完整描述。
使用 SessionAwareMessageListener 接口
SessionAwareMessageListener 接口是一个 Spring 特有的接口,它提供了与 JMS MessageListener 接口类似的契约,但还允许消息处理方法访问接收 Message 的 JMS Session。
以下列表显示了 SessionAwareMessageListener 接口的定义:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让您的MDP实现此接口(优先于标准的JMS MessageListener接口),如果您希望您的MDP能够响应任何接收到的消息(通过使用在Session中提供的onMessage(Message, Session)方法)。Spring自带的所有消息监听器容器实现都支持实现MessageListener或SessionAwareMessageListener接口的MDP。实现SessionAwareMessageListener接口的类有一个注意事项,即它们将通过接口与Spring绑定。是否使用该接口的选择完全由您作为应用程序开发人员或架构师决定。
请注意,onMessage(..)接口的SessionAwareMessageListener方法会抛出JMSException。与标准JMS的MessageListener接口不同,当使用SessionAwareMessageListener接口时,客户端代码需要处理任何抛出的异常。
使用 MessageListenerAdapter
MessageListenerAdapter 类是 Spring 异步消息传递支持中的最终组件。简而言之,它允许您将几乎任何类作为 MDP(尽管有一些限制)公开。
考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,尽管该接口既不继承 MessageListener 也不继承 SessionAwareMessageListener 接口,您仍然可以通过使用 MessageListenerAdapter 类将其用作 MDP。还要注意各种消息处理方法是如何根据它们可以接收和处理的各个 Message 类型的内容进行强类型的。
现在考虑以下 MessageDelegate 接口的实现:
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别是要注意前面实现的 MessageDelegate 接口(DefaultMessageDelegate 类)完全没有 JMS 依赖。它确实是一个 POJO,我们可以通过以下配置将其变为 MDP:
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收JMS
TextMessage消息的MDP。请注意消息处理方法实际上被调用为
receive(在MessageListenerAdapter中的消息处理方法名称默认为handleMessage),但它是可配置的(如本节后面所示)。请注意
receive(..)方法被强类型化,仅接收和响应JMS
TextMessage消息。
以下列表显示了TextMessageDelegate接口的定义:
public interface TextMessageDelegate {
void receive(TextMessage message);
}
以下列表显示了一个实现 TextMessageDelegate 接口的类:
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
代理 MessageListenerAdapter 的配置将如下所示:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener接收到一个类型不是TextMessage的JMS Message,则会抛出一个IllegalStateException(随后被吞掉)。MessageListenerAdapter类的另一个功能是,如果处理方法返回非void值,则自动发送回一个响应Message。请考虑以下接口和类:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果将 DefaultResponsiveTextMessageDelegate 与 MessageListenerAdapter 一起使用,则从执行 'receive(..)' 方法返回的任何非空值都会(在默认配置中)转换为 TextMessage。然后,生成的 TextMessage 会被发送到 JMS Reply-To 属性中定义的 Destination(如果存在的话),该属性来自原始 Message 或者在 MessageListenerAdapter 上设置的默认 Destination(如果已配置的话)。
如果没有找到 Destination,则会抛出 InvalidDestinationException(注意,此异常不会被吞没,并会传播到调用堆栈中)。
在事务中处理消息
在事务内调用消息监听器只需重新配置监听器容器。
您可以通过在监听器容器定义中使用sessionTransacted标志来激活本地资源事务。然后每次消息监听器调用都会在活动的JMS事务中进行,如果监听器执行失败,消息接收将被回滚。通过SessionAwareMessageListener发送响应消息是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)则独立进行。这通常需要在监听器实现中检测重复消息,以覆盖数据库处理已提交但消息处理未提交的情况。
考虑以下的bean定义:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常为DefaultMessageListenerContainer)。
要为参与 XA 事务的消息监听器容器进行配置,您需要配置一个 JtaTransactionManager(默认情况下,它会委托给 Jakarta EE 服务器的事务子系统)。请注意,底层 JMS ConnectionFactory 必须具备 XA 能力,并正确注册到您的 JTA 事务协调器中。(请检查您的 Jakarta EE 服务器对 JNDI 资源的配置。)这使得消息接收以及(例如)数据库访问能够成为同一事务的一部分(具有统一的提交语义,但会带来 XA 事务日志的开销)。
以下 bean 定义创建了一个事务管理器:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器会处理其余部分。下面的示例展示了如何操作:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
| 1 | 我们的事务管理器。 |