交易

本节介绍 Spring for Apache Pulsar 如何支持事务。spring-doc.cadn.net.cn

概述

Spring for Apache Pulsar 事务支持建立在 Spring Framework 提供的事务支持之上。 在高级别上,事务资源向事务管理器注册,事务管理器又处理已注册资源的事务状态(提交、回滚等)。spring-doc.cadn.net.cn

Spring for Apache Pulsar 提供了以下功能:spring-doc.cadn.net.cn

事务支持尚未添加到响应式组件中

事务支持默认处于禁用状态。 要在使用 Spring Boot 时启用支持,只需将spring.pulsar.transaction.enabled财产。 下面的每个组件部分概述了更多配置选项。spring-doc.cadn.net.cn

事务性发布PulsarTemplate

事务性上的所有发送作PulsarTemplate查找活动事务并在事务中登记每个发送作(如果找到)。spring-doc.cadn.net.cn

非交易性使用

默认情况下,事务性PulsarTemplate也可用于非事务性作。 当找不到现有事务时,它将以非事务方式继续发送作。 但是,如果模板配置为需要事务,则在事务范围之外使用模板的任何尝试都会导致异常。spring-doc.cadn.net.cn

事务可以由TransactionTemplate一个@Transactional方法, 调用executeInTransaction,或通过事务侦听器容器。

本地交易

我们使用术语“本地”事务来表示不受 Spring 事务管理工具管理或与 Spring 事务管理工具相关联的 Pulsar 本机事务(即PulsarTransactionManager). 相反,“同步”事务是由PulsarTransactionManager.spring-doc.cadn.net.cn

您可以使用PulsarTemplate在本地事务中执行一系列作。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});

回调中的参数是模板实例,其中executeInTransaction方法被调用。 模板上的所有作都登记在当前事务中。 如果回调正常退出,则事务被提交。 如果抛出异常,则事务将回滚。spring-doc.cadn.net.cn

如果正在处理同步事务,则忽略该事务并使用新的“嵌套”事务。

配置

以下事务设置可直接在PulsarTemplate(通过transactions字段):spring-doc.cadn.net.cn

不使用 Spring Boot 时,可以在提供的模板上调整这些设置。但是,使用 Spring Boot 时,模板是自动配置的,并且没有影响属性的机制。在这种情况下,您可以注册一个PulsarTemplateCustomizer可用于调整设置的 bean。以下示例显示了如何在自动配置的模板上设置超时:spring-doc.cadn.net.cn

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

事务性接收@PulsarListener

启用侦听器事务后,@PulsarListenerComments Listener 方法在同步事务的作用域中调用。spring-doc.cadn.net.cn

DefaultPulsarMessageListenerContainer使用 SpringTransactionTemplate配置了PulsarTransactionManager在方法调用之前启动事务。spring-doc.cadn.net.cn

每个收到的消息的确认都登记在作用域事务中。spring-doc.cadn.net.cn

消费-加工-生产场景

一种常见的事务模式是,消费者从 Pulsar 主题读取消息,转换消息,最后生产者将生成的消息写入另一个 Pulsar 主题。 当启用事务并且您的侦听器方法使用事务性PulsarTemplate以生成转换后的消息。spring-doc.cadn.net.cn

给定以下监听器方法:spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
    var transformedMsg = msg.toUpperCase(Locale.ROOT); (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)

启用侦听器事务时,将发生以下交互:spring-doc.cadn.net.cn

1 监听器容器发起新事务并在事务范围内调用监听器方法
2 侦听器方法接收消息
3 侦听器方法转换消息
4 侦听器方法使用事务模板发送转换后的消息,该模板在活动事务中登记发送作
5 侦听器容器自动确认消息并在活动事务中登记确认作
6 侦听器容器(通过TransactionTemplate) 提交事务

如果您没有使用@PulsarListener而不是直接使用侦听器容器,提供与上述相同的事务支持。 请记住,@PulsarListener只是方便将 Java 方法注册为监听器容器消息监听器。spring-doc.cadn.net.cn

