此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

交易

本节介绍 Spring for Apache Kafka 如何支持事务。spring-doc.cadn.net.cn

概述

0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加了支持:spring-doc.cadn.net.cn

通过提供DefaultKafkaProducerFactory使用transactionIdPrefix. 在这种情况下,而不是管理单个共享Producer,工厂维护事务生产者的缓存。 当用户调用close()在生产者上,它被返回到缓存中以供重用,而不是实际关闭。 这transactional.id每个生产者的属性是transactionIdPrefix + n哪里n开头为0并且会为每个新生产者递增。 在以前版本的 Spring for Apache Kafka 中,transactional.id对于由侦听器容器启动的事务,使用基于记录的侦听器以不同的方式生成,以支持隔离僵尸,这不再需要了,使用EOSMode.V2是从 3.0 开始的唯一选项。 对于使用多个实例运行的应用程序,transactionIdPrefix每个实例必须是唯一的。spring-doc.cadn.net.cn

使用 Spring Boot,只需将spring.kafka.producer.transaction-id-prefix属性 - Spring Boot 将自动配置一个KafkaTransactionManagerbean 并将其连接到侦听器容器中。spring-doc.cadn.net.cn

从 2.5.8 版开始,您现在可以配置maxAge生产者工厂的财产。 当使用可能为代理的transactional.id.expiration.ms. 与电流kafka-clients,这可能会导致ProducerFencedException无需重新平衡。 通过设置maxAge小于transactional.id.expiration.ms,如果生产商超过其最大使用年限,工厂将刷新生产商。

KafkaTransactionManager

KafkaTransactionManager是 Spring Framework 的PlatformTransactionManager. 在其构造函数中提供了对生产者工厂的引用。 如果您提供自定义生产者工厂,则它必须支持事务。 看ProducerFactory.transactionCapable().spring-doc.cadn.net.cn

您可以使用KafkaTransactionManager具有正常的 Spring 事务支持(@Transactional,TransactionTemplate等)。 如果事务处于活动状态,则任何KafkaTemplate在事务范围内执行的作使用事务的Producer. 管理器根据成功或失败提交或回滚事务。 您必须配置KafkaTemplate使用相同的ProducerFactory作为事务管理器。spring-doc.cadn.net.cn

事务同步

本节涉及仅生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用使用者发起的事务。spring-doc.cadn.net.cn

如果你想将记录发送到 kafka 并执行一些数据库更新,你可以使用普通的 Spring 事务管理,比如说,一个DataSourceTransactionManager.spring-doc.cadn.net.cn

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

的拦截器@Transactional注释启动事务,并且KafkaTemplate将事务与该事务管理器同步;每次发送都会参与该交易。 当该方法退出时,数据库事务将提交,然后是 Kafka 事务。 如果您希望以相反的顺序执行提交(首先是 Kafka),请使用 嵌套@Transactional方法,外部方法配置为使用DataSourceTransactionManager,并且配置为使用KafkaTransactionManager.spring-doc.cadn.net.cn

有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅 Kafka 事务与其他事务管理器的示例spring-doc.cadn.net.cn

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务上提交失败(在主事务提交之后),则将向调用者抛出异常。 以前,这被静默忽略(在调试级别记录)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。

使用使用者发起的事务

ChainedKafkaTransactionManager现在已弃用,从 2.7 版开始;请参阅 JavaDocs 的超类ChainedTransactionManager了解更多信息。 相反,请使用KafkaTransactionManager启动 Kafka 事务,并使用@Transactional以启动另一笔事务。spring-doc.cadn.net.cn

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅 Kafka 事务与其他事务管理器的示例spring-doc.cadn.net.cn

非阻塞重试不能与容器事务结合使用。 当监听器代码抛出异常时,容器事务提交成功,并将记录发送到可重试主题。

KafkaTemplate本地交易

您可以使用KafkaTemplate在本地事务中执行一系列作。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身(this). 如果回调正常退出,则事务被提交。 如果抛出异常,则事务将回滚。spring-doc.cadn.net.cn

如果有KafkaTransactionManager(或同步的)事务正在进行中,则不使用。 相反,使用新的“嵌套”事务。

TransactionIdPrefix

EOSMode.V2(又名BETA),唯一支持的模式,不再需要使用相同的transactional.id,即使是消费者发起的交易;事实上,它在每个实例上必须是唯一的,就像生产者发起的事务一样。 此属性在每个应用程序实例上必须具有不同的值。spring-doc.cadn.net.cn

TransactionIdSuffix Fixed

从 3.2 开始,新的TransactionIdSuffixStrategy引入界面来管理transactional.id后缀。 默认实现是DefaultTransactionIdSuffixStrategy设置时maxCache大于零可以重用transactional.id在特定范围内,否则将通过递增计数器动态生成后缀。 当请求事务生产者时,并且transactional.id全部使用中,抛出一个NoProducerAvailableException. 然后,用户可以使用RetryTemplate配置为重试该异常,并适当配置回退。spring-doc.cadn.net.cn

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

设置maxCache到 5,transactional.idmy.txid.+'{0-4}'。spring-doc.cadn.net.cn

使用时KafkaTransactionManager使用ConcurrentMessageListenerContainer并启用maxCache,需要设置maxCache设置为大于或等于concurrency. 如果MessageListenerContainer无法获得transactional.id后缀,它将抛出一个NoProducerAvailableException. 在ConcurrentMessageListenerContainer,则需要调整 maxCache 设置以处理增加的嵌套事务数量。

KafkaTemplate事务性和非事务性发布

通常,当KafkaTemplate是事务性的(配置了支持事务的生产者工厂),则需要事务。 事务可以通过TransactionTemplate一个@Transactional方法, 调用executeInTransaction,或通过侦听器容器,当配置了KafkaTransactionManager. 任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException. 从 2.4.3 版开始,您可以将模板的allowNonTransactional属性设置为true. 在这种情况下,模板将允许作在没有事务的情况下运行,方法是调用ProducerFactorycreateNonTransactionalProducer()方法;生产者将像往常一样被缓存或线程绑定,以便重用。 看DefaultKafkaProducerFactory.spring-doc.cadn.net.cn

与批处理侦听器的交易

当侦听器在使用事务时失败时,将AfterRollbackProcessor在回滚发生后调用以执行某些作。 使用默认值AfterRollbackProcessor使用记录侦听器时,将执行搜索,以便重新传递失败的记录。 但是,使用批处理侦听器时,将重新传递整个批处理,因为框架不知道批处理中的哪条记录失败了。 有关详细信息,请参阅回滚后处理器spring-doc.cadn.net.cn

使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障:BatchToRecordAdapter. 当集装箱工厂batchListener设置为 true 配置了BatchToRecordAdapter,则一次调用一条记录。 这可以在批处理中进行错误处理,同时仍然可以停止处理整个批次,具体取决于异常类型。 默认BatchToRecordAdapter提供,可以配置标准ConsumerRecordRecoverer例如DeadLetterPublishingRecoverer. 以下测试用例配置片段说明了如何使用此功能:spring-doc.cadn.net.cn

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}