此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Framework 6.2.10! |
接收消息
这描述了如何在 Spring 中使用 JMS 接收消息。
同步收货
虽然 JMS 通常与异步处理相关联,但您可以使用消息 同步。 这receive(..)
方法JmsTemplate
和JmsClient
提供这个 功能性。 在同步接收期间,调用线程会阻塞,直到消息变得可用。这可能是一个危险的作,因为调用线程可以可能被无限期地阻止。 这receiveTimeout
属性指定多长时间接收者应该等待,然后放弃等待消息。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener 注释,并提供开放的基础设施以编程方式注册端点。到目前为止,这是设置异步接收器的最便捷方法。有关更多详细信息,请参阅启用侦听器端点注释。 |
以类似于 EJB 世界中的消息驱动 Bean (MDB) 的方式,消息驱动POJO (MDP) 充当 JMS 消息的接收器。一个限制(但请参阅用MessageListenerAdapter
) 在 MDP 上,它必须实现jakarta.jms.MessageListener
接口。 请注意,如果您的 POJO 在多个线程上接收消息,请务必确保您的实现是线程安全的。
以下示例显示了 MDP 的简单实现:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
一旦你实现了MessageListener
,是时候创建一个消息监听器了 容器。
以下示例显示了如何定义和配置 Spring 附带的消息侦听器之一容器(在本例中,DefaultMessageListenerContainer
):
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息监听器容器(所有容器都实现 MessageListenerContainer)的 Spring javadoc了解每个实现支持的功能的完整描述。
使用SessionAwareMessageListener
接口
这SessionAwareMessageListener
接口是一个特定于 Spring 的接口,它提供与 JMS 类似的合约MessageListener
接口,但还提供消息处理方法对 JMS 的访问权限Session
从中,Message
已收到。以下列表显示了SessionAwareMessageListener
接口:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让 MDP 实现此接口(优先于标准JMSMessageListener
接口),如果您希望您的 MDP 能够响应任何收到的消息(通过使用Session
在onMessage(Message, Session)
方法)。Spring 附带的所有消息侦听器容器实现都支持实现MessageListener
或SessionAwareMessageListener
接口。 实现SessionAwareMessageListener
请注意,它们随后被绑定到 Spring通过接口。是否使用它的选择完全取决于您作为应用程序开发人员或架构师。
请注意,onMessage(..)
方法SessionAwareMessageListener
接口抛出JMSException
. 与标准 JMS 相比MessageListener
接口,当使用SessionAwareMessageListener
接口,它是客户端代码处理任何抛出的异常的责任。
用MessageListenerAdapter
这MessageListenerAdapter
class 是 Spring 异步
消息传递支持。简而言之,它允许您将几乎任何类公开为 MDP
(尽管有一些限制)。
请考虑以下接口定义:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,尽管该接口既没有扩展MessageListener
也不是SessionAwareMessageListener
接口,您仍然可以通过使用MessageListenerAdapter
类。另请注意各种消息处理方法
根据各种内容强类型Message
他们可以的类型
接收和处理。
现在考虑以下MessageDelegate
接口:
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别是,请注意前面的MessageDelegate
接口(DefaultMessageDelegate
class)根本没有 JMS 依赖项。这确实是一个
POJO,我们可以通过以下配置做成 MDP:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 的 MDPTextMessage
消息。请注意消息处理方法的实际调用方式receive
(MessageListenerAdapter
默认为handleMessage
),但它是可配置的(正如本节后面所见)。通知
还有receive(..)
方法被强类型化为仅接收和响应 JMSTextMessage
消息。
以下列表显示了TextMessageDelegate
接口:
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下列表显示了一个实现TextMessageDelegate
接口:
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
服务员的配置MessageListenerAdapter
则如下所示:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener
接收 JMSMessage
类型的
以外TextMessage
一IllegalStateException
被抛出(随后
吞下)。的另一个功能MessageListenerAdapter
class 是
能够自动发回响应Message
如果处理程序方法返回non-void 值。考虑以下接口和类:
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果您使用DefaultResponsiveTextMessageDelegate
结合MessageListenerAdapter
,则从执行
这'receive(..)'
方法(在默认配置中)转换为TextMessage
.由此产生的TextMessage
然后发送到Destination
(如果
一个存在)在 JMS 中定义Reply-To
原始的属性Message
或
违约Destination
设置在MessageListenerAdapter
(如果已配置)。
如果没有Destination
找到,则InvalidDestinationException
被抛出
(请注意,此异常不会被吞噬并向上传播
调用堆栈)。
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 listener 容器。
您可以通过sessionTransacted
旗
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动 JMS 事务中,在侦听器的情况下回滚消息接收
执行失败。发送响应消息(通过SessionAwareMessageListener
) 是
同一本地事务的一部分,但任何其他资源作(例如
数据库访问)独立运行。这通常需要重复的消息
侦听器实现中的检测,以涵盖数据库处理
已提交,但消息处理未能提交。
考虑以下 bean 定义:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的交易,您需要配置一个
事务管理器,并使用支持外部托管的侦听器容器
事务(通常,DefaultMessageListenerContainer
).
要配置消息侦听器容器以参与 XA 事务,您需要
配置JtaTransactionManager
(默认情况下,它委托给雅加达 EE
服务器的事务子系统)。请注意,底层 JMSConnectionFactory
需要
具有 XA 能力并在您的 JTA 交易协调员处正确注册。(检查您的
Jakarta EE 服务器对 JNDI 资源的配置。这也允许消息接收
因为(例如)数据库访问是同一事务的一部分(具有统一提交
语义,以牺牲 XA 事务日志开销为代价)。
以下 bean 定义创建事务管理器:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器负责其余的事情。以下示例显示了如何执行此作:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>