该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

消费唱片

在上述内容中大写函数,我们用通<弦>然后生成为通<弦>. 有时你可能需要以原始接收格式接收记录——接收记录. 这里有一个这样的函数。spring-doc.cadn.net.cn

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

在此函数中,注意我们消耗记录为Flux<ReceiverRecord<byte[], byte[]>>然后生成为通<弦>.接收记录是基本的接收记录,是专门的卡夫卡消费者记录在《反应堆卡夫卡》中。 使用响应式Kafka装订器时,上述函数将为你提供访问接收记录为每条进入记录输入。 不过,在这种情况下,你需要为 RecordMessageConverter 提供一个自定义实现。 默认情况下,响应式Kafka绑定器使用MessagingMessageConverter,将有效载荷和头部从以下消费者记录. 因此,当你的处理方法接收到有效载荷时,已从接收记录中提取并传递到方法,就像我们上面提到的第一个函数一样。 通过提供定制记录消息转换器在应用中实现时,你可以覆盖默认行为。 例如,如果你想以原始内容消费记录Flux<ReceiverRecord<byte[], byte[]>>那么你可以在应用中提供以下豆子定义。spring-doc.cadn.net.cn

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然后,你需要指示框架使用该转换器来完成所需的绑定。 这里有一个基于我们小写功能。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

0 中小写是我们输入绑定的名称小写功能。 对于出站(小写出0),我们仍然使用常规消息信息转换器.spring-doc.cadn.net.cn

收件人消息在上述实现上,我们收到了原始文件消费者记录 (接收记录因为我们处于响应式结合器上下文中),然后将其包裹在消息. 然后该消息载荷为接收记录是提供给用户方法的。spring-doc.cadn.net.cn

如果reactiveAutoCommitfalse(默认),呼叫rec.receiverOffset().acknowledge()(或commit())导致偏移量被提交;如果reactiveAutoCommittrue,通量供应消费者记录而不是S。 参见反应堆-卡夫卡更多信息请见文档和Javadocs。spring-doc.cadn.net.cn