|
对于最新的稳定版本,请使用 Spring Framework 6.2.7! |
接收消息
这描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以
同步消费消息。超载的receive(..)方法提供此
功能性。在同步接收期间,调用线程会阻塞,直到出现一条消息
变为可用。这可能是一个危险的作,因为调用线程可以
可能会无限期阻止。这receiveTimeoutproperty 指定多长时间
接收方应该在放弃等待消息之前等待。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener注解,并提供一个开放的基础设施来以编程方式注册端点。
到目前为止,这是设置 asynchronous receiver 最方便的方法。
有关更多详细信息,请参阅启用侦听器终端节点注释。 |
与EJB世界中的消息驱动Bean (MDB) 类似,消息驱动的
POJO (MDP) 充当 JMS 消息的接收方。一个限制(但请参阅用MessageListenerAdapter) 的 MDP 上执行
这jakarta.jms.MessageListener接口。请注意,如果您的 POJO 收到消息
在多个线程上,确保您的实现是线程安全的非常重要。
以下示例显示了 MDP 的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
实施MessageListener,是时候创建消息侦听器
容器。
以下示例说明如何定义和配置其中一个消息侦听器
容器中(在本例中为DefaultMessageListenerContainer):
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息侦听器容器(所有这些容器都实现了MessageListenerContainer)的 Spring javadoc ,了解每种实施所支持的功能的完整描述。
使用SessionAwareMessageListener接口
这SessionAwareMessageListenerinterface 是一个特定于 Spring 的接口,它提供
与 JMS 类似的合同MessageListener接口,但也给出了消息处理
方法访问 JMSSession从中,Message收到了。
下面的清单显示了SessionAwareMessageListener接口:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让您的 MDP 实现此接口(优先于标准
JMS 公司MessageListener接口),如果您希望 MDP 能够响应任何
收到的消息(通过使用Session在onMessage(Message, Session)方法)。Spring 附带的所有消息侦听器容器实现
支持实现MessageListener或SessionAwareMessageListener接口。实现SessionAwareMessageListener来时需要注意的是,他们随后会与 Spring 绑定
通过界面。是否使用它的选择完全取决于您
作为应用程序开发人员或架构师。
请注意,onMessage(..)方法SessionAwareMessageListener接口引发JMSException.与标准 JMS 相比MessageListener接口中,使用SessionAwareMessageListener接口,它是
客户端代码负责处理任何引发的异常。
用MessageListenerAdapter
这MessageListenerAdapterclass 是 Spring 的 asynchronous 中的最后一个组件
消息支持。简而言之,它允许您将几乎任何类公开为 MDP
(尽管有一些限制)。
请考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,尽管接口既没有扩展MessageListener也不是SessionAwareMessageListener接口中,您仍然可以通过使用MessageListenerAdapter类。还要注意各种消息处理方法的
根据各种内容强类型Message类型,他们可以
接收和处理。
现在考虑以下MessageDelegate接口:
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别要注意的是,前面的MessageDelegate接口(DefaultMessageDelegate类)完全没有 JMS 依赖项。它确实是一个
POJO 中,我们可以通过以下配置将其制作成 MDP:
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 的 MDPTextMessage消息。请注意消息处理方法的实际调用receive(消息处理方法的名称,以MessageListenerAdapter默认为handleMessage),但它是可配置的(如本节后面所示)。通知
还有receive(..)method 的强类型化,以便仅接收和响应 JMSTextMessage消息。
下面的清单显示了TextMessageDelegate接口:
public interface TextMessageDelegate {
void receive(TextMessage message);
}
下面的清单显示了一个实现TextMessageDelegate接口:
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
Attendant 的配置MessageListenerAdapter将如下所示:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener接收 JMSMessage类型
以外TextMessage一IllegalStateException被抛出(随后
吞下)。另一个功能MessageListenerAdapterclass 是
能够自动发回响应Message如果处理程序方法返回
non-void 值。请考虑以下接口和类:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果您使用DefaultResponsiveTextMessageDelegate与MessageListenerAdapter中,从执行
这'receive(..)'method 被(在默认配置中)转换为TextMessage.结果TextMessage然后发送到Destination(如果
一个存在)在 JMS 中定义Reply-To原始属性Message或
违约Destinationset 在MessageListenerAdapter(如果已配置)。
如果没有Destination,则InvalidDestinationException被抛出
(请注意,此异常不会被吞噬,而是会向上传播
call 堆栈)。
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 listener 容器。
您可以通过sessionTransacted旗
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动的 JMS 事务中,如果侦听器,则回滚消息接收
执行失败。发送响应消息(通过SessionAwareMessageListener) 是
部分,但任何其他资源作(例如
数据库访问)独立运行。这通常需要重复的消息
detection 来覆盖数据库处理
已提交,但消息处理无法提交。
考虑以下 bean 定义:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置
事务管理器,并使用支持外部管理的侦听器容器
事务(通常为DefaultMessageListenerContainer).
要为 XA 事务参与配置消息侦听器容器,您需要
要配置JtaTransactionManager(默认情况下,委托给 Jakarta EE
服务器的事务子系统)。请注意,底层 JMSConnectionFactory需要
支持 XA 并已正确注册到 JTA 事务协调器。(检查您的
Jakarta EE 服务器的 JNDI 资源的配置。这也允许消息接收
因为(例如)数据库访问是同一事务的一部分(使用统一提交
语义,但代价是 XA 事务日志开销)。
以下 bean 定义创建一个事务管理器:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器 负责其余的事情。以下示例显示了如何执行此作:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
| 1 | 我们的交易管理器。 |