此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
@KafkaListener
在类上
当您使用@KafkaListener
在类级别,必须指定@KafkaHandler
在方法级别。如果否@KafkaHandler
在此类或其子类的任何方法上,框架将拒绝此类配置。 这@KafkaHandler
为了该方法的明确和简洁的目的,需要注释。否则,很难在没有额外限制的情况下对这种方法或其他方法做出决定。
传递消息时,转换后的消息有效负载类型用于确定要调用的方法。以下示例演示如何执行此作:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从 2.1.3 版开始,您可以指定@KafkaHandler
method 作为默认方法,如果其他方法没有匹配项,则调用该方法。最多只能指定一个方法。使用@KafkaHandler
方法,则有效负载必须已经转换为域对象(以便可以执行匹配)。使用自定义反序列化程序,则JacksonJsonDeserializer
或JacksonJsonMessageConverter
与其TypePrecedence
设置为TYPE_ID
. 有关详细信息,请参阅序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler 无法接收离散标头;它必须使用ConsumerRecordMetadata 如消费者记录元数据中所述。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是String
; 这topic
参数还将获得对object
.
如果需要默认方法中有关记录的元数据,请使用以下命令:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
此外,这也行不通。 这topic
解析为payload
.
@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// payload.equals(topic) is True.
...
}
如果在默认方法中需要离散自定义标头的用例,请使用以下命令:
@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
Object myValue = headers.get("MyCustomHeader");
...
}