此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.5spring-doc.cadn.net.cn

@KafkaListener在类上

当您使用@KafkaListener在类级别,您必须指定@KafkaHandler在方法级别。 如果没有@KafkaHandler在此类或其子类的任何方法上,框架将拒绝此类配置。 这@KafkaHandlerannotation 是该方法的明确和简洁目的所必需的。 否则,如果没有额外的限制,很难对这种方法或其他方法做出决定。spring-doc.cadn.net.cn

在传送消息时,转换后的消息负载类型用于确定要调用的方法。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从版本 2.1.3 开始,您可以指定@KafkaHandlermethod 作为默认方法,如果其他方法不匹配,则调用该方法。 最多可以指定一种方法。 使用@KafkaHandler方法,则有效负载必须已转换为 domain 对象(以便可以执行匹配)。 使用自定义反序列化器JsonDeserializerJsonMessageConverter及其TypePrecedence设置为TYPE_ID. 有关更多信息,请参见 序列化、反序列化和消息转换spring-doc.cadn.net.cn

由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler无法接收离散标头;它必须使用ConsumerRecordMetadataConsumer Record Metadata 中所述。
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果对象是String;这topicparameter 还将获得对object.spring-doc.cadn.net.cn

如果您需要 default 方法中有关记录的元数据,请使用以下命令:spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}

此外,这也不会奏效。 这topic解析为payload.spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // payload.equals(topic) is True.
    ...
}

如果默认方法中存在需要离散自定义标头的用例,请使用以下命令:spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
    Object myValue = headers.get("MyCustomHeader");
    ...
}