此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

“逻辑删除”记录的空有效负载和日志压缩

使用日志压缩时,可以使用null有效负载来标识密钥的删除。spring-doc.cadn.net.cn

您还可以收到null值,例如Deserializer可能会返回null当它无法反序列化值时。spring-doc.cadn.net.cn

要发送null有效负载,使用KafkaTemplate,您可以将 null 传递到send()方法。 一个例外是send(Message<?> message)变体。 因为spring-messaging Message<?>不能有null有效负载,您可以使用名为KafkaNull,框架发送null. 为方便起见,静态KafkaNull.INSTANCE被提供。spring-doc.cadn.net.cn

当您使用消息侦听器容器时,收到的ConsumerRecord有一个null value().spring-doc.cadn.net.cn

要配置@KafkaListener处理null有效负载,则必须使用@Payload注释required = false. 如果它是压缩日志的逻辑删除消息,则通常还需要密钥,以便应用程序可以确定哪个密钥是”deleted". 以下示例显示了这样的配置:spring-doc.cadn.net.cn

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您使用类级别@KafkaListener与多个@KafkaHandler方法,需要一些额外的配置。 具体来说,您需要一个@KafkaHandler方法与KafkaNull有效载荷。 以下示例演示如何配置一个:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,参数是nullKafkaNull.spring-doc.cadn.net.cn

此功能需要使用KafkaNullAwarePayloadArgumentResolver框架在使用默认MessageHandlerMethodFactory. 使用自定义MessageHandlerMethodFactory添加自定义HandlerMethodArgumentResolver@KafkaListener.