连接与资源管理
尽管我们在前一节中描述的 AMQP 模型是通用的,适用于所有实现,但在资源管理方面,具体细节则取决于消息代理(broker)的实现。因此,在本节中,我们仅关注存在于我们的“spring-rabbit”模块中的代码,因为此时 RabbitMQ 是唯一受支持的实现。
管理与 RabbitMQ 代理连接的核心组件是 ConnectionFactory 接口。 ConnectionFactory 实现的职责是提供一个 org.springframework.amqp.rabbit.connection.Connection 的实例,该实例是对 com.rabbitmq.client.Connection 的封装。
选择连接工厂
有三种连接工厂可供选择
-
PooledChannelConnectionFactory -
ThreadChannelConnectionFactory -
CachingConnectionFactory
前两个是在版本 2.3 中添加的。
对于大多数用例,应使用 CachingConnectionFactory。如果希望确保严格的消息顺序且无需使用 作用域操作,则可使用 ThreadChannelConnectionFactory。PooledChannelConnectionFactory 与 CachingConnectionFactory 类似,它使用单个连接和通道池。其实现更为简单,但不支持关联的发布者确认。
所有三种工厂均支持简单的发布者确认。
在配置 RabbitTemplate 以使用 独立连接 时,从版本 2.3.2 开始,您可以将发布连接工厂配置为不同类型的连接工厂。默认情况下,发布工厂与主工厂类型相同,且在主工厂上设置的任何属性也会传播到发布工厂。
从版本 3.1 开始,AbstractConnectionFactory 包含了 connectionCreatingBackOff 属性,该属性支持在连接模块中使用退避策略。目前,在 createChannel() 的行为中已提供对异常的支持,以处理当 channelMax 限制达到时发生的异常情况,并基于尝试次数和间隔时间实现退避策略。
PooledChannelConnectionFactory
该工厂管理一个单一连接和两个通道池,基于 Apache Pool2。一个池用于事务性通道,另一个用于非事务性通道。这些池采用默认配置的 GenericObjectPool;提供了回调函数以配置这些池;更多详情请参阅 Apache 文档。
Apache commons-pool2 jar 必须在类路径中才能使用此工厂。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个连接和两个 ThreadLocal,其中一个用于事务性通道,另一个用于非事务性通道。该工厂确保同一线程上的所有操作均使用相同的通道(只要通道保持打开状态)。这有助于实现严格的消息顺序,而无需依赖 作用域操作。为避免内存泄漏,如果您的应用程序使用大量短生命周期的线程,则必须调用工厂的 closeThreadChannel() 以释放通道资源。自版本 2.3.7 起,线程可将其通道转移给另一线程。有关更多信息,请参阅 多线程环境中的严格消息顺序。
CachingConnectionFactory
提供的第三种实现是 CachingConnectionFactory,它默认建立一个单连接代理,该代理可由应用程序共享。由于与 AMQP 进行消息通信的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接与会话之间的关系),因此可以共享连接。连接实例提供了一个 createChannel 方法。CachingConnectionFactory 的实现支持缓存这些通道,并根据通道是否为事务性维护独立的缓存。在创建 CachingConnectionFactory 实例时,您可以通过构造函数提供 'hostname'。您还应提供 'username' 和 'password' 属性。要配置通道缓存的大小(默认为 25),您可以调用 setChannelCacheSize() 方法。
从版本 1.3 开始,您可以配置 CachingConnectionFactory 来缓存连接以及仅缓存通道。在这种情况下,每次调用 createConnection() 都会创建一个新的连接(或从缓存中获取一个空闲连接)。关闭连接时,会将其返回到缓存中(前提是缓存大小未达到上限)。在这些连接上创建的通道也会被缓存。使用独立连接在某些环境中可能很有用,例如:在高可用性(HA)集群中消费数据,结合负载均衡器以连接到不同的集群成员等。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION。
| 这并不限制连接的数量。相反,它指定了允许保持空闲状态的打开连接的最大数量。 |
从版本 1.5.5 开始,新增了一个名为 connectionLimit 的属性。
当设置此属性时,它会限制允许的总连接数。
当设置该限制后,若达到上限,则使用 channelCheckoutTimeLimit 来等待连接变为空闲状态。
如果超时,则抛出 AmqpTimeoutException。
|
当缓存模式为 此外,在本文撰写时, |
重要的是要理解,缓存大小(默认情况下)并非一个限制,而仅仅是可缓存的通道数量。</p><p>例如,若缓存大小为10,则实际可使用的通道数量可以超过10个。</p><p>如果使用了超过10个通道,并且所有这些通道都返回到缓存中,则最多有10个通道进入缓存;其余的通道则会被物理关闭。
从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。在高流量、多线程环境中,较小的缓存意味着通道会以较高的频率被创建和关闭。增大默认缓存大小可避免此类开销。您应通过 RabbitMQ 管理界面监控正在使用的通道,并在观察到大量通道被创建和关闭时,考虑进一步增加缓存大小。该缓存仅按需增长(以适应应用程序的并发需求),因此此更改不会影响现有低流量应用。
从版本 1.4.2 开始,CachingConnectionFactory 具有一个名为 channelCheckoutTimeout 的属性。当该属性的值大于零时,channelCacheSize 就成为连接上可创建通道数量的限制。如果达到该限制,调用线程将阻塞,直到有通道可用或此超时到期为止,此时将抛出 AmqpTimeoutException。
框架内使用的通道(例如,RabbitTemplate)会被可靠地返回到缓存中。如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地将其返回(例如,通过关闭操作),可能需在 finally 块中完成,以避免耗尽通道。 |
以下示例展示了如何创建一个新的 connection:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
在使用 XML 时,配置可能如下例所示:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
框架的单元测试代码中还提供了一个 SingleConnectionFactory 实现。该实现比 如果您因某种原因需要为某项功能自定义实现 |
A ConnectionFactory 可以通过使用 rabbit 命名空间快速便捷地创建,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种做法更可取,因为框架可以为您选择最佳默认值。
创建的实例是一个 CachingConnectionFactory。
请记住,通道的默认缓存大小为 25。
如果您希望缓存更多通道,请通过设置 'channelCacheSize' 属性来设定一个更大的值。
在 XML 中,其形式如下:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间时,您还可以添加 'channel-cache-size' 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为 CHANNEL,但您可以将其配置为缓存连接。
在以下示例中,我们使用 connection-cache-size:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供主机和端口属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,可以使用 <code>addresses</code> 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
请参阅 连接到集群 以获取有关 address-shuffle-mode 的信息。
以下是一个自定义线程工厂的示例,该工厂将线程名称前缀为 rabbitmq-:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名连接
从版本 1.7 开始,为注入到 AbstractionConnectionFactory 中提供了一个 ConnectionNameStrategy。
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
参数 ConnectionFactory 可用于通过某些逻辑区分目标连接名称。
默认情况下,使用 AbstractConnectionFactory 的 beanName(一个表示对象的十六进制字符串)和内部计数器来生成 connection_name。
命名空间组件 <rabbit:connection-factory> 还会提供 connection-name-strategy 属性。
一个 SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。您可将其声明为 @Bean,并注入到连接工厂中,如下例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用上下文的 Environment 中。
当使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy @Bean。Spring Boot 会自动检测该 Bean,并将其注入到工厂中。 |
阻止的连接和资源限制
连接可能因与 内存警报 对应的代理(broker)阻止了交互而被阻断。从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection 可以通过提供 com.rabbitmq.client.BlockedListener 个实例来接收连接被阻断和解除阻断事件的通知。此外,AbstractConnectionFactory 会通过其内部的 BlockedListener 实现分别发出 ConnectionBlockedEvent 和 ConnectionUnblockedEvent。这些功能使您能够为代理端出现的问题提供应用程序逻辑,并(例如)采取一些纠正措施。
当应用程序配置为单个 CachingConnectionFactory 时(这正是 Spring Boot 自动配置的默认设置),一旦连接被 Broker 阻塞,应用程序将停止工作。当连接被 Broker 阻塞时,其所有客户端均会停止工作。如果我们在一个应用程序中同时拥有生产者和消费者,就可能在生产者阻塞连接(因为 Broker 上已无可用资源)而消费者无法释放这些资源(因为连接已被阻塞)的情况下陷入死锁。 为缓解此问题,我们建议再设置一个独立的 对于在消费者线程上执行的事务性生产者而言,单独设置 |
从版本 2.0.2 开始,RabbitTemplate 提供了一个配置选项,可自动使用第二个连接工厂,除非正在使用事务。使用独立连接 获取更多信息。发布者连接的 ConnectionNameStrategy 与主策略相同,即在调用该方法的结果后附加 .publisher。
从版本 1.7.7 开始,提供了一个 AmqpResourceNotAvailableException 异常,当 SimpleConnection.createChannel() 无法创建 Channel 时(例如,由于 channelMax 限制已达到,且缓存中没有可用通道)会抛出该异常。您可以在 RetryPolicy 中使用此异常,在经过一段退避时间后恢复操作。
配置底层客户端连接工厂
该 CachingConnectionFactory 使用了 Rabbit 客户端 ConnectionFactory 的一个实例。当在 CachingConnectionFactory 上设置等效属性时,会通过若干配置属性(例如 host、port、userName、password、requestedHeartBeat 和 connectionTimeout)传递过去。要设置其他属性(例如 clientProperties),您可以定义一个 Rabbit 工厂的实例,并通过 CachingConnectionFactory 的适当构造函数提供对其的引用。在使用命名空间(如前所述)时,您需要在 connection-factory 属性中提供已配置工厂的引用。为方便起见,提供了一个工厂 Bean,以协助在 Spring 应用上下文中配置连接工厂,如 下一节 所讨论。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复功能。虽然与该功能兼容,但 Spring AMQP 自身也具有恢复机制,因此客户端的自动恢复功能通常并不需要。我们建议禁用 amqp-client 自动恢复,以避免在代理(broker)可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 个实例的情况。例如,当在 RabbitTemplate 中配置了 RetryTemplate,即使在集群中故障转移到另一个代理时,您也可能遇到此类异常。由于自动恢复连接是基于定时器进行恢复的,因此使用 Spring AMQP 的恢复机制可使连接更快地恢复。从版本 1.7.1 开始,Spring AMQP 默认禁用 amqp-client 自动恢复,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory。由 RabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也默认禁用此选项。 |
RabbitConnectionFactoryBean和配置SSL
从版本 1.4 开始,提供了一个便捷的 RabbitConnectionFactoryBean,可通过依赖注入来方便地配置底层客户端连接工厂的 SSL 属性。其他设置器会委托给底层工厂。此前,您必须通过编程方式配置 SSL 选项。以下示例展示了如何配置一个 RabbitConnectionFactoryBean:
-
Java
-
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot 应用程序文件 (.yaml 或 .properties)
-
Properties
-
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: true
参见 RabbitMQ 文档,了解有关配置 SSL 的信息。
省略 keyStore 和 trustStore 配置项,即可在不验证证书的情况下通过 SSL 连接。
下一个示例展示了如何提供密钥库和信任库的配置。
属性 sslPropertiesLocation 是一个 Spring Resource,指向一个包含以下键值的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
代码 keyStore 和 truststore 是 Spring Resources 指向存储库的引用。
通常,该属性文件由操作系统保护,应用程序仅具有读取访问权限。
从 Spring AMQP 1.5 版本开始,您可以直接在工厂 bean 上设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,后者提供的属性将覆盖前者离散值。
从版本 2.0 开始,服务器证书默认进行验证,因为这样更安全。如果出于某些原因希望跳过此验证,请将工厂 Bean 的 skipServerCertificateValidation 属性设置为 true。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认调用 enableHostnameVerification()。若要恢复到以前的行为,请将 enableHostnameVerification 属性设置为 false。 |
| 从版本 2.2.5 开始,工厂 Bean 将始终默认使用 TLS v1.2;此前,在某些情况下使用的是 v1.1,而在其他情况下则使用 v1.2(具体取决于其他属性)。 如果您出于某种原因需要使用 v1.1,请设置 |
连接到集群
要连接到集群,请在 addresses 上配置 CachingConnectionFactory 属性:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从版本 3.0 开始,底层连接工厂在建立新连接时,将尝试通过选择一个随机地址来连接到主机。要恢复之前的行为(即按从第一个到最后一个的顺序尝试连接),请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE。
从版本 2.3 开始,添加了 INORDER 混洗模式,即在建立连接后,第一个地址会被移动到末尾。如果您希望在所有节点上从所有分片中消费消息,可以将此模式与 RabbitMQ 分片插件 配合 CacheMode.CONNECTION 和适当的并发设置一起使用。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从版本 1.3 开始,引入了 AbstractRoutingConnectionFactory。此工厂提供了一种机制,用于配置多个 ConnectionFactories 的映射,并在运行时通过某种 lookupKey 确定目标 ConnectionFactory。通常,该实现会检查线程绑定的上下文。为方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它可从 SimpleResourceHolder 中获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置一个 SimpleRoutingConnectionFactory:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后重要的是解除资源的绑定。
有关更多信息,请参阅 JavaDoc 中 AbstractRoutingConnectionFactory 的相关内容。
从版本 1.4 开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpression 和 receiveConnectionFactorySelectorExpression 属性,这些属性会在每次 AMQP 协议交互操作(send、sendAndReceive、receive 或 receiveAndReply)中进行求值,从而为提供的 AbstractRoutingConnectionFactory 解析出一个 lookupKey 值。
您可以在表达式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)。
对于 send 操作,待发送的消息是根求值对象。
对于 receive 操作,queueName 是根求值对象。
路由算法如下:如果选择器表达式为 null,或其计算结果为 null,或提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切照常进行,依赖于所提供的 ConnectionFactory 实现。
若评估结果不是 null,但该 lookupKey 对应的目标 ConnectionFactory 不存在,且 AbstractRoutingConnectionFactory 配置为 lenientFallback = true,也会发生相同情况。
对于 AbstractRoutingConnectionFactory 的情况,它会根据 determineCurrentLookupKey() 回退到其 routing 实现。
然而,如果 lenientFallback = false 成立,则会抛出一个 IllegalStateException。
命名空间支持还为 <rabbit:template> 组件提供了 send-connection-factory-selector-expression 和 receive-connection-factory-selector-expression 属性。
此外,从版本 1.4 开始,您可以在监听器容器中配置一个路由连接工厂。在这种情况下,队列名称列表用作查找键。例如,如果您将容器配置为 setQueueNames("thing1", "thing2"),则查找键为 [thing1,thing]"(注意,该键中没有空格)。
从版本 1.6.9 开始,您可以通过在监听器容器中使用 setLookupKeyQualifier 作为查找键的限定符来添加限定符。这样便可实现例如:监听具有相同名称但位于不同虚拟主机中的队列(此时您可为每个虚拟主机配置一个连接工厂)。
例如,使用查找键限定符 thing1 以及一个监听队列 thing2 的容器时,您可以将目标连接工厂注册的查找键设置为 thing1[thing2]。
| 目标(如果提供则为默认值)连接工厂必须具有相同的发布确认和返回设置。请参阅 发布确认和返回。 |
从版本 2.4.4 开始,此验证可以被禁用。如果您的场景中确认值与返回值需要不相等,您可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 来关闭该验证。注意:首先添加到 AbstractRoutingConnectionFactory 的连接工厂将决定 confirms 和 returns 的通用值。
如果您的场景中某些消息需要检查确认/返回,而其他消息则不需要,这可能会很有用。例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有头部 x-use-publisher-confirms: true 的消息将通过缓存连接发送,您可以确保消息的传递。
有关确保消息传递的更多信息,请参阅 发布者确认和返回。
队列亲和性与LocalizedQueueConnectionFactory
在集群中使用高可用(HA)队列时,为了获得最佳性能,您可能希望连接到实际的代理服务器(broker),该服务器上存放着主队列。CachingConnectionFactory 可以配置多个代理服务器地址。这是为了实现故障转移,客户端会按照配置的 AddressShuffleMode 顺序尝试连接。LocalizedQueueConnectionFactory 利用管理插件提供的 REST API 来确定哪个节点是该队列的主节点(lead node),然后创建(或从缓存中检索)一个 CachingConnectionFactory,使其仅连接到该节点。如果连接失败,则重新确定新的主节点,并由消费者连接至该节点。LocalizedQueueConnectionFactory 配置了一个默认的连接工厂,用于在无法确定队列物理位置时作为备用方案,此时它将正常连接到集群。
The LocalizedQueueConnectionFactory is a RoutingConnectionFactory and the SimpleMessageListenerContainer uses the queue names as the lookup key as discussed in Routing Connection Factory above.
出于这个原因(使用队列名称进行查找),LocalizedQueueConnectionFactory 只能在容器被配置为监听单个队列时使用。 |
| 每个节点上都必须启用 RabbitMQ 管理插件。 |
此连接工厂旨在用于长期连接,例如由 SimpleMessageListenerContainer 使用的连接。它不适用于短期连接使用(例如与 RabbitTemplate 配合使用),因为每次建立连接前调用 REST API 会带来额外开销。此外,对于发布操作,队列未知,且消息仍会被发布到所有集群成员,因此查找节点的逻辑价值有限。 |
以下示例配置展示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是 addresses、adminUris 和 nodes 的数组。这些参数是按位置排列的:当容器尝试连接到队列时,它会使用管理 API 来确定该队列的主节点,并连接到与该节点在相同数组位置上的地址。
从版本 3.0 开始,RabbitMQ 的 http-client 已不再用于访问 Rest API。相反,如果类路径中存在 |
将 WebFlux 添加到类路径中:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现 LocalizedQueueConnectionFactory#setNodeLocator 并重写其 createClient、restCall 和可选的 close 方法,来使用其他 REST 技术。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
});
该框架提供了 WebFluxNodeLocator 和 RestTemplateNodeLocator,默认值如上所述。
发布者确认和返回
通过设置 CachingConnectionFactory 属性 publisherConfirmType 为 ConfirmType.CORRELATED,并将 publisherReturns 属性设为 'true',可支持已确认(带关联)并返回的消息。
当设置这些选项时,工厂创建的 0 个实例将被包装在 1 中,该包装用于促进回调机制。当获取此类通道时,客户端可向 3 注册一个 2。4 实现包含将确认或返回路由至相应监听器的逻辑。这些功能将在以下各节中进一步解释。
参见 关联发布者确认和返回 以及 simplePublisherConfirms 在 作用域操作 中。
| 有关更多背景信息,请参阅 RabbitMQ 团队撰写的博客文章《引入发布者确认》。 |
连接和通道监听器
连接工厂支持注册 ConnectionListener 和 ChannelListener 种实现。这使您能够接收与连接和通道相关的事件通知。(ConnectionListener 由 RabbitAdmin 在连接建立时用于执行声明操作——有关更多信息,请参阅 自动声明交换机、队列和绑定)。以下列表展示了 ConnectionListener 接口的定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 对象可以提供 com.rabbitmq.client.BlockedListener 个实例,用于在连接被阻塞和解除阻塞事件发生时接收通知。
以下示例展示了 ChannelListener 接口的定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
参见 发布是异步的——如何检测成功和失败,了解一种可能需要注册 ChannelListener 的场景。
日志通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志级别。
该 AbstractConnectionFactory 使用默认策略记录通道关闭,具体如下:
-
正常通道关闭(200 OK)不会被记录。
-
如果由于被动队列声明失败而导致通道关闭,则会在 DEBUG 级别记录日志。
-
如果由于独占消费者条件导致
basic.consume被拒绝而关闭通道,则会以 DEBUG 级别记录日志(自 3.1 版本起,此前为 INFO 级别)。 -
其他所有内容均以 ERROR 级别记录。
要修改此行为,您可以将自定义的 ConditionalExceptionLogger 注入到 CachingConnectionFactory 的 closeExceptionLogger 属性中。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公开的,允许对其进行子类化。
参见 消费者事件。
运行时缓存属性
从版本 1.6 开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。这些统计信息可用于调整缓存,以优化其在生产环境中的性能。例如,高水位线可用于判断是否应增加缓存大小;若其等于缓存大小,您可能需要考虑进一步扩大缓存容量。以下表格描述了 CacheMode.CHANNEL 属性:
| 属性 | 含义 |
|---|---|
connectionName |
由 |
channelCacheSize |
当前配置的最大空闲通道数。 |
localPort |
连接的本地端口(如果可用)。<br/>此信息可用于与 RabbitMQ 管理界面中的连接和通道进行关联。 |
idleChannelsTx |
当前空闲(缓存)的事务通道数量。 |
idleChannelsNotTx |
当前空闲(缓存)的非事务性通道数量。 |
idleChannelsTxHighWater |
已缓存的、当前处于空闲状态(闲置)的最大事务通道数量。 |
idleChannelsNotTxHighWater |
非事务性通道的最大数量已同时处于空闲(缓存)状态。 |
以下表格描述了 CacheMode.CONNECTION 属性:
| 属性 | 含义 |
|---|---|
connectionName:<localPort> |
由 |
openConnections |
表示与代理服务器连接的连接对象的数量。 |
channelCacheSize |
当前配置的最大空闲通道数。 |
connectionCacheSize |
当前配置的最大空闲连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
当前空闲的连接数最大值。 |
idleChannelsTx:<localPort> |
当前为此连接处于空闲(缓存)状态的事务通道数量。您可使用属性名称中的 |
idleChannelsNotTx:<localPort> |
当前为此连接空闲(缓存)的非事务性通道数量。 |
idleChannelsTxHighWater:<localPort> |
当前处于空闲(缓存)状态的事务通道的最大数量。</p><p>属性名称中的 localPort 部分可用于与 RabbitMQ 管理界面中的连接和通道进行关联。 |
idleChannelsNotTxHighWater:<localPort> |
非事务性通道的最大数量已同时处于空闲(缓存)状态。您可以使用属性名称中的 |
属性 cacheMode(CHANNEL 或 CONNECTION)也包含在内。
RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 的首个版本起,该框架便在发生代理服务器故障时提供了自己的连接和通道恢复功能。此外,如 配置代理服务器 中所讨论的,RabbitAdmin 在重新建立连接时会重新声明任何基础设施 Bean(如队列等)。因此,它不依赖于 自动恢复 功能——该功能现由 amqp-client 库提供。而 amqp-client 默认启用了自动恢复功能。两种恢复机制之间存在一些不兼容性,因此,默认情况下,Spring 会将底层 RabbitMQ connectionFactory 的 automaticRecoveryEnabled 属性设置为 false。即使该属性被设置为 true,Spring 仍会通过立即关闭任何已恢复的连接来实际禁用该功能。
| 默认情况下,只有被定义为 Bean 的元素(队列、交换机、绑定)在连接失败后才会被重新声明。有关如何更改此行为,请参阅 恢复自动删除声明。 |