该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

技巧、窍门和Recipes

与卡夫卡的简单DLQ

问题陈述

作为开发者,我想写一个消费者应用程序来处理来自卡夫卡主题的记录。 不过,如果处理过程中出现错误,我不希望应用程序完全停止。 相反,我想把错误记录发送到DLT(死信主题),然后继续处理新的记录。spring-doc.cadn.net.cn

溶液

解决这个问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。 为了讨论,我们假设以下是处理器函数。spring-doc.cadn.net.cn

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常简单的函数,会对处理的所有记录抛出异常,但你可以将这个函数扩展到其他类似情况。spring-doc.cadn.net.cn

为了将错误记录发送到DLT,我们需要提供以下配置。spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活DLQ,应用程序必须提供组名。 匿名消费者无法使用DLQ设施。 我们还需要通过设置enableDLQ卡夫卡消费者的属性绑定于true. 最后,我们可以通过提供dlqName在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group在这种情况下。spring-doc.cadn.net.cn

注意,在上述示例消费者中,有效载荷类型为字节[]. 默认情况下,Kafka binder 中的 DLQ 生成器期望 为 的有效载荷字节[]. 如果不是这样,我们需要提供合适的串行器配置。 例如,我们将消费者函数重写如下:spring-doc.cadn.net.cn

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,写入 DLT 时我们想如何序列化数据。 以下是该场景的修改配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

DLQ带高级重试选项

问题陈述

这和上面的方案类似,但作为开发者,我想配置重试处理方式。spring-doc.cadn.net.cn

溶液

如果你按照上面的配方作,那么当处理遇到错误时,卡夫卡绑定器里会内置默认的重试选项。spring-doc.cadn.net.cn

默认情况下,绑定器最多可尝试3次,初始延迟1秒,每次后退时2.0倍数,最大延迟10秒。 你可以像下面一样更改所有这些配置:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果你愿意,也可以通过提供布尔值的映射来提供可重试的异常列表。 例如spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,地图上未列出的任何例外情况将被重试。 如果不希望这样,你可以通过提供以下方式禁用它,spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

你也可以提供自己的重试模板并标记为@StreamRetryTemplate这些文件将被扫描并由活页夹使用。 当你需要更复杂的重试策略和策略时,这非常有用。spring-doc.cadn.net.cn

如果你有多重@StreamRetryTemplate然后你可以通过以下属性指定绑定想要的 Beans,spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

处理使用 DLQ 的反序列化错误

问题陈述

我有一个处理器在 Kafka Consumer 中遇到了反序列化异常。 我本以为春云流DLQ机制会捕捉到这种情况,但它没有。 我该怎么处理?spring-doc.cadn.net.cn

溶液

Spring Cloud Stream 提供的常规 DLQ 机制在 Kafka 用户抛出不可恢复的反序列化异常时无法解决问题。 这是因为,这种例外甚至在消费者poll()方法返回。 Spring for Apache Kafka 项目提供了一些很好的方法来帮助绑装师处理这种情况。 让我们来探讨这些。spring-doc.cadn.net.cn

假设这是我们的函数:spring-doc.cadn.net.cn

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个平凡函数,具有字符串参数。spring-doc.cadn.net.cn

我们想绕过 Spring Cloud Stream 提供的消息转换器,改用原生的反串化器。 在字符串类型方面,这不太合理,但对于像AVRO等更复杂的类型,你必须依赖外部解串器,因此需要委托转换到Kafka。spring-doc.cadn.net.cn

现在当消费者收到数据时,假设存在一个错误记录导致反序列化错误,可能有人传递了整数而不是字符串例如。 在这种情况下,如果你在应用里没有做某件事,异常会在链条中传播,最终你的应用会退出。spring-doc.cadn.net.cn

为了处理这个问题,你可以添加一个ListenerContainerCustomizer @Bean该配置为默认错误处理. 这默认错误处理配置为死信出版恢复者. 我们还需要配置一个ErrorHandlingDeserializer对消费者来说。 这听起来很复杂,但实际上,这里的三颗豆子就很简单了。spring-doc.cadn.net.cn

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们逐一分析。 第一个是ListenerContainerCustomizer豆子需要一个默认错误处理. 容器现在已经用该错误处理程序进行了定制。 你可以在这里了解更多关于容器定制的信息。spring-doc.cadn.net.cn

