“逻辑删除”记录的空有效负载和日志压缩

使用日志压缩时,您可以使用null有效负载来识别密钥的删除。您还可以收到null值,例如可能返回null当它无法反序列化值时。spring-doc.cadn.net.cn

生成空有效负载

您可以发送nullvalue 替换为ReactivePulsarTemplate通过传递一个nullmessage 参数值设置为send方法,例如:spring-doc.cadn.net.cn

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
发送空值时,必须指定模式类型,因为系统无法从null有效载荷。

使用空有效负载

@ReactivePularListenernull有效负载根据其消息参数的类型传递到侦听器方法中,如下所示:spring-doc.cadn.net.cn

参数类型 传入价值

原始spring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

用户定义spring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

org.apache.pulsar.client.api.Message<T>spring-doc.cadn.net.cn

非空 Pulsar 消息,其getValue()返回nullspring-doc.cadn.net.cn

org.springframework.messaging.Message<T>spring-doc.cadn.net.cn

非空 Spring 消息,其getPayload()返回PulsarNullspring-doc.cadn.net.cn

Flux<org.apache.pulsar.client.api.Message<T>>spring-doc.cadn.net.cn

非空通量,其条目是非空脉冲星消息,其getValue()返回nullspring-doc.cadn.net.cn

Flux<org.springframework.messaging.Message<T>>spring-doc.cadn.net.cn

非空通量,其条目是非空的 Spring 消息,其getPayload()返回PulsarNullspring-doc.cadn.net.cn

当传入值为null(即具有原始或用户定义类型的单记录侦听器)您必须使用@Payload参数注释required = false.
使用弹簧时org.springframework.messaging.Message对于侦听器有效负载类型,其通用类型信息必须足够宽才能接受Message<PulsarNull>(例如。Message,Message<?>Message<Object>). 这是因为 Spring Message 不允许其有效负载使用 null 值,而是使用PulsarNull占 位 符。

如果它是压缩日志的逻辑删除消息,则通常还需要密钥,以便应用程序可以确定哪个密钥是”deleted". 以下示例显示了这样的配置:spring-doc.cadn.net.cn

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
使用流式消息侦听器 (Flux) 标头支持有限,因此在日志压缩方案中不太有用。