这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

@KafkaListener 在一个类上

当在类级别使用 @KafkaListener 时,必须在方法级别指定 @KafkaHandler。 如果该类或其子类中的任何方法都没有 @KafkaHandler,框架将拒绝此类配置。 @KafkaHandler 注解用于方法的明确且简洁的目的。 否则,在没有额外限制的情况下,很难对该方法或其他方法做出决策。spring-doc.cadn.net.cn

当消息被传递时,会根据转换后的消息负载类型来确定要调用的方法。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从 2.1.3 版本开始,你可以将一个 @KafkaHandler 方法指定为默认方法,如果在其他方法上没有匹配时会调用该方法。 最多只能指定一个默认方法。 在使用 @KafkaHandler 方法时,负载必须已经转换为领域对象(以便进行匹配)。 可以使用自定义反序列化器 JacksonJsonDeserializer,或使用 JacksonJsonMessageConverter 且其 TypePrecedence 设置为 TYPE_ID。 参见 序列化、反序列化和消息转换 以获取更多信息。spring-doc.cadn.net.cn

由于一些限制,在Spring解析方法参数的方式中,一个默认的@KafkaHandler无法接收离散的头信息;它必须使用ConsumerRecordMetadata,如在消费者记录元数据中讨论的那样。
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果对象是 String,则不会工作;topic 参数也会获得对 object 的引用。spring-doc.cadn.net.cn

如果你需要在默认方法中使用记录的元数据,请使用以下内容:spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}

也,这种方式同样不会起作用。 其中 topic 会被解析为 payloadspring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // payload.equals(topic) is True.
    ...
}

如果在默认方法中需要使用离散的自定义头,可以使用以下方法:spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
    Object myValue = headers.get("MyCustomHeader");
    ...
}