第二颗豆子是默认错误处理该配置为发布到双重学习技术. 更多详情请见此处默认错误处理.spring-doc.cadn.net.cn

第三颗豆子是死信出版恢复者最终负责发送给双重学习技术. 默认情况下,双重学习技术主题被称为ORIGINAL_TOPIC_NAME。DLT。 不过你可以改变这个。 详情请参见文档spring-doc.cadn.net.cn

我们还需要通过应用配置配置ErrorHandlingDe串行器spring-doc.cadn.net.cn

ErrorHandlingDeserializer授权到实际的反串化器。 如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。 然后它在一个头部中设置异常,并将该记录传递给监听器,监听器再调用注册的错误处理程序。spring-doc.cadn.net.cn

以下是所需的配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们正在提供ErrorHandlingDeserializer通过配置绑定上的属性。 我们还表示实际要委派的反串化器是字符串解串器.spring-doc.cadn.net.cn

请记住,上述DLQ属性与本配方讨论无关。 它们纯粹用于解决任何应用层面的错误。spring-doc.cadn.net.cn

Kafka 绑定器中的基础偏移管理

问题陈述

我想写一个 Spring Cloud Stream 的 Kafka 消费者应用,但不确定它是如何管理 Kafka 消费者的偏移量的。 你能解释一下吗?spring-doc.cadn.net.cn

溶液

我们鼓励你阅读相关文档部分,以全面理解。spring-doc.cadn.net.cn

大致内容如下:spring-doc.cadn.net.cn

Kafka 默认支持两种初始偏移类型——最早最近的. 它们的语义从名字中就能理解。spring-doc.cadn.net.cn

假设你是第一次使用消费者。 如果你在Spring Cloud Stream申请中错过了 group.id,那么它就会变成匿名消费者。 无论哪里,只要你遇到匿名用户,Spring Cloud Stream 应用默认会从最近的主题分区中的可用偏移量。 另一方面,如果你明确指定了 group.id,那么默认情况下,Spring Cloud Stream 应用会从最早主题分区中的可用偏移量。spring-doc.cadn.net.cn

在上述两种情况(具有显式组和匿名组的消费者)中,起始偏移量可以通过使用以下属性进行交换spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并设置为最早最近的.spring-doc.cadn.net.cn

现在,假设你之前已经运行过消费者,现在重新开始。 在这种情况下,上述起始偏移语义不适用,因为消费者已经为消费者群体找到了已承诺的偏移量(对于匿名消费者,尽管应用程序未提供 group.id,活页夹会自动为你生成一个)。 它只是从上一个提交的偏移量开始。 即使你有startOffset价值被提供。spring-doc.cadn.net.cn

不过,你可以通过使用resetOffsets财产。 为此,设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsetstrue(即false默认情况下)。 然后确保你提供startOffset值(或最早最近的). 当你这样做后再启动消费者应用程序时,每次启动时,它都像第一次启动一样开始,忽略分区的任何已提交的偏移量。spring-doc.cadn.net.cn

在卡夫卡中寻求任意抵消

问题陈述

用Kafka活页夹,我知道它可以把偏移设置成以下一种最早最近的但我有个要求,必须寻找中间的某个偏移量,一个任意的偏移量。 有没有办法用Spring Cloud Stream Kafka活页夹实现这个功能?spring-doc.cadn.net.cn

溶液

之前我们看到Kafka活页夹如何帮助你处理基础的胶置管理。 默认情况下,活页夹不允许你倒带到任意偏移,至少通过我们在那个配方中看到的机制是这样。 然而,活页夹提供了一些低层策略来实现这一用例。 让我们来探索一下。spring-doc.cadn.net.cn

首先,当你想重置到任意偏移量时,除了最早最近的,确保离开resetOffsets配置到其默认值,即false. 然后你必须提供一种定制的咖啡豆KafkaBindingRebalanceListener,将注入所有消费者绑定。 这是一个带有几种默认方法的界面,但我们感兴趣的是:spring-doc.cadn.net.cn

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们来看看细节。spring-doc.cadn.net.cn

