|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
消费唱片
在上述内容中大写函数,我们用通<弦>然后生成为通<弦>.
有时你可能需要以原始接收格式接收记录——接收记录.
这里有一个这样的函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函数中,注意我们消耗记录为Flux<ReceiverRecord<byte[], byte[]>>然后生成为通<弦>.接收记录是基本的接收记录,是专门的卡夫卡消费者记录在《反应堆卡夫卡》中。
使用响应式Kafka装订器时,上述函数将为你提供访问接收记录为每条进入记录输入。
不过,在这种情况下,你需要为 RecordMessageConverter 提供一个自定义实现。
默认情况下,响应式Kafka绑定器使用MessagingMessageConverter,将有效载荷和头部从以下消费者记录.
因此,当你的处理方法接收到有效载荷时,已从接收记录中提取并传递到方法,就像我们上面提到的第一个函数一样。
通过提供定制记录消息转换器在应用中实现时,你可以覆盖默认行为。
例如,如果你想以原始内容消费记录Flux<ReceiverRecord<byte[], byte[]>>那么你可以在应用中提供以下豆子定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,你需要指示框架使用该转换器来完成所需的绑定。
这里有一个基于我们小写功能。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
0 中小写是我们输入绑定的名称小写功能。
对于出站(小写出0),我们仍然使用常规消息信息转换器.
在收件人消息在上述实现上,我们收到了原始文件消费者记录 (接收记录因为我们处于响应式结合器上下文中),然后将其包裹在消息.
然后该消息载荷为接收记录是提供给用户方法的。
如果reactiveAutoCommit是false(默认),呼叫rec.receiverOffset().acknowledge()(或commit())导致偏移量被提交;如果reactiveAutoCommit是true,通量供应消费者记录而不是S。
参见反应堆-卡夫卡更多信息请见文档和Javadocs。