此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
连接和资源管理
虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们进入资源管理时,详细信息特定于代理实现。 因此,在本节中,我们将重点介绍仅存在于“spring-rabbit”模块中的代码,因为此时,RabbitMQ 是唯一受支持的实现。
管理与 RabbitMQ 代理的连接的核心组件是ConnectionFactory
接口。
的责任ConnectionFactory
实现是提供org.springframework.amqp.rabbit.connection.Connection
,它是com.rabbitmq.client.Connection
.
选择连接工厂
有三个连接工厂可供选择
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前两个是在 2.3 版本中添加的。
对于大多数用例,CachingConnectionFactory
应该使用。
这ThreadChannelConnectionFactory
如果要确保严格的消息排序而无需使用作用域作,则可以使用。
这PooledChannelConnectionFactory
类似于CachingConnectionFactory
因为它使用单个连接和一个通道池。
它的实现更简单,但不支持相关的发布者确认。
所有三个工厂都支持简单的发布者确认。
配置RabbitTemplate
要使用单独的连接,您现在可以从版本 2.3.2 开始将发布连接工厂配置为不同的类型。
默认情况下,发布工厂的类型相同,并且在主工厂上设置的任何属性也会传播到发布工厂。
从 3.1 版开始,AbstractConnectionFactory
包括connectionCreatingBackOff
属性,它支持连接模块中的退避策略。
目前,在行为中得到了支持createChannel()
以处理channelMax
达到限制,则根据尝试次数和间隔实施退避策略。
PooledChannelConnectionFactory
该工厂基于 Apache Pool2 管理单个连接和两个通道池。
一个池用于事务性通道,另一个用于非事务性通道。
这些池是GenericObjectPool
s 具有默认配置;提供回调来配置池;有关更多信息,请参阅 Apache 文档。
阿帕奇commons-pool2
jar 必须位于类路径上才能使用此工厂。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个连接和两个ThreadLocal
s,一个用于事务性通道,另一个用于非事务性通道。
该工厂确保同一线程上的所有作都使用相同的通道(只要它保持打开状态)。
这有助于严格的消息排序,而无需作用域作。
为避免内存泄漏,如果您的应用程序使用许多短期线程,则必须调用工厂的closeThreadChannel()
以释放通道资源。
从 2.3.7 版开始,一个线程可以将其通道转移到另一个线程。
有关更多信息,请参阅多线程环境中的严格消息排序。
CachingConnectionFactory
提供的第三个实现是CachingConnectionFactory
,默认情况下,它会建立一个可由应用程序共享的单个连接代理。
共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。
连接实例提供了一个createChannel
方法。
这CachingConnectionFactory
实现支持这些通道的缓存,并且它根据通道是否是事务性的为通道维护单独的缓存。
创建CachingConnectionFactory
,您可以通过构造函数提供“主机名”。
您还应该提供“用户名”和“密码”属性。
要配置通道缓存的大小(默认值为 25),您可以调用setChannelCacheSize()
方法。
从 1.3 版开始,您可以配置CachingConnectionFactory
缓存连接以及仅缓存通道。
在这种情况下,每个调用createConnection()
创建一个新连接(或从缓存中检索空闲连接)。
关闭连接会将其返回到缓存(如果尚未达到缓存大小)。
在此类连接上创建的通道也会被缓存。
在某些环境中,使用单独的连接可能很有用,例如从 HA 集群使用
与负载均衡器结合使用,以连接到不同的集群成员等。
要缓存连接,请将cacheMode
自CacheMode.CONNECTION
.
这不限制连接数。 相反,它指定允许多少个空闲打开的连接。 |
从 1.5.5 版本开始,一个名为connectionLimit
被提供。
设置此属性后,它会限制允许的连接总数。
设置后,如果达到限制,则channelCheckoutTimeLimit
用于等待连接空闲。
如果超过时间,则AmqpTimeoutException
被抛出。
当缓存模式为 此外,在撰写本文时, |
重要的是要了解缓存大小(默认情况下)不是限制,而只是可以缓存的通道数。 缓存大小为 10 时,实际上可以使用任意数量的通道。 如果使用的通道超过 10 个,并且它们都返回到缓存中,则缓存中会有 10 个通道。 其余的物理关闭。
从 1.6 版本开始,默认通道缓存大小已从 1 增加到 25。 在高容量、多线程环境中,较小的缓存意味着以高速率创建和关闭通道。 增加默认缓存大小可以避免此开销。 您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,并考虑进一步增加缓存大小,如果您 查看正在创建和关闭的许多频道。 缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会 影响现有的小批量应用程序。
从 1.4.2 版本开始,CachingConnectionFactory
有一个名为channelCheckoutTimeout
.
当此属性大于零时,channelCacheSize
成为连接上可创建的通道数的限制。
如果达到限制,则调用线程将阻塞,直到通道可用或达到此超时,在这种情况下,AmqpTimeoutException
被抛出。
框架内使用的通道(例如RabbitTemplate ) 可靠地返回到缓存。
如果您在框架之外创建通道(例如,
通过直接访问连接并调用createChannel() ),您必须可靠地返回它们(通过关闭),也许在finally 块,以避免通道耗尽。 |
以下示例演示如何创建新的connection
:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能类似于以下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个SingleConnectionFactory 仅在框架的单元测试代码中可用的实现。
它比CachingConnectionFactory ,因为它不缓存通道,但由于缺乏性能和弹性,它不适合简单测试之外的实际用途。
如果您需要实现自己的ConnectionFactory 出于某种原因,AbstractConnectionFactory 基类可能提供一个很好的起点。 |
一个ConnectionFactory
可以使用 rabbit 命名空间快速方便地创建,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法更可取,因为框架可以为您选择最佳默认值。
创建的实例是CachingConnectionFactory
.
请记住,通道的默认缓存大小为 25。
如果要缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。
在 XML 中,它看起来如下:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为CHANNEL
,但您可以将其配置为缓存连接。
在下面的示例中,我们使用connection-cache-size
:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供主机和端口属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,则可以使用 addresses 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关以下信息,请参阅连接到集群address-shuffle-mode
.
以下示例包含一个自定义线程工厂,该工厂在线程名称前加上rabbitmq-
:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名连接
从 1.7 版开始,一个ConnectionNameStrategy
提供注入AbstractionConnectionFactory
.
生成的名称用于目标 RabbitMQ 连接的特定于应用程序的标识。
如果 RabbitMQ 服务器支持连接名称,则会在管理 UI 中显示连接名称。
此值不必是唯一的,也不能用作连接标识符,例如,在 HTTP API 请求中。
这个值应该是人类可读的,并且是ClientProperties
在connection_name
钥匙。
您可以使用简单的 Lambda,如下所示:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
这ConnectionFactory
参数可用于通过某些逻辑区分目标连接名称。
默认情况下,beanName
的AbstractConnectionFactory
、表示对象的十六进制字符串和内部计数器用于生成connection_name
.
这<rabbit:connection-factory>
命名空间组件也随connection-name-strategy
属性。
实现SimplePropertyValueConnectionNameStrategy
将连接名称设置为应用程序属性。
您可以将其声明为@Bean
并将其注入到连接工厂中,如以下示例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的Environment
.
使用 Spring Boot 及其自动配置的连接工厂时,只需声明ConnectionNameStrategy @Bean .
Spring Boot 自动检测 bean 并将其连接到工厂。 |
阻止的连接和资源约束
对于与内存警报相对应的代理的交互,可能会阻止连接。
从 2.0 版开始,org.springframework.amqp.rabbit.connection.Connection
可提供com.rabbitmq.client.BlockedListener
要通知连接阻止和取消阻止事件的实例。
此外,AbstractConnectionFactory
发出ConnectionBlockedEvent
和ConnectionUnblockedEvent
,分别通过其内部BlockedListener
实现。
这些允许您提供应用程序逻辑以对代理上的问题做出适当的反应,并(例如)采取一些纠正措施。
当应用程序配置了单个CachingConnectionFactory ,就像默认情况下使用 Spring Boot 自动配置一样,当连接被 Broker 阻止时,应用程序将停止工作。
当它被经纪人阻止时,它的任何客户端都会停止工作。
如果我们在同一个应用程序中有生产者和消费者,那么当生产者阻止连接(因为 Broker 上不再有资源)并且消费者无法释放它们(因为连接被阻止)时,我们最终可能会陷入死锁。
为了缓解这个问题,我们建议再有一个单独的CachingConnectionFactory 实例具有相同的选项 - 一个用于生产者,一个用于消费者。
一个单独的CachingConnectionFactory 对于在使用者线程上执行的事务生产者来说是不可能的,因为它们应该重用Channel 与消费者事务相关联。 |
从 2.0.2 版本开始,RabbitTemplate
有一个配置选项,可以自动使用第二个连接工厂,除非正在使用事务。
有关详细信息,请参阅使用单独的连接。
这ConnectionNameStrategy
对于发布者,连接与主策略相同,使用.publisher
附加到调用方法的结果。
从 1.7.7 版本开始,AmqpResourceNotAvailableException
提供,当SimpleConnection.createChannel()
无法创建Channel
(例如,因为channelMax
已达到限制,并且缓存中没有可用通道)。
您可以在RetryPolicy
在一些回退后恢复作。
配置底层客户端连接工厂
这CachingConnectionFactory
使用 Rabbit 客户端的实例ConnectionFactory
.
许多配置属性将传递 (host
,port
,userName
,password
,requestedHeartBeat
和connectionTimeout
例如)在CachingConnectionFactory
.
要设置其他属性 (clientProperties
,例如),您可以定义 Rabbit 工厂的实例,并使用CachingConnectionFactory
.
使用命名空间时(如前所述),您需要在connection-factory
属性。
为方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
默认情况下,4.0.x 客户端启用自动恢复。
虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要客户端恢复功能。
我们建议禁用amqp-client 自动恢复,避免获得AutoRecoverConnectionNotCurrentlyOpenException 代理可用但连接尚未恢复的情况。
您可能会注意到此异常,例如,当RetryTemplate 在RabbitTemplate ,即使在故障转移到集群中的另一个代理时也是如此。
由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。
从版本 1.7.1 开始,Spring AMQP 禁用amqp-client 自动恢复,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给CachingConnectionFactory .
兔子MQConnectionFactory 由RabbitConnectionFactoryBean 默认情况下,还禁用该选项。 |
RabbitConnectionFactoryBean
和配置 SSL
从 1.4 版本开始,一个方便的RabbitConnectionFactoryBean
提供是为了使用依赖注入在基础客户端连接工厂上方便地配置 SSL 属性。
其他 setter 委托给底层工厂。
以前,您必须以编程方式配置 SSL 选项。
以下示例演示如何配置RabbitConnectionFactoryBean
:
-
Java
-
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot 应用程序文件 (.yaml
或.properties
)
-
Properties
-
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: true
有关配置 SSL 的信息,请参阅 RabbitMQ 文档。
省略keyStore
和trustStore
配置为无需证书验证即可通过 SSL 进行连接。
下一个示例演示如何提供密钥和信任存储配置。
这sslPropertiesLocation
属性是弹簧Resource
指向包含以下键的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
这keyStore
和truststore
是SpringResources
指向商店。
通常,此属性文件由具有读取访问权限的应用程序的作系统保护。
从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 Bean 上设置这些属性。
如果离散属性和sslPropertiesLocation
,后者的属性会覆盖
离散值。
从 2.0 版开始,默认情况下会验证服务器证书,因为它更安全。
如果您出于某种原因希望跳过此验证,请将工厂 Bean 的skipServerCertificateValidation 属性设置为true .
从 2.1 版开始,RabbitConnectionFactoryBean 现在调用enableHostnameVerification() 默认情况下。
要恢复到以前的行为,请将enableHostnameVerification 属性设置为false . |
从 2.2.5 版开始,默认情况下,工厂 Bean 将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。
如果出于某种原因需要使用 v1.1,请将sslAlgorithm 财产:setSslAlgorithm("TLSv1.1") . |
连接到集群
要连接到集群,请配置addresses
属性CachingConnectionFactory
:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从 3.0 版开始,每当建立新连接时,底层连接工厂将尝试通过选择随机地址来连接到主机。
要恢复到之前尝试从第一个到最后一个连接的行为,请将addressShuffleMode
属性设置为AddressShuffleMode.NONE
.
从 2.3 版本开始,INORDER
添加了随机播放模式,这意味着在创建连接后,第一个地址将移动到末尾。
您可能希望将此模式与 RabbitMQ 分片插件一起使用,并使用CacheMode.CONNECTION
如果您希望从所有节点上的所有分片中消费,则可以使用合适的并发性。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
工艺路线连接工厂
从 1.3 版本开始,AbstractRoutingConnectionFactory
已被引入。
该工厂提供了一种机制来配置多个映射ConnectionFactories
并确定目标ConnectionFactory
由一些人lookupKey
在运行时。
通常,实现会检查线程绑定的上下文。
为方便起见,Spring AMQP 提供了SimpleRoutingConnectionFactory
,它获取当前线程绑定的lookupKey
从SimpleResourceHolder
.
以下示例演示如何配置SimpleRoutingConnectionFactory
在 XML 和 Java 中:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后解绑资源很重要。
有关更多信息,请参阅 JavaDocAbstractRoutingConnectionFactory
.
从 1.4 版本开始,RabbitTemplate
支持 SpELsendConnectionFactorySelectorExpression
和receiveConnectionFactorySelectorExpression
属性,这些属性在每个 AMQP 协议交互作 (send
,sendAndReceive
,receive
或receiveAndReply
),解析为lookupKey
提供的值AbstractRoutingConnectionFactory
.
您可以使用 bean 引用,例如@vHostResolver.getVHost(#root)
在表达式中。
为send
作时,要发送的消息是根求值对象。
为receive
作,queueName
是根评估对象。
路由算法如下:如果选择器表达式为null
或被评估为null
或提供的ConnectionFactory
不是AbstractRoutingConnectionFactory
,一切都像以前一样工作,依赖于提供的ConnectionFactory
实现。
如果评估结果不是null
,但没有目标ConnectionFactory
为此lookupKey
和AbstractRoutingConnectionFactory
配置为lenientFallback = true
.
在AbstractRoutingConnectionFactory
,它确实回退到其routing
基于determineCurrentLookupKey()
.
但是,如果lenientFallback = false
一IllegalStateException
被抛出。
命名空间支持还提供send-connection-factory-selector-expression
和receive-connection-factory-selector-expression
属性<rabbit:template>
元件。
此外,从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。
在这种情况下,队列名称列表将用作查找键。
例如,如果将容器配置为setQueueNames("thing1", "thing2")
,查找键是[thing1,thing]"
(请注意,键中没有空格)。
从版本 1.6.9 开始,您可以使用以下命令向查找键添加限定符setLookupKeyQualifier
在侦听器容器上。例如,这样做可以侦听具有相同名称但在不同虚拟主机中的队列(其中每个虚拟主机都有一个连接工厂)。
例如,使用 lookup 键限定符thing1
以及一个监听队列的容器thing2
,您可以注册目标连接工厂的查找键可以是thing1[thing2]
.
目标(和默认值,如果提供)连接工厂必须具有相同的发布者确认和返回设置。 请参阅发布商确认和退货。 |
从版本 2.4.4 开始,可以禁用此验证。
如果您遇到确认和返回之间的值需要不相等的情况,您可以使用AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
到验证的轮流。
请注意,第一个连接工厂添加到AbstractRoutingConnectionFactory
将确定confirms
和returns
.
如果您有需要检查的某些消息确认/返回而其他消息则不需要,这可能会很有用。 例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有x-use-publisher-confirms: true
将通过缓存连接发送,您可以确保邮件传递。有关确保邮件传递的详细信息,请参阅发布者确认和返回。
队列关联和LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到主要队列所在的物理代理。 这CachingConnectionFactory
可以配置多个代理地址。这是为了故障转移,客户端尝试根据配置的AddressShuffleMode
次序。 这LocalizedQueueConnectionFactory
使用管理插件提供的 REST API 来确定哪个节点是队列的前导。然后它创建(或从缓存中检索)一个CachingConnectionFactory
仅连接到该节点。如果连接失败,则确定新的主节点,使用者连接到该节点。 这LocalizedQueueConnectionFactory
配置了默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。
这LocalizedQueueConnectionFactory
是一个RoutingConnectionFactory
和SimpleMessageListenerContainer
使用队列名称作为查找键,如上面的路由连接工厂中所述。
出于这个原因(使用队列名称进行查找),该LocalizedQueueConnectionFactory 仅当容器配置为侦听单个队列时才能使用。 |
必须在每个节点上启用 RabbitMQ 管理插件。 |
此连接工厂适用于长期连接,例如SimpleMessageListenerContainer . 它不适用于短连接使用,例如与RabbitTemplate 因为在建立连接之前调用 REST API 的开销。此外,对于发布作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。 |
以下示例配置显示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是addresses
,adminUris
和nodes
.
这些是位置性的,因为当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的前导,并连接到与该节点位于同一数组位置的地址。
从 3.0 版本开始,RabbitMQhttp-client 不再用于访问 Rest API。
相反,默认情况下,WebClient 如果spring-webflux 在类路径上;否则RestTemplate 被使用。 |
添加WebFlux
到类路径:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现 LocalizedQueueConnectionFactory#setNodeLocator 并覆盖其createClient
,restCall
,以及可选的close
方法。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
});
该框架提供了WebFluxNodeLocator
和RestTemplateNodeLocator
,默认值如上所述。
出版商确认并退货
通过设置CachingConnectionFactory
属性publisherConfirmType
自ConfirmType.CORRELATED
和publisherReturns
属性设置为“true”。
设置这些选项后,Channel
工厂创建的实例包装在PublisherCallbackChannel
,用于方便回调。当获得这样的通道时,客户端可以注册一个PublisherCallbackChannel.Listener
使用Channel
.
这PublisherCallbackChannel
实现包含用于将确认或返回路由到相应侦听器的逻辑。以下部分将进一步解释这些功能。
另请参阅相关发布者确认和退货和simplePublisherConfirms
在作用域作中。
有关更多背景信息,请参阅 RabbitMQ 团队的博客文章,标题为 Introducing Publisher Confirms。 |
连接和通道侦听器
连接工厂支持注册ConnectionListener
和ChannelListener
实现。 这允许您接收有关连接和通道相关事件的通知。(一个ConnectionListener
由RabbitAdmin
在建立连接时执行声明 - 有关详细信息,请参阅自动声明交换、队列和绑定)。以下列表显示了ConnectionListener
接口定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从 2.0 版开始,org.springframework.amqp.rabbit.connection.Connection
对象可以提供com.rabbitmq.client.BlockedListener
要通知连接阻止和取消阻止事件的实例。
以下示例显示了 ChannelListener 接口定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
请参阅发布是异步的 — 如何检测成功和失败,了解您可能想要注册ChannelListener
.
记录通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志记录级别。
这AbstractConnectionFactory
使用默认策略来记录通道关闭,如下所示:
-
不记录正常通道关闭 (200 OK)。
-
如果通道由于被动队列声明失败而关闭,则会在 DEBUG 级别记录该通道。
-
如果通道因
basic.consume
由于排他性消费者条件而被拒绝,则记录在 DEBUG 级别(从 3.1 开始,以前是 INFO)。 -
所有其他记录都记录在 ERROR 级别。
要修改此行为,您可以将自定义ConditionalExceptionLogger
进入CachingConnectionFactory
在其closeExceptionLogger
财产。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger
现在是公共的,允许它被子类化。
另请参阅消费者事件。
运行时缓存属性
盯着 1.6 版本,CachingConnectionFactory
现在通过getCacheProperties()
方法。
这些统计信息可用于调整缓存,以在生产中对其进行优化。
例如,可以使用高水位线来确定是否应增加缓存大小。
如果它等于缓存大小,您可能需要考虑进一步增加。
下表描述了CacheMode.CHANNEL
性能:
属性 | 意义 |
---|---|
connectionName |
由 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
localPort |
连接的本地端口(如果可用)。 这可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsTx |
当前空闲(缓存)的事务通道数。 |
idleChannelsNotTx |
当前空闲(缓存)的非事务性通道数。 |
idleChannelsTxHighWater |
已同时空闲(缓存)的事务通道的最大数量。 |
idleChannelsNotTxHighWater |
非事务通道的最大数量已同时空闲(缓存)。 |
下表描述了CacheMode.CONNECTION
性能:
属性 | 意义 |
---|---|
connectionName:<localPort> |
由 |
openConnections |
表示与代理的连接的连接对象数。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
connectionCacheSize |
当前配置的允许空闲的最大连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
已同时空闲的最大连接数。 |
idleChannelsTx:<localPort> |
此连接当前空闲(缓存)的事务通道数。
您可以使用 |
idleChannelsNotTx:<localPort> |
此连接当前处于空闲(缓存)状态的非事务性通道数。
这 |
idleChannelsTxHighWater:<localPort> |
已同时空闲(缓存)的事务通道的最大数量。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsNotTxHighWater:<localPort> |
非事务通道的最大数量已同时空闲(缓存)。
您可以使用 |
这cacheMode
属性 (CHANNEL
或CONNECTION
) 也包括在内。

RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复。
此外,如配置代理中所述,该RabbitAdmin
在重新建立连接时重新声明任何基础架构 Bean(队列和其他)。
因此,它不依赖于现在由amqp-client
图书馆。
这amqp-client
,默认启用自动恢复。
两种恢复机制之间存在一些不兼容,因此默认情况下,Spring 将automaticRecoveryEnabled
基础上的属性RabbitMQ connectionFactory
自false
.
即使该属性是true
,Spring 通过立即关闭任何恢复的连接来有效地禁用它。
默认情况下,只有定义为 bean 的元素(队列、交换、绑定)才会在连接失败后重新声明。 有关如何更改该行为,请参阅恢复自动删除声明。 |