异步@KafkaListener返回类型

从 3.2 版本开始,@KafkaListener(和@KafkaHandler) 方法可以使用异步返回类型指定,从而让异步发送回复。 返回类型包括CompletableFuture<?>,Mono<?>和 Kotlinsuspend功能。spring-doc.cadn.net.cn

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}
AckMode将自动将MANUAL并在检测到异步返回类型时启用无序提交;相反,异步完成将在异步作完成时确认。 当异步结果完成并出现错误时,消息是否恢复取决于容器错误处理程序。 如果侦听器方法中发生某些异常,导致创建异步结果对象,则必须捕获该异常并返回适当的返回对象,该对象将导致消息被确认或恢复。

如果KafkaListenerErrorHandler在具有异步返回类型(包括 Kotlin 挂起函数)的监听器上配置,则在失败后调用错误处理程序。 有关此错误处理程序及其用途的更多信息,请参阅处理异常。spring-doc.cadn.net.cn