技巧、窍门和Recipes
与卡夫卡的简单DLQ
问题陈述
作为开发者,我想编写一个消费者应用程序来处理来自 Kafka 主题的记录。然而,如果处理过程中出现错误,我不希望应用程序完全停止。相反,我想将错误记录发送到死信主题(DLT),然后继续处理新的记录。
溶液
解决这个问题的方法是使用Spring Cloud Stream中的DLQ功能。为了讨论的目的,我们假设以下是处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,会对处理的所有记录抛出异常,但你可以将这个函数扩展到其他类似情况。
为了将错误记录发送到DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活DLQ,应用程序必须提供组名。
匿名消费者无法使用DLQ设施。
我们还需要通过设置enableDLQ卡夫卡消费者的属性绑定于true.
最后,我们可以通过提供dlqName在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group在这种情况下。
注意,在上述示例消费者中,有效载荷类型为字节[].
默认情况下,Kafka binder 中的 DLQ 生成器期望 为 的有效载荷字节[].
如果不是这样,我们需要提供合适的串行器配置。
例如,我们将消费者函数重写如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,写入 DLT 时我们想如何序列化数据。 以下是该场景的修改配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
DLQ带高级重试选项
溶液
如果你按照上面的配方作,那么当处理遇到错误时,卡夫卡绑定器里会内置默认的重试选项。
默认情况下,绑定器最多可尝试3次,初始延迟1秒,每次后退时2.0倍数,最大延迟10秒。 你可以像下面一样更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果你愿意,也可以通过提供布尔值的映射来提供可重试的异常列表。 例如
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,地图上未列出的任何例外情况将被重试。 如果不希望这样,你可以通过提供以下方式禁用它,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
你也可以提供自己的重试模板并标记为@StreamRetryTemplate这些文件将被扫描并由活页夹使用。
当你需要更复杂的重试策略和策略时,这非常有用。
如果你有多重@StreamRetryTemplate然后你可以通过以下属性指定绑定想要的 Beans,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
处理使用 DLQ 的反序列化错误
溶液
Spring Cloud Stream 提供的常规 DLQ 机制在 Kafka 用户抛出不可恢复的反序列化异常时无法解决问题。
这是因为,这种例外甚至在消费者poll()方法返回。
Spring for Apache Kafka 项目提供了一些很好的方法来帮助绑装师处理这种情况。
让我们来探讨这些。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个平凡函数,具有字符串参数。
我们想绕过 Spring Cloud Stream 提供的消息转换器,改用原生的反串化器。
在字符串类型方面,这不太合理,但对于像AVRO等更复杂的类型,你必须依赖外部解串器,因此需要委托转换到Kafka。
现在当消费者收到数据时,假设存在一个错误记录导致反序列化错误,可能有人传递了整数而不是字符串例如。
在这种情况下,如果你在应用里没有做某件事,异常会在链条中传播,最终你的应用会退出。
为了处理这个问题,你可以添加一个ListenerContainerCustomizer @Bean该配置为默认错误处理.
这默认错误处理配置为死信出版恢复者.
我们还需要配置一个ErrorHandlingDeserializer对消费者来说。
这听起来很复杂,但实际上,这里的三颗豆子就很简单了。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析。
第一个是ListenerContainerCustomizer豆子需要一个默认错误处理.
容器现在已经用该错误处理程序进行了定制。
你可以在这里了解更多关于容器定制的信息。
第二颗豆子是默认错误处理该配置为发布到双重学习技术.
更多详情请见此处默认错误处理.
第三颗豆子是死信出版恢复者最终负责发送给双重学习技术. 默认情况下,双重学习技术主题被命名为ORIGINAL_TOPIC_NAME。DLT。不过你可以更改这个名。详情请参见文档。
我们还需要通过应用配置配置ErrorHandlingDe串行器。
这ErrorHandlingDeserializer委派给实际的反串行器。如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。然后将异常设置为头部,并将该记录传递给监听器,监听者再调用注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们正在提供ErrorHandlingDeserializer通过配置绑定上的属性。我们还表示实际要委托的反串行化器是字符串解串器.
请记住,上述DLQ属性与本方案讨论无关。它们纯粹用于解决应用层面的错误。
Kafka 绑定器中的基础偏移管理
溶液
我们鼓励你阅读相关文档部分,以全面理解。
大致内容如下:
Kafka 默认支持两种初始偏移类型——最早和最近的. 它们的语义从名字中就能理解。
假设你是第一次运行消费者。如果你在 Spring Cloud Stream 应用中错过了 group.id,那么它就会变成匿名消费者。无论你有匿名消费者,在这种情况下,Spring Cloud Stream 应用默认会从最近的主题分区中的可用偏移量。另一方面,如果你明确指定 group.id,那么默认情况下,Spring Cloud Stream 应用将从最早主题分区中的可用偏移量。
在上述两种情况(具有显式组和匿名组的消费者)中,起始偏移量可以通过使用以下属性进行交换spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并设置为最早或最近的.
现在,假设你之前已经运行了消费者,现在重新开始。在这种情况下,上述起始偏移语义不适用,因为消费者找到了消费者组已承诺的偏移量(对于匿名消费者,虽然应用程序没有提供 group.id,绑定器会自动为你生成一个)。它只是从最后一个提交的偏移量开始。即使你有startOffset价值被提供。
不过,你可以通过使用resetOffsets财产。 为此,设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets自true(即false默认情况下)。然后确保你提供startOffset值(或最早或最近的). 当你这样做后再启动消费者应用程序时,每次启动时,它都像第一次启动一样开始,忽略分区的任何已提交的偏移量。
在卡夫卡中寻求任意抵消
溶液
之前我们见过 Kafka Binder 如何让你处理基础的偏移管理。默认情况下,Binder 不允许你倒带到任意偏移,至少通过我们在那个配方中看到的机制是这样。不过,Binder 提供了一些低层次的策略来实现这个用例。让我们来探讨一下它们。
首先,当你想重置到任意偏移量时,除了最早或最近的,确保离开resetOffsets配置到其默认值,即false. 然后你必须提供一种定制的咖啡豆KafkaBindingRebalanceListener,将注入所有消费者绑定。这是一个带有一些默认方法的接口,但我们感兴趣的方法如下:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们来看看细节。
本质上,该方法将在主题划分的初始分配或重新平衡后每次调用。
为了更好地说明,我们假设我们的主题是福它有4个分区。
最初,我们只在组中启动一个消费者,这个消费者会从所有分区消费。
当消费者首次启动时,所有4个分区都会被初始分配。
然而,我们不希望分区从默认值开始消耗(最早由于我们定义了一个群),而是对于每个划分,我们希望它们在寻找任意偏移量后消耗。
想象一下,你有一个商业理由,可以从某些抵消中取用,如下所示。
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过实现上述方法实现。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基础的实现。
现实中的使用场景要复杂得多,你需要相应调整,但这确实给了你一个基本的草图。
当消费者寻求如果失败,可能会触发运行时异常,你需要决定在这种情况下该怎么做。
[[如果我们用同样的组号开始第二个消费者怎么办?]] === 如果我们用同样的组ID启动第二个用户呢?
当我们增加第二个消费者时,会发生重新平衡,一些分区会被调整。
假设新的消费者获得了分区2和3.
当这个新的Spring Cloud Stream用户称呼时onPartitionsAssigned方法,它会看到这是划分的初始赋值2和3在这个消费者身上。
因此,它会执行寻道作,因为对初论点。
对于第一个消费者,它现在只有分区0和1然而,对该消费者来说,这只是一次再平衡事件,并未被视为初始交易。
因此,由于条件检查,它不会重新寻找给定偏移量初论点。
[[如何用卡夫卡活页夹手动确认?]] == 我如何手动确认使用卡夫卡活页夹?
溶液
默认情况下,Kafka 绑定器会委派给 Spring for Apache Kafka 项目中的默认提交设置。
默认ack模式Spring的卡夫卡是Batch.
更多详情请见这里。
有些情况下你会想禁用默认提交行为,依赖手动提交。 按照步骤作,你就能做到这一点。
设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode无论哪一手动或MANUAL_IMMEDIATE.
当它被这样设置时,会有一个叫做kafka_acknowledgment(摘自KafkaHeaders.致谢)存在于消费者方法接收到的消息中。
例如,把它想象成你的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后你设置属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode自手动或MANUAL_IMMEDIATE.
[[如何覆盖春云流中默认绑定名称?]] == 我如何覆盖 Spring Cloud Stream 中的默认绑定名称?
溶液
假设以下是你的函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream会像下面那样创建绑定。
-
0大写
-
大写输出0
你可以通过以下属性覆盖这些绑定。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
之后,必须对新名称施加所有绑定性质,我的变形金刚和我的转换器输出.
这里还有另一个例子,使用Kafka Streams和多输入。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 会为该函数创建三个不同的绑定名称。
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次想设置这些绑定时,都必须使用这些绑定名称。 你不喜欢这样,你想用更适合域且易读的绑定名称,比如。
-
订单
-
帐户
-
丰富勋章
你只需设置这三个属性即可轻松实现
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
一旦你这样做,它就会覆盖默认的绑定名,而你想设置的所有属性都必须放在这些新的绑定名上。
[[如何将消息密钥作为我的记录的一部分发送?]] == 我如何将消息密钥作为记录的一部分发送?
溶液
通常你需要发送关联数据结构,比如映射作为带有键和值的记录。 《春云流》让你以一种直接的方式实现这一点。 以下是实现这一点的基本蓝图,但你可能需要根据你的具体用例进行调整。
这里是采样生产者法(也称为提供商).
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个平凡函数,发送带有字符串有效载荷,但也有钥匙。
注意,我们将密钥设置为消息头部,使用KafkaHeaders.MESSAGE_KEY.
如果你想把密钥从默认更改kafka_messageKey那么在配置中,我们需要指定以下性质:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称提供商出局0既然这是我们的功能名称,请相应更新。
然后,我们在生成消息时使用这个新密钥。
[[how-do-i-use-native-serializer-and-deserializer-inste-of-message-conversion-done-by-spring-cloud-stream?]] == 我如何使用原生串行器和反串行器来代替Spring Cloud Stream完成的消息转换?
问题陈述
我想在 Kafka 中使用原生的串行器和解串器,而不是使用 Spring Cloud Stream 里的消息转换器。 默认情况下,Spring Cloud Stream 通过其内置的消息转换器来完成转换。 我怎样才能绕过这个,把责任交给卡夫卡?
溶液
这真的很简单。
你只需提供以下属性即可启用本地序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,你还需要设置串行器。 有几种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或者使用活页夹配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用束缚器时,它会对所有绑定施加,而在绑定处设置绑定则是每个绑定。
在反串出方面,你只需要提供解串器作为配置。
例如
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
你也可以把他们设定在活页夹层面。
有一个可选属性可以强制本地解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
然而,对于 Kafka 装帧器来说,这并非必要,因为当它到达装帧器时,Kafka 已经用配置好的反串化器将其反序列化。
解释一下 Kafka Streams 绑定器中偏移重置的工作原理
溶液
在我们看解决方案之前,先看看以下情景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个双消费者需要两个输入绑定的豆子。
在这种情况下,第一个绑定是KStream第二个是KTable(英国可爱的)音乐.
首次运行该应用程序时,默认情况下,两个绑定都从最早抵消。
那我想从这个开始呢?最近的是因为某些要求而抵消的吗?
你可以通过启用以下属性来实现这一点。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果你只想从一个绑定开始,最近的偏移量,另一个则从默认值转给消费者最早然后将后者从配置中剔除。
请记住,一旦有承诺的偏移量,这些设置就不会被尊重,已承诺的偏移优先。
跟踪Kafka中记录的成功发送(制作)
溶液
假设申请中有以下提供商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的消息频道BEAN 用来捕捉所有成功发送的信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用配置中定义该属性,以提供记录元数据通道.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将发送给fooRecord频道.
你可以写一个集成流程如下所示,您可以查看相关信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在处理方法,payload 是发送到 Kafka 的,消息头部包含一个名为kafka_recordMetadata. 其价值为记录元数据其中包含主题分区、当前偏移等信息。
在Kafka中添加自定义头部映射器
溶液
在正常情况下,这应该没问题。
想象一下,你有以下的制作人。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者端,你仍然应该看到“foo”这个头,以下内容应该不会有问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果你在应用中提供了自定义的头部映射器,那就无法实现。假设你有一个空KafkaHeaderMapper在申请表里。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果你的实现方式是这样的,那你就会错过福消费者的头部。很可能你在这些文件里包含了一些逻辑KafkaHeaderMapper方法。 你需要以下信息来填充福页眉。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这样才能正确填充福生产者向消费者的标题。
关于id头的特别说明
在春云溪中,身份证Header 是一个特殊的 header,但有些应用可能希望有特殊的自定义 ID header——类似这样的自定义ID或身份证或身份. 第一个(自定义ID)将无需任何自定义头部映射器,从生产者传播到消费者。然而,如果你使用保留的变体框架来生产身份证首部 - 例如身份证,身份,iD等等,否则你会遇到框架内部的问题。想了解更多这个用例,可以参考这个StackOverflow的帖子。在这种情况下,你必须使用自定义KafkaHeaderMapper映射大小写区分的ID头。例如,假设你有以下生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
头部身份以上内容将从消费端消失,因为它与框架冲突身份证页眉。 你可以提供定制KafkaHeaderMapper解决这个问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
这样做,两者都得身份证和身份头部将由生产商提供给消费者端。
在交易中对多个主题进行生产
溶液
在 Kafka Binder 中使用事务支持进行交易,然后提供AfterRollback处理器. 为了生成多个主题,请使用流桥应用程序接口。
以下是该代码的摘要:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了测试,你可以使用以下工具:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要说明:
请确保您在应用配置中没有任何DLQ设置,因为我们会手动配置DLT(默认情况下,DLT会发布到一个名为输入。 双重学习技术基于初始消费者函数)。同时,重置最大尝试次数关于消费者绑定到1以避免绑定器重试。在上述示例中,最多尝试3次(初始尝试+2次尝试)固定后退).
关于如何测试这些代码,请参见 StackOverflow 讨论串。如果你使用 Spring Cloud Stream 通过添加更多消费者函数来测试,请务必设置隔离层级在消费者绑定到已读提交.
这个StackOverflow帖子也与此讨论相关。
运行多个可投票用户时应避免的陷阱
溶液
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用时,Kafka的消费者会生成一个 client.id(类似消费者-我的组-1). 对于每个正在运行的应用程序实例,以下client.id结果会一样,导致意想不到的问题。
为了解决这个问题,你可以在每个应用程序实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
详情请参见GitHub相关问题。