|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.2.1! |
使用记录
在上述upppercase函数,我们将记录作为Flux<String>然后将其生成为Flux<String>.
在某些情况下,您可能需要以原始接收格式接收记录 -ReceiverRecord.
下面是这样一个函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函数中,请注意,我们将记录作为Flux<ReceiverRecord<byte[], byte[]>>然后将其生成为Flux<String>.ReceiverRecord是基本的接收记录,它是一个专门的 KafkaConsumerRecord在 Reactor Kafka 中。
当使用响应式 Kafka Binder 时,上述函数将允许你访问ReceiverRecordtype 来获取每个传入记录。
但是,在这种情况下,您需要为RecordMessageConverter提供自定义实现。
默认情况下,反应式 Kafka Binder 使用 MessagingMessageConverter 将有效负载和标头从ConsumerRecord.
因此,当你的处理程序方法收到它时,有效负载已经从收到的记录中提取并传递给方法,就像我们上面看到的第一个函数一样。
通过提供自定义RecordMessageConverterimplementation 中,您可以覆盖默认行为。
例如,如果要将记录作为原始记录使用Flux<ReceiverRecord<byte[], byte[]>>,则可以在应用程序中提供以下 Bean 定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,您需要指示框架将此转换器用于所需的绑定。
下面是一个基于我们的lowercase功能。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0是我们lowercase功能。
对于出站 (lowercase-out-0),我们仍然使用常规的MessagingMessageConverter.
在toMessageimplementation 的ConsumerRecord (ReceiverRecord因为我们处于响应式 Binder 上下文中),然后将其包装在Message.
然后,该消息有效负载(即ReceiverRecord提供给用户方法。
如果reactiveAutoCommit是false(默认),调用rec.receiverOffset().acknowledge()(或commit()) 导致 offset 被提交;如果reactiveAutoCommit是true,助焊剂供应ConsumerRecords 来代替。
请参阅reactor-kafka文档和 JavaDocs 了解更多信息。