本质上,该方法将在主题划分的初始分配或重新平衡后每次调用。 为了更好地说明,我们假设我们的主题是它有4个分区。 最初,我们只在组中启动一个消费者,这个消费者会从所有分区消费。 当消费者首次启动时,所有4个分区都会被初始分配。 然而,我们不希望分区从默认值开始消耗(最早由于我们定义了一个群),而是对于每个划分,我们希望它们在寻找任意偏移量后消耗。 想象一下,你有一个商业理由,可以从某些抵消中取用,如下所示。spring-doc.cadn.net.cn

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过实现上述方法实现。spring-doc.cadn.net.cn

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个基础的实现。 现实中的使用场景要复杂得多,你需要相应调整,但这确实给了你一个基本的草图。 当消费者寻求如果失败,可能会触发运行时异常,你需要决定在这种情况下该怎么做。spring-doc.cadn.net.cn

[[如果我们用同样的组号开始第二个消费者怎么办?]] === 如果我们用同样的组ID启动第二个用户呢?spring-doc.cadn.net.cn

当我们增加第二个消费者时,会发生重新平衡,一些分区会被调整。 假设新的消费者获得了分区23. 当这个新的Spring Cloud Stream用户称呼时onPartitionsAssigned方法,它会看到这是划分的初始赋值23在这个消费者身上。 因此,它会执行寻道作,因为对论点。 对于第一个消费者,它现在只有分区01然而,对该消费者来说,这只是一次再平衡事件,并未被视为初始交易。 因此,由于条件检查,它不会重新寻找给定偏移量论点。spring-doc.cadn.net.cn

[[如何用卡夫卡活页夹手动确认?]] == 我如何手动确认使用卡夫卡活页夹?spring-doc.cadn.net.cn

问题陈述

使用 Kafka Binder,我想在我的消费者中手动确认消息。 我该怎么做?spring-doc.cadn.net.cn

溶液

默认情况下,Kafka 绑定器会委派给 Spring for Apache Kafka 项目中的默认提交设置。 默认ack模式Spring的卡夫卡是Batch. 更多详情请见这里spring-doc.cadn.net.cn

有些情况下你会想禁用默认提交行为,依赖手动提交。 按照步骤作,你就能做到这一点。spring-doc.cadn.net.cn

设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode无论哪一手动MANUAL_IMMEDIATE. 当它被这样设置时,会有一个叫做kafka_acknowledgment(摘自KafkaHeaders.致谢)存在于消费者方法接收到的消息中。spring-doc.cadn.net.cn

例如,把它想象成你的消费者方法。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后你设置属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode手动MANUAL_IMMEDIATE.spring-doc.cadn.net.cn

[[如何覆盖春云流中默认绑定名称?]] == 我如何覆盖 Spring Cloud Stream 中的默认绑定名称?spring-doc.cadn.net.cn

问题陈述

Spring Cloud Stream 会根据函数定义和签名创建默认绑定,但我该如何将这些绑定覆盖到更适合域名的名称?spring-doc.cadn.net.cn

溶液

假设以下是你的函数签名。spring-doc.cadn.net.cn

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream会像下面那样创建绑定。spring-doc.cadn.net.cn

你可以通过以下属性覆盖这些绑定。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

之后,必须对新名称施加所有绑定性质,我的变形金刚我的转换器输出.spring-doc.cadn.net.cn

这里还有另一个例子,使用Kafka Streams和多输入。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 会为该函数创建三个不同的绑定名称。spring-doc.cadn.net.cn

  1. processOrder-in-0spring-doc.cadn.net.cn

  2. processOrder-in-1spring-doc.cadn.net.cn

  3. processOrder-out-0spring-doc.cadn.net.cn

每次想设置这些绑定时,都必须使用这些绑定名称。 你不喜欢这样,你想用更适合域且易读的绑定名称,比如。spring-doc.cadn.net.cn

