交易活页夹
通过设置来启用交易spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix变为非空值,例如:谢谢-.
在处理器应用中使用时,消费者开始交易;发送到消费者线程的任何记录都参与同一事务。
当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。
所有生产者绑定都使用一个共同生产工厂,配置为spring.cloud.stream.kafka.binder.transaction.producer.*性能;忽略单个绑定卡夫卡生产者属性。
普通的活页夹重试(以及死字母)不支持交易,因为重试会在原始交易中运行,原始交易可能会被回滚,任何已发布的记录也会被回滚。
当重试被启用时(这是共同的特性)最大尝试次数大于 0)重试属性用于配置DefaultAfterRollback处理器以启用容器级别的重试。
同样,该功能不再在事务中发布死符记录,而是通过DefaultAfterRollback处理器该程序在主事务回滚后运行。 |
如果你想在源应用中使用事务,或者从某个任意线程中用于仅生产者事务(例如,@Scheduled方法),你必须获得交易生产者工厂的引用并定义一个KafkaTransactionManager豆子在用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
注意我们通过装订工坊;用零在第一个论元中,当只有一个活页夹配置时,
如果配置了多个活页夹,使用活页夹名称来获取引用。
一旦我们有了对活写器的引用,就可以获得对生产工厂并创建事务管理器。
然后你会使用正常的Spring事务支持,例如:交易模板或@Transactional例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果你想将仅生产者事务与其他事务管理器的事务同步,可以使用ChainedTransactionManager.
如果你部署多个应用程序实例,每个实例都需要一个唯一的实例transactionId前缀. |
Kafka 事务中的异常重试行为
默认重试行为
这DefaultAfterRollback处理器确定哪些例外在交易回滚后触发重试。
默认情况下,所有异常都会被重试,但你可以修改以下行为:
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
defaultRetryable: false # Change default to NOT retry exceptions
什么时候default可重试设置为false这DefaultAfterRollback处理器将配置为defaultFalse(true),意味着除非明确配置为可重试,否则不会重试异常。
例外特定配置
对于细粒度控制,你可以为各个例外类型指定重试行为:
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
retryableExceptions:
java.lang.IllegalStateException: true # Always retry this exception
java.lang.IllegalArgumentException: false # Never retry this exception
这DefaultAfterRollback处理器将使用addRetryableExceptions()对于标记为true和addNotRetryableExceptions()对于标记为false.
这些针对异常的配置优先于默认行为。