交易

本节介绍了Apache Pulsar的Spring如何支持事务。spring-doc.cadn.net.cn

概述

Spring for Apache Pulsar 的事务支持基于 Spring Framework 提供的事务支持。在高层次上,事务资源会通过事务管理器注册,该管理器又处理已注册资源的事务状态(提交、回滚等)。spring-doc.cadn.net.cn

Apache Pulsar 的春季提供了以下功能:spring-doc.cadn.net.cn

响应式组件尚未加入事务支持

默认情况下,事务支持被禁用。要启用 Spring Boot 支持,只需设置spring.pulsar.transaction.enabled财产。 以下每个组件部分概述了更多配置选项。spring-doc.cadn.net.cn

交易出版脉冲星模板

事务型上的所有发送作脉冲星模板寻找一个活跃的事务,并在该事务中列出每个发送作(如果找到的话)。spring-doc.cadn.net.cn

非交易性使用

默认情况下,是事务型脉冲星模板也可以用于非事务性作。 当找不到现有事务时,它将以非交易方式继续发送作。 然而,如果模板配置为要求交易,那么任何在交易范围之外使用该模板的尝试都会导致异常。spring-doc.cadn.net.cn

交易可以由交易模板一个@Transactional方法,调用执行交易,或者通过事务监听器容器。

本地交易

我们使用“本地”事务一词来指代不由 Spring 交易管理功能管理或关联的 Pulsar 原生事务(即PulsarTransactionManager). 相反,“同步”事务是指由PulsarTransactionManager.spring-doc.cadn.net.cn

你可以使用脉冲星模板在本地事务中执行一系列作。 以下示例展示了如何实现:spring-doc.cadn.net.cn

var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});

回调中的参数是模板实例执行交易方法被引用。 模板上的所有作都被列入当前事务中。 如果回调正常退出,则该事务被提交。 如果抛出异常,交易会被回滚。spring-doc.cadn.net.cn

如果有同步事务正在进行中,则忽略该事务,使用新的“嵌套”事务。

配置

以下交易设置可直接在脉冲星模板(通过交易场地):spring-doc.cadn.net.cn

当不使用Spring Boot时,你可以在提供的模板上调整这些设置。 然而,使用Spring Boot时,模板是自动配置的,且没有机制影响属性。 在这种情况下,你可以注册一个PulsarTemplateCustomizer可以用来调节设置的豆子。 以下示例展示了如何在自动配置模板上设置超时:spring-doc.cadn.net.cn

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

事务接收@PulsarListener

当监听器事务被启用时,@PulsarListener注释监听器方法在同步事务的范围内被调用。spring-doc.cadn.net.cn

DefaultPulsarMessageListenerContainer使用Spring交易模板配置为PulsarTransactionManager在调用方法之前发起交易。spring-doc.cadn.net.cn

每个收到消息的确认信息都被列入了该范围事务中。spring-doc.cadn.net.cn

消费-过程-生产场景

一种常见的交易模式是,消费者读取来自Pulsar主题的消息,进行转换,最后生产者将生成的消息写入另一个Pulsar主题。 当事务启用且监听方法使用事务时,框架支持这种用例脉冲星模板以产生转化后的信息。spring-doc.cadn.net.cn

给定以下监听者方法:spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
    var transformedMsg = msg.toUpperCase(Locale.ROOT); (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)

启用监听器事务时会发生以下交互:spring-doc.cadn.net.cn

1 监听器容器在事务范围内发起新事务并调用监听器方法
2 监听者方法接收消息
3 监听器方法对消息进行变换
4 监听器方法通过交易模板发送转换后的消息,该模板在活跃事务中执行发送作
5 监听器容器会自动确认消息,并在活跃事务中列入确认作
6 Listener container (via交易模板) 提交事务

如果你没有使用,那@PulsarListener而非直接使用监听器容器,则提供如上所述的相同事务支持。 记住,那个@PulsarListener只是方便地注册一个Java方法作为监听器容器消息监听器。spring-doc.cadn.net.cn

