For the latest stable version, please use Spring for Apache Kafka 3.2.4!
When you use Log Compaction, you can send and receive messages with null payloads to identify the deletion of a key.
You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.
To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods.
One exception to this is the send(Message<?> message) variant.
Since a spring-messaging Message<?> cannot have a null payload, you can use a special payload type called KafkaNull instead, and the framework sends null in its place.
For convenience, the static KafkaNull.INSTANCE is provided.
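The marker-object idea behind KafkaNull can be sketched in plain Java. This is only a simplified model, not the framework's code: NullMarker, Envelope, and toRecordValue are hypothetical names standing in for KafkaNull, a Message<?> that rejects null payloads, and the translation step the framework performs before the record is written.

```java
import java.util.Objects;

// Simplified model of the marker-payload pattern; all names are hypothetical.
final class NullSentinelSketch {

    // Stand-in for KafkaNull: a singleton that means "no value".
    static final class NullMarker {
        static final NullMarker INSTANCE = new NullMarker();

        private NullMarker() {
        }
    }

    // Stand-in for a message wrapper that does not accept a null payload.
    static final class Envelope {
        final Object payload;

        Envelope(Object payload) {
            this.payload = Objects.requireNonNull(payload, "payload must not be null");
        }
    }

    // Translates the marker back into a real null just before the record is written.
    static Object toRecordValue(Envelope envelope) {
        return envelope.payload == NullMarker.INSTANCE ? null : envelope.payload;
    }

    public static void main(String[] args) {
        System.out.println(toRecordValue(new Envelope("hello")));             // prints hello
        System.out.println(toRecordValue(new Envelope(NullMarker.INSTANCE))); // prints null
    }
}
```

The wrapper never holds a null itself, so its non-null contract is preserved, while the record that is eventually written still carries a null value.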
When you use a message listener container, the received ConsumerRecord has a null value().
To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted".
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}
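As an illustration of why the key matters, the following sketch shows what the body of such a listener might do with a tombstone. It is plain Java with no Spring dependencies. DeletableCacheSketch and snapshot() are hypothetical names, and the listen method only mirrors the parameter order of the listener above, without the annotations.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local view of a compacted topic; names are illustrative only.
final class DeletableCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Same parameter order as the listener above, minus the Spring annotations.
    void listen(String value, String key) {
        if (value == null) {
            cache.remove(key);      // tombstone: forget the key
        } else {
            cache.put(key, value);  // regular record: keep the latest value
        }
    }

    // Sorted copy of the current state, for inspection.
    Map<String, String> snapshot() {
        return new TreeMap<>(cache);
    }

    public static void main(String[] args) {
        DeletableCacheSketch sketch = new DeletableCacheSketch();
        sketch.listen("alice", "user-1");
        sketch.listen("bob", "user-2");
        sketch.listen(null, "user-1");         // tombstone for user-1
        System.out.println(sketch.snapshot()); // prints {user-2=bob}
    }
}
```

Without the key, the null value alone would not tell the application which entry to remove.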
When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed.
Specifically, you need a @KafkaHandler method with a KafkaNull payload.
The following example shows how to configure one:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
Note that the argument passed to the handler method is null, not a KafkaNull instance.
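The following plain-Java sketch models that behavior under simplifying assumptions. It is not the framework's dispatch logic. HandlerDispatchSketch, NullMarker, and dispatch are hypothetical names. The marker type is used only to select a handler, and the handler still receives null as its payload argument.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified model of selecting a handler by payload type; names are hypothetical.
final class HandlerDispatchSketch {

    // Stand-in for KafkaNull; used only as a lookup key for null payloads.
    static final class NullMarker {
        private NullMarker() {
        }
    }

    private final Map<Class<?>, BiFunction<Object, Integer, String>> handlers = new LinkedHashMap<>();

    HandlerDispatchSketch() {
        handlers.put(String.class, (payload, key) -> "cat:" + payload);
        handlers.put(Integer.class, (payload, key) -> "hat:" + payload);
        // The "delete" handler is registered under the marker type.
        handlers.put(NullMarker.class, (payload, key) -> "delete:" + key + ", payload=" + payload);
    }

    String dispatch(Object value, int key) {
        // A null value selects the handler declared for the marker type.
        Class<?> type = (value == null) ? NullMarker.class : value.getClass();
        BiFunction<Object, Integer, String> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler for " + type.getName());
        }
        // The handler receives the original value, which is null for a tombstone.
        return handler.apply(value, key);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch sketch = new HandlerDispatchSketch();
        System.out.println(sketch.dispatch("tabby", 1)); // prints cat:tabby
        System.out.println(sketch.dispatch(42, 2));      // prints hat:42
        System.out.println(sketch.dispatch(null, 7));    // prints delete:7, payload=null
    }
}
```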
See Manually Assigning All Partitions.
This feature requires a KafkaNullAwarePayloadArgumentResolver, which the framework configures when you use the default MessageHandlerMethodFactory.
When you use a custom MessageHandlerMethodFactory, see Adding custom HandlerMethodArgumentResolver to @KafkaListener.