消耗批次
从3.0版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,所有通过投票卡夫卡获得的记录消费者将以名单<?>转向听者法。
否则,方法将一次调用一条记录。
批次的规模由Kafka消费者属性控制Max.poll.records,fetch.min.bytes,fetch.max.wait.ms;更多信息请参阅卡夫卡文献。
接收批次时,允许以下类型签名:
List<Person>
Message<List<Person>>
在第一种选项中名单<人物>,监听者不会收到任何消息头部。
如果第二种类型签名(消息<列表<人>>)使用,然后可以访问头部;然而,所有报头仍然是 a 的形式收集.
我们来看下面的例子。
假设消息包含十个列表人对象。
这消息头关于消息包含一个以键为头部名称和值为列表的头部映射。
该列表包含该头部的头值,顺序与有效载荷列表相同。
因此,应用程序是否正确访问该头部取决于消息头基于有效载荷列表的迭代绘制的地图。
注意,类型签名形式为名单<消息<人>>在批量消费模式下不允许。
从版本开始4.0.2,该绑定器在批量处理时支持 DLQ 功能。
请记住,在批量处理消费者绑定时,之前轮询收到的所有记录都会被传递到DLQ主题。
使用批量模式时不支持在绑定器内重试,所以最大尝试次数将被覆盖为1。
你可以配置一个默认错误处理(使用ListenerContainerCustomizer)以实现类似的功能,以便在活页夹中重试。
你也可以用手动说明书AckMode并呼叫Ackowledgment.nack(索引,睡眠)提交部分批次的偏移量,并重新交付剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
接收时卡夫卡无在批处理模式下,接收到的列表会包含对应的空元素卡夫卡无对象。
这对两者都是如此名单<人物>和消息<列表<人>>风格类型签名。 |
批量消费时的可观测性
在批量消费记录时,不直接支持观察追踪传播功能。
这是因为 Spring for Apache Kafka 库(Kafka 绑定器使用的库)不支持批处理监听器;它仅支持唱片听众。
在批处理监听器中,接收的记录可以来自多个主题/分区,也来自多个生产者,添加追踪信息是可选的。
由于批次中记录之间可能没有相关性,框架无法对追踪它们做出任何假设,比如提供单一的跟踪ID。
如果你使用类型签名Message<List<String>>然后你可以得到一个叫kafka_batchConvertedHeaders,其中包含一个与你的有效载荷条目数量相同的列表。
该列表包含地图其中包含追踪头部。
然而,应用程序是否会正确迭代并开始观察。