这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

空负载与“墓碑”记录的日志压缩

使用日志压缩时,您可以发送和接收带有1的消息,以标识键的删除。spring-doc.cadn.net.cn

你还可以因为其他原因接收到null值,例如当一个Deserializer无法反序列化某个值时可能会返回nullspring-doc.cadn.net.cn

要通过KafkaTemplate发送一个null载荷,您可以在send()方法的值参数中传递null。send(Message<?> message)变体是此规则的一个例外。由于spring-messagingMessage<?>不能有null载荷,因此可以使用一种称为KafkaNull的特殊载荷类型,框架会发送null。为方便起见,提供了静态KafkaNull.INSTANCEspring-doc.cadn.net.cn

使用消息监听器容器时,接收到的 ConsumerRecord 具有 null value()spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

To configure the <code>0</code> to handle <code>1</code> payloads, you must use the <code>2</code> annotation with <code>3</code>.spring-doc.cadn.net.cn

If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "<code>4</code>".spring-doc.cadn.net.cn

The following example shows such a configuration:spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您在类级别使用@KafkaListener并具有多个@KafkaHandler方法时,需要一些额外的配置。具体来说,您需要一个带有KafkaNull负载的@KafkaHandler方法。下面的示例展示了如何进行配置:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

注意,参数是 null,而不是 KafkaNullspring-doc.cadn.net.cn

此功能需要使用一个 KafkaNullAwarePayloadArgumentResolver,当使用默认的 MessageHandlerMethodFactory时框架会对其进行配置。
当使用自定义的 MessageHandlerMethodFactory时,请参阅@KafkaListener中添加自定义HandlerMethodArgumentResolver