你只需设置这三个属性即可轻松实现spring-doc.cadn.net.cn

  1. spring.cloud.stream.function.bindings.processOrder-in-0=ordersspring-doc.cadn.net.cn

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accountsspring-doc.cadn.net.cn

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrdersspring-doc.cadn.net.cn

一旦你这样做,它就会覆盖默认的绑定名,而你想设置的所有属性都必须放在这些新的绑定名上。spring-doc.cadn.net.cn

[[如何将消息密钥作为我的记录的一部分发送?]] == 我如何将消息密钥作为记录的一部分发送?spring-doc.cadn.net.cn

问题陈述

我需要把密钥和记录的有效载荷一起发送,有没有办法在Spring Cloud Stream里做到这一点?spring-doc.cadn.net.cn

溶液

通常你需要发送关联数据结构,比如映射作为带有键和值的记录。 《春云流》让你以一种直接的方式实现这一点。 以下是实现这一点的基本蓝图,但你可能需要根据你的具体用例进行调整。spring-doc.cadn.net.cn

这里是采样生产者法(也称为提供商).spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个平凡函数,发送带有字符串有效载荷,但也有钥匙。 注意,我们将密钥设置为消息头部,使用KafkaHeaders.MESSAGE_KEY.spring-doc.cadn.net.cn

如果你想把密钥从默认更改kafka_messageKey那么在配置中,我们需要指定以下性质:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称提供商出局0既然这是我们的功能名称,请相应更新。spring-doc.cadn.net.cn

然后,我们在生成消息时使用这个新密钥。spring-doc.cadn.net.cn

[[how-do-i-use-native-serializer-and-deserializer-inste-of-message-conversion-done-by-spring-cloud-stream?]] == 我如何使用原生串行器和反串行器来代替Spring Cloud Stream完成的消息转换?spring-doc.cadn.net.cn

问题陈述

我想在 Kafka 中使用原生的串行器和解串器,而不是使用 Spring Cloud Stream 里的消息转换器。 默认情况下,Spring Cloud Stream 通过其内置的消息转换器来完成转换。 我怎样才能绕过这个,把责任交给卡夫卡?spring-doc.cadn.net.cn

溶液

这真的很简单。spring-doc.cadn.net.cn

你只需提供以下属性即可启用本地序列化。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,你还需要设置串行器。 有几种方法可以做到这一点。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用活页夹配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用束缚器时,它会对所有绑定施加,而在绑定处设置绑定则是每个绑定。spring-doc.cadn.net.cn

在反串出方面,你只需要提供解串器作为配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

你也可以把他们设定在活页夹层面。spring-doc.cadn.net.cn

有一个可选属性可以强制本地解码。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

然而,对于 Kafka 装帧器来说,这并非必要,因为当它到达装帧器时,Kafka 已经用配置好的反串化器将其反序列化。spring-doc.cadn.net.cn

解释一下 Kafka Streams 绑定器中偏移重置的工作原理

问题陈述

默认情况下,Kafka Streams 的活页夹总是从新用户最早的偏移开始。 有时,申请要求从最新的偏移量开始是有益的或必须的。 Kafka Streams 的活页夹可以让你做到这一点。spring-doc.cadn.net.cn

溶液

在我们看解决方案之前,先看看以下情景。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个双消费者需要两个输入绑定的豆子。 在这种情况下,第一个绑定是KStream第二个是KTable(英国可爱的)音乐. 首次运行该应用程序时,默认情况下,两个绑定都从最早抵消。 那我想从这个开始呢?最近的是因为某些要求而抵消的吗? 你可以通过启用以下属性来实现这一点。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果你只想从一个绑定开始,最近的偏移量,另一个则从默认值转给消费者最早然后将后者从配置中剔除。spring-doc.cadn.net.cn

请记住,一旦有承诺的偏移量,这些设置就不会被尊重,已承诺的偏移优先。spring-doc.cadn.net.cn

跟踪Kafka中记录的成功发送(制作)

问题陈述

我有一个Kafka制作人申请,想记录我所有成功的发送记录。spring-doc.cadn.net.cn

溶液

