使用响应式卡夫卡活页夹的基本示例

在本节中,我们展示了一些使用响应式绑定器编写响应式 Kafka 应用的基本代码片段及其相关细节。spring-doc.cadn.net.cn

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

你可以用上面的大写函数同时支持基于消息通道的 Kafka 绑定器(春-云-溪-绑定-卡夫卡以及反应性卡夫卡结合剂(春云流束缚剂卡夫卡反应),本节讨论的话题。 在使用常规Kafka绑定器时,尽管你在应用程序中使用了反应类型(即在大写函数),你只会在函数执行时获得反应流。 在函数执行上下文之外,没有响应式益处,因为底层绑定器不基于响应式栈。 因此,尽管看起来像是带来了完整的端到端响应式堆栈,但该应用实际上只是部分被动式。spring-doc.cadn.net.cn

现在假设你用的是适合卡夫卡的反应性活页夹——春云流束缚剂卡夫卡反应上述函数的应用。 这种活页夹的实现将从高端消费到链底端发布,提供全方位的反应性效益。 这是因为底层的绑定器是建立在 Reactor Kafka 核心 API 之上。 在消费者端,它使用 KafkaReceiver,这是一种 Kafka 消费者的响应式实现。 同样,在生产者端,它使用 KafkaSender API,这是 Kafka 生产器的响应式实现。 由于响应式Kafka绑定器的基础建立在合适的响应式Kafka API之上,应用程序能够充分享受使用响应式技术的全部优势。 使用这种响应式卡夫卡活页夹时,应用内置了自动背压等反应功能。spring-doc.cadn.net.cn

从4.0.2版本开始,你可以自定义接收者选项发件选项通过提供一个或多个接收器选项定制器SenderOptionsCustomizer豆子。 它们是双功能接收绑定名称和初始选项,返回自定义选项。 接口延伸命令因此,当多个自定义器存在时,定制器将按要求顺序应用。spring-doc.cadn.net.cn

绑定器默认不会提交偏移量。 从4.0.2版本开始,KafkaHeaders.致谢首部包含一个接收机偏移量该对象允许你通过调用其来导致偏移量被提交确认()commit()方法。
@Bean
public Consumer<Flux<Message<String>>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

参见反应堆-卡夫卡更多信息请见文档和Javadocs。spring-doc.cadn.net.cn

此外,从4.0.3版本开始,Kafka的消费者属性reactiveAtmostOnce可以设置为true绑定器会在每次轮询返回的记录处理前自动提交偏移量。 另外,从4.0.3版本开始,你可以设置consumer 属性reactiveAutoCommittrue而在每次轮询返回的记录处理完后,绑页器会自动提交偏移量。 在这种情况下,确认头不存在。spring-doc.cadn.net.cn

还提供了4.0.2版本reactiveAutoCommit但实现不正确,表现类似于reactiveAtMostOnce.

以下是如何使用的示例reactiveAutoCommit.spring-doc.cadn.net.cn

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

注意反应堆-卡夫卡返回 aFlux<Flux<ConsumerRecord<?, ?>>>使用自动提交时, 鉴于Spring无法访问内部通量的内容,应用程序必须处理原生流量消费者记录;内容没有消息转换或转换服务。 这需要使用本地译码(通过指定反串化器配置中相应类型的记录键/值。spring-doc.cadn.net.cn