|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
异步 @KafkaListener 返回类型
从 3.2 版本开始,@KafkaListener(以及 @KafkaHandler)方法可以指定异步返回类型,让回复可以异步发送。
支持的返回类型包括 CompletableFuture<?>、Mono<?> 以及 Kotlin 的 suspend 函数。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
当检测到异步返回类型时,AckMode 将会自动设置为 MANUAL 并启用乱序提交;相反,在异步操作完成后,异步完成将进行确认。
当异步结果因错误而完成时,消息是否恢复取决于容器错误处理程序。
如果在监听器方法中发生异常,导致无法创建异步结果对象,你必须捕获该异常并返回一个合适的返回对象,该对象将导致消息进行确认或恢复。 |
如果在监听器中配置了异步返回类型(包括 Kotlin 的挂起函数),并且配置了 KafkaListenerErrorHandler,则错误处理程序会在失败后被调用。
有关此错误处理程序及其用途的更多信息,请参见 处理异常。