事务
本部分描述了Spring for Apache Kafka如何支持事务。
概述
0.11.0.0 客户端库增加了对事务的支持。 Spring for Apache Kafka 通过以下方式提供支持:
-
KafkaTransactionManager: 用于正常的 Spring 事务支持 (@Transactional,TransactionTemplate, 等等) -
事务性
KafkaMessageListenerContainer -
使用 0 的本地事务
-
与其他事务管理器的事务同步
事务通过提供带有DefaultKafkaProducerFactory的transactionIdPrefix来启用。
在这种情况下,而不是管理一个共享的Producer,工厂维护一个事务性生产者的缓存。
当用户在生产者上调用close()时,生产者会返回到缓存以便重用,而不是实际关闭。
每个生产者的transactional.id属性为transactionIdPrefix+n,其中n从0开始并为每个新生产者递增。
在Spring for Apache Kafka的先前版本中,由监听器容器使用基于记录的监听器启动的事务的transactional.id生成方式不同,以支持僵尸 fencing,这在新的方式下已不再需要,从3.0开始EOSMode.V2是唯一可用的选项。
对于运行在多个实例中的应用程序,transactionIdPrefix必须在每个实例之间唯一。
也请参见 Exactly Once 语义。
也请参见 transactionIdPrefix。
使用 Spring Boot,只需设置 spring.kafka.producer.transaction-id-prefix 属性 - Spring Boot 将会自动配置一个 KafkaTransactionManager 组件并将其连接到监听器容器。
从 2.5.8 版本开始,你现在可以在生产者工厂上配置 maxAge 属性。
这在使用可能在经纪人 transactional.id.expiration.ms 的事务性生产者时很有用。
在当前的 kafka-clients 情况下,这可能导致在没有重新平衡的情况下出现 ProducerFencedException。
通过将 maxAge 设置为小于 transactional.id.expiration.ms,如果生产者超过其最大年龄,工厂将刷新生产者。 |
使用KafkaTransactionManager
The KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的实现。
它在构造函数中提供对生产者工厂的引用。
如果你提供自定义的生产者工厂,它必须支持事务。
参见 ProducerFactory.transactionCapable()。
You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others).
If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer.
The manager commits or rolls back the transaction, depending on success or failure.
You must configure the KafkaTemplate to use the same ProducerFactory as the transaction manager
事务同步
本部分指的是生产者发起的事务(由监听器容器启动的事务之外的事务);有关容器启动事务时的链式事务信息,请参见 使用消费者发起的事务。
如果您想要将记录发送到kafka并执行一些数据库更新,您可以使用正常的Spring事务管理,例如使用一个 DataSourceTransactionManager。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
拦截器对于注解@Transactional开始事务,而KafkaTemplate将与该事务管理器同步一个事务;每个发送都将参与该事务。
当方法退出时,数据库事务将提交,随后是Kafka事务。
如果您希望按相反的顺序执行提交(先Kafka后数据库),请使用嵌套的@Transactional方法,外层方法配置为使用DataSourceTransactionManager,内层方法配置为使用KafkaTransactionManager。
见 与其他事务管理器结合的Kafka事务示例 ,了解同步JDBC和Kafka事务的应用示例,适用于Kafka优先或数据库优先的配置。
| 从 2.5.17、2.6.12、2.7.9 和 2.8.0 版本开始,如果在主事务已提交后,同步事务的提交失败,异常将抛出到调用方。 此前,该情况会被静默忽略(记录为调试日志)。 应用程序应根据需要采取补救措施,以弥补已提交的主事务。 |
使用消费者发起的事务
The ChainedKafkaTransactionManager 已在 2.7 版本中弃用;有关其超类 ChainedTransactionManager 的更多信息,请参见 JavaDocs。
替代方案是使用 KafkaTransactionManager 在容器中启动 Kafka 事务,并使用 @Transactional 注解监听器方法以启动另一个事务。
查看 与其他事务管理器结合的Kafka事务示例 以获取一个示例应用程序,该应用程序串联JDBC和Kafka事务。
KafkaTemplate本地事务
你可以使用 KafkaTemplate 在本地事务中执行一系列操作。
以下示例展示了如何操作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
The argument in the callback is the template itself (this).
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back。
如果正在处理一个KafkaTransactionManager(或同步化的)事务,则不使用它。
相反,将使用一个新的"嵌套"事务。 |
TransactionIdPrefix
With EOSMode.V2(aka BETA),唯一的支持模式是,无需再使用相同的 transactional.id,甚至对于由消费者发起的事务;事实上,它必须在每个实例上都是唯一的,就像对于生产者发起的事务一样。
该属性必须在每个应用程序实例上具有不同的值。
TransactionIdSuffix Fixed
自 3.2 版本起,引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。
当设置的 maxCache 大于零时,默认实现为 DefaultTransactionIdSuffixStrategy,可以在特定范围内重用 transactional.id,否则将通过递增计数器动态生成后缀。
当事务生产者请求一个 transactional.id 而所有 transactional.id 后缀都已使用时,将抛出 NoProducerAvailableException。
用户可以配置一个重试策略 RetryTemplate 来重试该异常,并使用适当的退避策略进行配置。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
当设置maxCache为5时,transactional.id是my.txid.+`{0-4}`。
在使用 KafkaTransactionManager 与 ConcurrentMessageListenerContainer 且启用 maxCache 时,需要将 maxCache 设置为不小于 concurrency 的值。
如果 MessageListenerContainer 无法获取 transactional.id 前缀,将抛出 NoProducerAvailableException。
在使用 ConcurrentMessageListenerContainer 的嵌套事务时,需要调整 maxCache 设置以处理嵌套事务数量的增加。 |
KafkaTemplate事务性与非事务性发布
通常情况下,当一个 KafkaTemplate 是事务性的(配置了具有事务能力的生产者工厂)时,需要事务。
事务可以通过 TransactionTemplate、@Transactional 方法、调用 executeInTransaction 或通过监听器容器来启动,前提是已使用 KafkaTransactionManager 进行配置。
任何尝试在事务范围外使用模板的操作都会导致模板抛出 IllegalStateException。
从版本 2.4.3 开始,您可以设置模板的 allowNonTransactional 属性为 true。
在这种情况下,模板将在没有事务的情况下允许操作运行,方法是调用 ProducerFactory 的 createNonTransactionalProducer() 方法;生产者将像平时一样被缓存或绑定到线程上,以便重复使用。
参见 DefaultKafkaProducerFactory。
带有批处理监听器的事务
当事务过程中监听器出现故障时,会调用AfterRollbackProcessor来在回滚操作完成后采取一些行动。
使用默认的AfterRollbackProcessor和记录监听器时,会进行重试以重新发送失败的记录。
然而,对于批次监听器来说,整个批次都会被重新发送,因为框架不知道批次中的哪条记录失败了。
有关更多信息,请参见回滚后处理程序。
使用批处理监听器时,版本 2.4.2 引入了一种替代机制来处理批处理过程中的失败:BatchToRecordAdapter。当配置了 batchListener 为 true 的容器工厂与 BatchToRecordAdapter 结合使用时,监听器会逐条记录调用。
这使得可以在批处理中进行错误处理,同时仍然可能根据异常类型停止整个批处理的处理。
提供了一个默认的 BatchToRecordAdapter,可以使用标准的 ConsumerRecordRecoverer 进行配置,例如 DeadLetterPublishingRecoverer。
以下测试用例配置片段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}