“逻辑删除”记录的空有效负载和日志压缩
使用日志压缩时,可以使用null有效负载来标识密钥的删除。
您还可以收到null值,例如Deserializer可能会返回null当它无法反序列化值时。
要发送null有效负载,使用KafkaTemplate,您可以将 null 传递到send()方法。
一个例外是send(Message<?> message)变体。
因为spring-messaging Message<?>不能有null有效负载,您可以使用名为KafkaNull,框架发送null.
为方便起见,静态KafkaNull.INSTANCE被提供。
当您使用消息侦听器容器时,收到的ConsumerRecord有一个null value().
要配置@KafkaListener处理null有效负载,则必须使用@Payload注释required = false.
如果它是压缩日志的逻辑删除消息,则通常还需要密钥,以便应用程序可以确定哪个密钥是”deleted".
以下示例显示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别@KafkaListener与多个@KafkaHandler方法,需要一些额外的配置。
具体来说,您需要一个@KafkaHandler方法与KafkaNull有效载荷。
以下示例演示如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null不KafkaNull.
| 请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver框架在使用默认MessageHandlerMethodFactory.
使用自定义MessageHandlerMethodFactory看添加自定义HandlerMethodArgumentResolver自@KafkaListener. |