|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
空负载与“墓碑”记录的日志压缩
使用日志压缩时,您可以发送和接收带有
你还可以因为其他原因接收到null值,例如当一个Deserializer无法反序列化某个值时可能会返回null。
要通过KafkaTemplate发送一个null载荷,您可以在send()方法的值参数中传递null。send(Message<?> message)变体是此规则的一个例外。由于spring-messagingMessage<?>不能有null载荷,因此可以使用一种称为KafkaNull的特殊载荷类型,框架会发送null。为方便起见,提供了静态KafkaNull.INSTANCE。
使用消息监听器容器时,接收到的 ConsumerRecord 具有 null value()。
To configure the <code>0</code> to handle <code>1</code> payloads, you must use the <code>2</code> annotation with <code>3</code>.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "<code>4</code>".
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您在类级别使用@KafkaListener并具有多个@KafkaHandler方法时,需要一些额外的配置。具体来说,您需要一个带有KafkaNull负载的@KafkaHandler方法。下面的示例展示了如何进行配置:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
注意,参数是 null,而不是 KafkaNull。
| 请参阅 手动分配所有分区。 |
此功能需要使用一个 KafkaNullAwarePayloadArgumentResolver,当使用默认的 MessageHandlerMethodFactory时框架会对其进行配置。当使用自定义的 MessageHandlerMethodFactory时,请参阅向@KafkaListener中添加自定义HandlerMethodArgumentResolver。 |