与唱片听众的交易

上面的例子使用了记录监听器。 使用记录监听器时,每调用一次监听器方法都会创建一个新的事务,相当于每条消息的一笔事务。spring-doc.cadn.net.cn

由于事务边界是每条消息的,且每个消息确认都包含在每个事务中,批处理确认模式不能用于事务记录监听器。

与批处理监听器的事务

使用批处理监听器时,每调用一次监听器方法调用都会创建一个新的事务,相当于每批消息处理一个事务。spring-doc.cadn.net.cn

事务批处理监听器目前不支持自定义错误处理程序。

配置

监听器容器工厂

以下交易设置可直接在脉冲星容器属性被以下机构使用ConcurrentPulsarListenerContainerFactory在创建监听器容器时。 这些设置影响所有监听器容器,包括@PulsarListener.spring-doc.cadn.net.cn

不使用Spring Boot时,你可以在你提供的容器出厂设置中调整这些设置。 然而,使用Spring Boot时,容器工厂是自动配置的。 在这种情况下,你可以注册一个org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>BEAN 用于访问和自定义容器属性。 以下示例展示了如何设置容器工厂的超时:spring-doc.cadn.net.cn

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}

@PulsarListener

默认情况下,每个监听器都尊重其对应监听器容器工厂的事务设置。 然而,用户可以设置事务每个 属性@PulsarListener如何覆盖容器出厂设置如下:spring-doc.cadn.net.cn

  • 如果容器工厂启用了交易,那么交易性 = 错误将为个人监听者禁用事务。spring-doc.cadn.net.cn

  • 如果容器工厂启用且要求事务,则尝试交易性 = 错误将导致抛出异常,说明需要交易。spring-doc.cadn.net.cn

  • 如果容器工厂禁用了交易,那么尝试事务型 = 真将被忽略并记录警告。spring-doc.cadn.net.cn

PulsarTransactionManager

PulsarTransactionManager是 Spring Framework 的实现PlatformTransactionManager. 你可以使用PulsarTransactionManager支持正常的 Spring 事务 (@Transactional,交易模板,以及其他。spring-doc.cadn.net.cn

如果交易是活跃的,任何脉冲星模板在交易范围内执行的作,登记并参与正在进行的交易。 管理者根据成功或失败来提交或回滚该事务。spring-doc.cadn.net.cn

你大概不需要使用PulsarTransactionManager直接,因为大多数事务性用例都涵盖于脉冲星模板@PulsarListener.

与其他事务管理器的脉冲星交易

仅生产者交易

如果你想将记录发送到 Pulsar,并在单一事务中进行一些数据库更新,你可以使用普通的 Spring 事务管理,配合DataSourceTransactionManager.spring-doc.cadn.net.cn

以下示例假设存在DataSourceTransactionManagerBean 注册时名为“dataSourceTransactionManager”
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

拦截机用于@Transactional注释启动数据库事务,并且脉冲星模板将与数据库事务管理器同步事务;每个发送者都会参与该交易。 当方法退出时,数据库事务将提交,随后是 Pulsar 事务。spring-doc.cadn.net.cn

如果你想先提交 Pulsar 事务,只有在 Pulsar 事务成功时才提交数据库事务,可以使用嵌套@Transactional方法,外层方法配置为使用DataSourceTransactionManager,以及配置为使用PulsarTransactionManager.spring-doc.cadn.net.cn

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
    this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}

消费者+生产者交易

如果你想从Pulsar中获取记录,向Pulsar发送记录,并在事务中进行一些数据库更新,你可以结合普通的Spring事务管理(使用DataSourceTransactionManager)通过容器发起的交易。spring-doc.cadn.net.cn

在以下示例中,监听器容器启动了 Pulsar 事务,并且@Transactional注释启动数据库事务。 数据库事务首先提交;如果 Pulsar 事务未能提交,记录将被重新交付,因此数据库更新应为幂等元。spring-doc.cadn.net.cn

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase(Locale.ROOT);
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}