|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
“Tombstone”记录的 Null 负载和日志压缩
当您使用 Log Compaction 时,您可以使用nullpayloads 来标识密钥的删除。
您还可以接收null值,例如Deserializer那可能会回来null当它无法反序列化值时。
要发送nullpayload 使用KafkaTemplate中,您可以将 null 传递到send()方法。
一个例外是send(Message<?> message)变体。
因为spring-messaging Message<?>不能有nullpayload 中,您可以使用名为KafkaNull,框架发送null.
为方便起见,静态KafkaNull.INSTANCE。
当您使用消息侦听器容器时,收到的ConsumerRecord具有null value().
要配置@KafkaListener处理nullpayload 中,您必须使用@Payloadannotation 替换为required = false.
如果它是压缩日志的逻辑删除消息,则通常还需要 key,以便您的应用程序可以确定哪个 key 是”deleted".
以下示例显示了此类配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别@KafkaListener具有多个@KafkaHandler方法,则需要一些额外的配置。
具体来说,您需要一个@KafkaHandler方法替换为KafkaNull有效载荷。
以下示例显示如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null不KafkaNull.
| 请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver框架将在使用默认MessageHandlerMethodFactory.
使用自定义MessageHandlerMethodFactory看添加自定义HandlerMethodArgumentResolver自@KafkaListener. |