对于最新稳定版本,请使用 Spring Framework 7.0.6spring-doc.cadn.net.cn

基于注解驱动的监听器端点

接收消息最简单的方式是使用带注解的监听器端点基础设施。简而言之,它允许你将一个托管 Bean 的方法暴露为 JMS 监听器端点。以下示例展示了如何使用它:spring-doc.cadn.net.cn

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

上述示例的理念是,每当 jakarta.jms.Destination myDestination 上有消息可用时,就会相应地调用 processOrder 方法(在本例中,使用 JMS 消息的内容,类似于 MessageListenerAdapter 所提供的功能)。spring-doc.cadn.net.cn

带注解的端点基础设施会为每个带注解的方法在后台创建一个消息监听器容器,该容器通过使用 JmsListenerContainerFactory 来创建。 此类容器不会注册到应用上下文中,但可以通过 JmsListenerEndpointRegistry bean 轻松定位,以便进行管理。spring-doc.cadn.net.cn

@JmsListener 是 Java 8 中的可重复注解,因此你可以通过在同一个方法上添加多个 @JmsListener 声明,将其与多个 JMS 目的地关联起来。

启用监听器端点注解

要启用对 @JmsListener 注解的支持,您可以在其中一个 @EnableJms 类上添加 @Configuration,如下例所示:spring-doc.cadn.net.cn

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		factory.setDestinationResolver(destinationResolver());
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}

默认情况下,基础设施会查找名为 jmsListenerContainerFactory 的 bean, 作为用于创建消息监听器容器的工厂来源。在这种情况下(忽略 JMS 基础设施的设置), 你可以使用核心线程池大小为 3、最大线程池大小为 10 来调用 processOrder 方法。spring-doc.cadn.net.cn

您可以为每个注解自定义要使用的监听器容器工厂,或者通过实现 JmsListenerConfigurer 接口来配置一个明确的默认值。 仅当至少有一个端点在没有指定特定容器工厂的情况下注册时,才需要该默认值。有关详细信息和示例,请参阅实现 JmsListenerConfigurer 的类的 Javadoc。spring-doc.cadn.net.cn

如果你更喜欢XML 配置,可以使用<jms:annotation-driven>元素,如下例所示:spring-doc.cadn.net.cn

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
		class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

编程式端点注册

JmsListenerEndpoint 提供了一个 JMS 端点的模型,并负责根据该模型配置容器。 除了通过 JmsListener 注解检测到的端点之外,该基础设施还允许你以编程方式配置端点。 以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

在前面的示例中,我们使用了 SimpleJmsListenerEndpoint,它提供了要调用的实际 MessageListener。不过,你也可以构建自己的端点变体,以描述自定义的调用机制。spring-doc.cadn.net.cn

请注意,您可以完全跳过使用 @JmsListener,而仅通过 JmsListenerConfigurer 以编程方式注册您的端点。spring-doc.cadn.net.cn

带注解的端点方法签名

到目前为止,我们一直在端点中注入一个简单的String,但实际上它的方法签名可以非常灵活。在下面的示例中,我们将其重写为通过自定义请求头注入Order对象:spring-doc.cadn.net.cn

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

你可以在 JMS 监听器端点中注入的主要元素如下:spring-doc.cadn.net.cn

  • 原始的 jakarta.jms.Message 或其任意子类(前提是它与传入的消息类型匹配)。spring-doc.cadn.net.cn

  • 用于可选地访问原生 JMS API 的 jakarta.jms.Session(例如,用于发送自定义回复)。spring-doc.cadn.net.cn

  • 表示传入的 JMS 消息的 org.springframework.messaging.Message。 请注意,此消息同时包含自定义头信息和标准头信息(由 JmsHeaders 定义)。spring-doc.cadn.net.cn

  • @Header 注解的方法参数用于提取特定的头部值,包括标准的 JMS 头部。spring-doc.cadn.net.cn

  • 一个使用 @Headers 注解的参数,该参数还必须可赋值给 java.util.Map,以便访问所有请求头。spring-doc.cadn.net.cn

  • 一个未加注解且不属于受支持类型(MessageSession)的元素将被视为有效载荷(payload)。您可以通过在参数上添加 @Payload 注解来显式地表明这一点。此外,您还可以通过额外添加 @Valid 注解来启用验证。spring-doc.cadn.net.cn

注入 Spring 的 Message 抽象的能力特别有用,它能让你充分利用存储在传输协议特定消息中的所有信息,而无需依赖于传输协议特定的 API。以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,您可以进一步自定义该工厂以支持额外的方法参数。您也可以在此处自定义类型转换和验证支持。spring-doc.cadn.net.cn

例如,如果我们希望在处理 Order 之前确保其有效性,可以使用 @Valid 注解标注该有效载荷,并配置必要的验证器,如下例所示:spring-doc.cadn.net.cn

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

响应管理

MessageListenerAdapter 中现有的支持已经允许您的方法具有非void返回类型。在这种情况下,调用的结果会被封装在jakarta.jms.Message中,并发送到原始消息的JMSReplyTo头中指定的目标,或发送到监听器上配置的默认目标。您现在可以使用消息抽象的@SendTo注解来设置该默认目标。spring-doc.cadn.net.cn

假设我们的 processOrder 方法现在应返回一个 OrderStatus,我们可以将其编写为自动发送响应,如下例所示:spring-doc.cadn.net.cn

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// order processing
	return status;
}
如果你有多个使用 @JmsListener 注解的方法,也可以将 @SendTo 注解放在类级别上,以共享一个默认的回复目标。

如果你需要以与传输无关的方式设置额外的头部信息,可以改为返回一个Message,方法类似于以下所示:spring-doc.cadn.net.cn

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// order processing
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果你需要在运行时计算响应的目标地址,可以将你的响应封装在一个 JmsResponse 实例中,该实例同时提供运行时要使用的目标地址。我们可以将前面的示例重写如下:spring-doc.cadn.net.cn

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// order processing
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最后,如果你需要为响应指定某些服务质量(QoS)值,例如优先级或生存时间(TTL),你可以相应地配置 JmsListenerContainerFactory,如下例所示:spring-doc.cadn.net.cn

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}