假设申请中有以下提供商。spring-doc.cadn.net.cn

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的消息频道BEAN 用来捕捉所有成功发送的信息。spring-doc.cadn.net.cn

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用配置中定义该属性,以提供记录元数据通道.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功发送的信息将发送给fooRecord频道.spring-doc.cadn.net.cn

你可以写一个集成流程如下所示,您可以查看相关信息。spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

处理方法,payload 是发送到 Kafka 的,消息头部包含一个名为kafka_recordMetadata. 其价值为记录元数据其中包含主题分区、当前偏移等信息。spring-doc.cadn.net.cn

在Kafka中添加自定义头部映射器

问题陈述

我有一个 Kafka producer 应用程序可以设置一些头部,但在消费者应用中缺少这些头部。为什么?spring-doc.cadn.net.cn

溶液

在正常情况下,这应该没问题。spring-doc.cadn.net.cn

想象一下,你有以下的制作人。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,你仍然应该看到“foo”这个头,以下内容应该不会有问题。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果你在应用中提供了自定义的头部映射器,那就无法实现。 假设你有一个空的KafkaHeaderMapper在申请表里。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果你的实现方式是这样的,那你就会错过消费者的头部。 很可能你在这些里面有一些逻辑KafkaHeaderMapper方法。 你需要以下信息来填充页眉。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这样才能正确填充生产者向消费者的标题。spring-doc.cadn.net.cn

关于id头的特别说明

在春云溪中,身份证Header 是一个特殊的 header,但有些应用可能希望有特殊的自定义 ID header——类似这样的自定义ID身份证身份. 第一个(自定义ID)将无需任何自定义的头部映射器从生产者传播到消费者。 然而,如果你用框架的一个变体来生产,保留了身份证首部 - 例如身份证,身份,iD等等,你会遇到框架内部的问题。 想了解更多关于这个用例的背景,可以参考这个 StackOverflow 帖子。 那你必须用自定义软件KafkaHeaderMapper映射大小写区分ID头部。 例如,假设你有以下生产者。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

头部身份以上内容将从消费端消失,因为它与框架冲突身份证页眉。 你可以提供定制KafkaHeaderMapper解决这个问题。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

这样做,两者都得身份证身份头部将由生产商提供给消费者端。spring-doc.cadn.net.cn

在交易中对多个主题进行生产

问题陈述

我如何向多个卡夫卡主题生成事务性消息?spring-doc.cadn.net.cn

更多背景信息,请参见StackOverflow相关问题spring-doc.cadn.net.cn

溶液

在 Kafka Binder 中使用事务支持进行交易,然后提供AfterRollback处理器. 为了生成多个主题,请使用流桥应用程序接口。spring-doc.cadn.net.cn

以下是该代码的摘要:spring-doc.cadn.net.cn

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了测试,你可以使用以下工具:spring-doc.cadn.net.cn

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要说明:spring-doc.cadn.net.cn

请确保您在应用配置中没有任何DLQ设置,因为我们会手动配置DLT(默认情况下,DLT会发布到一个名为输入。双重学习技术基于初始消费者函数)。 另外,重置最大尝试次数关于消费者绑定到1以避免活页夹重试。 在上述示例中,最多尝试3次(初始尝试+2次尝试固定后退).spring-doc.cadn.net.cn

关于如何测试这段代码,请参见 StackOverflow 讨论串。 如果你用 Spring Cloud Stream 测试,增加更多消费者函数,务必设置隔离层级在消费者绑定到已读提交.spring-doc.cadn.net.cn

这个StackOverflow帖子也与此讨论相关。spring-doc.cadn.net.cn

运行多个可投票用户时应避免的陷阱

问题陈述

我如何运行多个可轮询的消费者实例并生成唯一client.id每一次?spring-doc.cadn.net.cn

溶液

假设我有以下定义:spring-doc.cadn.net.cn

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

运行应用时,Kafka的消费者会生成一个 client.id(类似消费者-我的组-1). 对于每个正在运行的应用程序实例,以下client.id结果会一样,导致意想不到的问题。spring-doc.cadn.net.cn

为了解决这个问题,你可以在每个应用程序实例上添加以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}