|
对于最新的稳定版本,请使用 Spring Framework 7.0.6! |
注解驱动的监听器端点
使用带注解的监听器端点基础设施是异步接收消息的最简单方法。简而言之,它允许你将托管 bean 的方法公开为 JMS 监听器端点。下面的例子展示了如何使用它:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
前面示例的思路是,每当在jakarta.jms.Destination myDestination上有消息可用时,就会相应地调用processOrder方法(在这种情况下,使用JMS消息的内容,类似于MessageListenerAdapter所提供的内容)。
带注解的端点基础设施会为每个带注解的方法在后台创建一个消息监听器容器,方法是使用 JmsListenerContainerFactory。
这样的容器不会注册到应用程序上下文中,但可以通过使用 JmsListenerEndpointRegistry bean 轻松定位以用于管理目的。
@JmsListener 是 Java 8 上的可重复注解,因此可以通过向其添加额外的 @JmsListener
声明,将多个 JMS 目标与同一方法关联。 |
启用监听器端点注解
要支持 @JmsListener 注解,请将 @EnableJms 添加到您的一个 @Configuration 类中,如下例所示:
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
}
默认情况下,基础设施会查找名为 jmsListenerContainerFactory 的 bean 作为工厂的来源,该工厂用于创建消息监听容器。在这种情况下(忽略 JMS 基础设施设置),您可以调用 processOrder 方法,并指定核心线程数为三个,最大线程数为十个。
您可以自定义用于每个注释的监听器容器工厂,或者可以通过实现 JmsListenerConfigurer 接口来配置一个显式的默认值。
如果至少有一个端点注册时没有特定的容器工厂,则需要默认值。有关详细信息和示例,请参阅实现
JmsListenerConfigurer
类的 Javadoc。
如果您更倾向于 XML 配置,可以使用 <jms:annotation-driven>
元素,如下例所示:
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory"
class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="sessionTransacted" value="true"/>
<property name="concurrency" value="3-10"/>
</bean>
编程式端点注册
JmsListenerEndpoint 提供了一个 JMS 端点的模型,并负责配置该模型的容器。基础设施允许您以编程方式配置端点,除了由 JmsListener 注解检测到的端点之外。
以下示例显示了如何操作:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前面的例子中,我们使用了 SimpleJmsListenerEndpoint,它提供了实际的 MessageListener 来调用。但是,您也可以构建自己的端点变体来描述自定义的调用机制。
请注意,您可以完全跳过使用 @JmsListener,并通过 JmsListenerConfigurer 以编程方式仅注册您的端点。
注解端点方法签名
到目前为止,我们一直在端点中注入一个简单的 String,但它实际上可以有非常灵活的方法签名。在下面的例子中,我们将其重写为通过自定义标题注入 Order:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
JMS监听器端点中可以注入的主要元素如下:
-
原始
jakarta.jms.Message或其任何子类(前提是它与传入的消息类型匹配)。 -
对于可选访问原生JMS API(例如,用于发送自定义回复)的
jakarta.jms.Session。 -
表示传入JMS消息的
org.springframework.messaging.Message。 请注意,此消息包含自定义和标准头信息(由JmsHeaders定义)。 -
@Header注解的方法参数,用于提取特定的头信息,包括 标准的 JMS 头信息。 -
一个带有
@Headers注解的参数,也必须可分配给java.util.Map才能访问所有头信息。 -
一个未使用注解且不是支持类型(
Message或Session)的元素被视为有效负载。可以通过使用@Payload注解参数来明确这一点。您还可以通过添加额外的@Valid来开启验证。
注入 Spring 的 Message 抽象功能对于利用传输特定消息中存储的所有信息非常有用,而无需依赖传输特定的 API。下面的示例展示了如何做到这一点:
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,您可以进一步自定义以支持其他方法参数。您也可以在其中自定义转换和验证支持。
例如,如果我们想在处理之前确保我们的 Order 是有效的,可以使用 @Valid 对负载进行注解,并配置必要的验证器,如下例所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
响应管理
现有的支持在 MessageListenerAdapter
已经允许您的方法具有非void返回类型。在这种情况下,调用的结果会被封装在jakarta.jms.Message中,通过原始消息的JMSReplyTo头指定的目标发送,或者通过监听器上配置的默认目标发送。现在您可以使用消息抽象的@SendTo注解来设置该默认目标。
假设我们的 processOrder 方法现在应该返回一个 OrderStatus,我们可以编写它以自动发送响应,如下例所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有多个 @JmsListener-注解的方法,也可以将 @SendTo
注解放在类级别,以共享一个默认的回复目标。 |
如果您需要以与传输无关的方式设置其他标头,可以返回一个
Message,并使用如下方法:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果您需要在运行时计算响应目标,可以将您的响应封装在一个JmsResponse实例中,该实例在运行时还提供要使用的目地。我们可以将前面的示例重写如下:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
最后,如果您需要为响应指定一些QoS值,例如优先级或生存时间,可以相应地配置JmsListenerContainerFactory,
如下面的示例所示:
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
QosSettings replyQosSettings = new QosSettings();
replyQosSettings.setPriority(2);
replyQosSettings.setTimeToLive(10000);
factory.setReplyQosSettings(replyQosSettings);
return factory;
}
}