交易活页夹

通过设置来启用交易spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix变为非空值,例如:谢谢-. 在处理器应用中使用时,消费者开始交易;发送到消费者线程的任何记录都参与同一事务。 当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。 所有生产者绑定都使用一个共同生产工厂,配置为spring.cloud.stream.kafka.binder.transaction.producer.*性能;忽略单个绑定卡夫卡生产者属性。spring-doc.cadn.net.cn

普通的活页夹重试(以及死字母)不支持交易,因为重试会在原始交易中运行,原始交易可能会被回滚,任何已发布的记录也会被回滚。 当重试被启用时(这是共同的特性)最大尝试次数大于 0)重试属性用于配置DefaultAfterRollback处理器以启用容器级别的重试。 同样,该功能不再在事务中发布死符记录,而是通过DefaultAfterRollback处理器该程序在主事务回滚后运行。

如果你想在源应用中使用事务,或者从某个任意线程中用于仅生产者事务(例如,@Scheduled方法),你必须获得交易生产者工厂的引用并定义一个KafkaTransactionManager豆子在用它。spring-doc.cadn.net.cn

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

注意我们通过装订工坊;用在第一个论元中,当只有一个活页夹配置时, 如果配置了多个活页夹,使用活页夹名称来获取引用。 一旦我们有了对活写器的引用,就可以获得对生产工厂并创建事务管理器。spring-doc.cadn.net.cn

然后你会使用正常的Spring事务支持,例如:交易模板@Transactional例如:spring-doc.cadn.net.cn

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果你想将仅生产者事务与其他事务管理器的事务同步,可以使用ChainedTransactionManager.spring-doc.cadn.net.cn

如果你部署多个应用程序实例,每个实例都需要一个唯一的实例transactionId前缀.

Kafka 事务中的异常重试行为

配置事务回滚重试行为

在处理Kafka事务中的消息时,你可以配置事务回滚后应重试哪些异常,使用以下方式default可重试财产和retryableExceptions地图。spring-doc.cadn.net.cn

默认重试行为

DefaultAfterRollback处理器确定哪些例外在交易回滚后触发重试。 默认情况下,所有异常都会被重试,但你可以修改以下行为:spring-doc.cadn.net.cn

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # Change default to NOT retry exceptions

什么时候default可重试设置为falseDefaultAfterRollback处理器将配置为defaultFalse(true),意味着除非明确配置为可重试,否则不会重试异常。spring-doc.cadn.net.cn

例外特定配置

对于细粒度控制,你可以为各个例外类型指定重试行为:spring-doc.cadn.net.cn

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # Always retry this exception
               java.lang.IllegalArgumentException: false  # Never retry this exception

DefaultAfterRollback处理器将使用addRetryableExceptions()对于标记为trueaddNotRetryableExceptions()对于标记为false. 这些针对异常的配置优先于默认行为。spring-doc.cadn.net.cn

实现细节