此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Framework 6.2.10! |
接收消息
这描述了如何在 Spring 中使用 JMS 接收消息。
同步收货
虽然 JMS 通常与异步处理相关联,但您可以
同步消费消息。超载的receive(..)
方法提供了
功能性。在同步接收期间,调用线程会阻塞,直到消息
变得可用。这可能是一个危险的作,因为调用线程可以
可能会被无限期阻止。这receiveTimeout
属性指定多长时间
接收者应该在放弃等待消息之前等待。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener 注释,并提供开放的基础设施以编程方式注册端点。
到目前为止,这是设置异步接收器的最便捷方法。
有关更多详细信息,请参阅启用侦听器端点注释。 |
以类似于 EJB 世界中的消息驱动 Bean (MDB) 的方式,消息驱动
POJO (MDP) 充当 JMS 消息的接收器。一个限制(但请参阅用MessageListenerAdapter
)
在 MDP 上,它必须实现jakarta.jms.MessageListener
接口。
请注意,如果您的 POJO 在多个线程上接收消息,请务必
确保实现是线程安全的。
以下示例显示了 MDP 的简单实现:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
一旦你实现了MessageListener
,是时候创建一个消息监听器了
容器。
以下示例显示如何定义和配置其中一个消息侦听器
随 Spring 一起提供的容器(在本例中,DefaultMessageListenerContainer
):
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息侦听器容器(所有容器都实现 MessageListenerContainer)的 Spring javadoc 了解每个实现支持的功能的完整描述。
使用SessionAwareMessageListener
接口
这SessionAwareMessageListener
interface 是一个特定于 Spring 的接口,它提供
与 JMS 类似的合同MessageListener
接口,但也给出了消息处理
方法访问 JMSSession
从中,Message
被收到。
以下列表显示了SessionAwareMessageListener
接口:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让 MDP 实现此接口(优先于
JMS系统MessageListener
接口),如果您希望您的 MDP 能够响应任何
收到的消息(通过使用Session
在onMessage(Message, Session)
方法)。Spring 附带的所有消息侦听器容器实现
支持实现MessageListener
或SessionAwareMessageListener
接口。实现SessionAwareMessageListener
请注意,它们随后与 Spring 联系在一起
通过界面。是否使用它的选择完全取决于您
作为应用程序开发人员或架构师。
请注意,onMessage(..)
方法SessionAwareMessageListener
接口抛出JMSException
.与标准 JMS 相比MessageListener
接口,当使用SessionAwareMessageListener
接口,它是
客户端代码处理任何引发的异常的责任。
用MessageListenerAdapter
这MessageListenerAdapter
class 是 Spring 异步消息传递支持中的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管有一些限制)。
请考虑以下接口定义:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,尽管该接口既没有扩展MessageListener
也不是SessionAwareMessageListener
接口,您仍然可以通过使用MessageListenerAdapter
类。 另请注意各种消息处理方法是如何的根据各种内容强类型化Message
他们可以接收和处理。
现在考虑以下MessageDelegate
接口:
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别是,请注意前面的MessageDelegate
接口(DefaultMessageDelegate
class) 根本没有 JMS 依赖项。它确实是一个POJO,我们可以通过以下配置将其制作成 MDP:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 的 MDPTextMessage
消息。 请注意消息处理方法的实际调用方式receive
(MessageListenerAdapter
默认为handleMessage
),但它是可配置的(正如您在本节后面看到的那样)。 通知 还有receive(..)
方法被强类型化为仅接收和响应 JMSTextMessage
消息。 以下列表显示了TextMessageDelegate
接口:
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下列表显示了一个实现TextMessageDelegate
接口:
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
服务员的配置MessageListenerAdapter
则如下所示:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener
接收 JMSMessage
类型的 以外TextMessage
一IllegalStateException
被抛出(随后吞下)。的另一个功能MessageListenerAdapter
class 是自动发回响应的能力Message
如果处理程序方法返回non-void 值。考虑以下接口和类:
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果您使用DefaultResponsiveTextMessageDelegate
结合MessageListenerAdapter
,则从执行
这'receive(..)'
方法(在默认配置中)转换为TextMessage
.由此产生的TextMessage
然后发送到Destination
(如果
一个存在)在 JMS 中定义Reply-To
原始的属性Message
或
违约Destination
设置在MessageListenerAdapter
(如果已配置)。
如果没有Destination
找到,则InvalidDestinationException
被抛出
(请注意,此异常不会被吞噬并向上传播
调用堆栈)。
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 listener 容器。
您可以通过sessionTransacted
旗
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动 JMS 事务中,在侦听器的情况下回滚消息接收
执行失败。发送响应消息(通过SessionAwareMessageListener
) 是
同一本地事务的一部分,但任何其他资源作(例如
数据库访问)独立运行。这通常需要重复的消息
侦听器实现中的检测,以涵盖数据库处理
已提交,但消息处理未能提交。
考虑以下 bean 定义:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的交易,您需要配置一个
事务管理器,并使用支持外部托管的侦听器容器
事务(通常,DefaultMessageListenerContainer
).
要配置消息侦听器容器以参与 XA 事务,您需要
配置JtaTransactionManager
(默认情况下,它委托给雅加达 EE
服务器的事务子系统)。请注意,底层 JMSConnectionFactory
需要
具有 XA 能力并在您的 JTA 交易协调员处正确注册。(检查您的
Jakarta EE 服务器对 JNDI 资源的配置。这也允许消息接收
因为(例如)数据库访问是同一事务的一部分(具有统一提交
语义,以牺牲 XA 事务日志开销为代价)。
以下 bean 定义创建事务管理器:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器 处理剩下的。以下示例显示了如何执行此作:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>