与记录侦听器的交易

上面的示例使用记录侦听器。 使用记录侦听器时,在每个侦听器方法调用时都会创建一个新事务,这相当于每条消息的事务。spring-doc.cadn.net.cn

由于事务边界是每条消息,并且每个消息确认都登记在每个事务中,因此批处理确认模式不能与事务记录侦听器一起使用。

与批处理侦听器的交易

使用批处理侦听器时,在每次侦听器方法调用时都会创建一个新事务,这相当于每批消息的事务。spring-doc.cadn.net.cn

事务性批处理侦听器当前不支持自定义错误处理程序。

配置

监听器容器工厂

以下事务设置可直接在PulsarContainerPropertiesConcurrentPulsarListenerContainerFactory创建侦听器容器时。 这些设置会影响所有侦听器容器,包括@PulsarListener.spring-doc.cadn.net.cn

不使用 Spring Boot 时,可以在提供的容器工厂上调整这些设置。 但是,当使用 Spring Boot 时,容器工厂是自动配置的。 在这种情况下,您可以注册一个org.springframework.boot.pulsar.autoconfigure.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>bean 来访问和自定义容器属性。 以下示例演示如何在容器工厂上设置超时:spring-doc.cadn.net.cn

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}

@PulsarListener

默认情况下,每个侦听器都遵循其相应侦听器容器工厂的事务设置。 但是,用户可以将transactional属性@PulsarListener覆盖容器出厂设置,如下所示:spring-doc.cadn.net.cn

  • 如果容器工厂启用了事务,则transactional = false将禁用个人侦听器的事务。spring-doc.cadn.net.cn

  • 如果容器工厂启用了事务并需要,则尝试将transactional = false将导致抛出异常,指出需要事务。spring-doc.cadn.net.cn

  • 如果容器工厂禁用了事务,则尝试将transactional = true将被忽略并记录警告。spring-doc.cadn.net.cn

PulsarTransactionManager

PulsarTransactionManager是 Spring Framework 的PlatformTransactionManager. 您可以使用PulsarTransactionManager具有正常的 Spring 事务支持(@Transactional,TransactionTemplate等)。spring-doc.cadn.net.cn

如果事务处于活动状态,则任何PulsarTemplate在事务范围内执行的作会登记并参与正在进行的事务。 管理器根据成功或失败提交或回滚事务。spring-doc.cadn.net.cn

您可能不需要使用PulsarTransactionManager直接,因为大多数事务用例都被PulsarTemplate@PulsarListener.

与其他事务管理器的 Pulsar 事务

仅限生产者的交易

如果你想将记录发送到 Pulsar 并在单个事务中执行一些数据库更新,你可以使用普通的 Spring 事务管理和DataSourceTransactionManager.spring-doc.cadn.net.cn

以下示例假设有一个DataSourceTransactionManager以名称“dataSourceTransactionManager”注册的 bean
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

的拦截器@Transactional注释启动数据库事务,并且PulsarTemplate将事务与数据库事务管理器同步;每次发送都会参与该交易。 当该方法退出时,数据库事务将提交,然后是 Pulsar 事务。spring-doc.cadn.net.cn

如果您希望先提交 Pulsar 事务,并且仅在 Pulsar 事务成功时提交数据库事务,请使用 nest@Transactional方法,外部方法配置为使用DataSourceTransactionManager,并且配置为使用PulsarTransactionManager.spring-doc.cadn.net.cn

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
    this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}

消费者 + 生产者事务

如果要从 Pulsar 使用记录,将记录发送到 Pulsar,并在事务中执行一些数据库更新,则可以结合正常的 Spring 事务管理(使用DataSourceTransactionManager) 与容器发起的交易。spring-doc.cadn.net.cn

在以下示例中,侦听器容器启动 Pulsar 事务,并且@Transactional注解启动数据库事务。首先提交数据库事务;如果 Pulsar 事务提交失败,则记录将被重新传递,因此数据库更新应该是幂等的。spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase(Locale.ROOT);
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}