|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
发送消息
本部分介绍了如何发送消息。
使用KafkaTemplate
本部分介绍了如何使用 KafkaTemplate 来发送消息。
概述
The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics.
The following listing shows the relevant methods from KafkaTemplate:
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
有关更多详细信息,请参阅javadoc。
The sendDefault API 需要将默认主题提供给模板。
API接受一个timestamp作为参数,并将这个时间戳存储在记录中。 用户提供的时间戳是如何被存储的取决于Kafka主题上配置的时间戳类型。 如果主题被配置为使用CREATE_TIME,用户指定的时间戳被记录(如果没有指定,则生成)。 如果主题被配置为使用LOG_APPEND_TIME,则忽略用户指定的时间戳,代理添加本地代理时间。
要使用模板,您可以配置一个生产者工厂并将其提供给模板的构造函数。 以下示例展示了如何操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本2.5开始,您可以现在重写工厂的ProducerConfig属性,以从同一个工厂创建具有不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
注意,类型为 ProducerFactory<?, ?>(例如由 Spring Boot 自动配置的)的 bean 可以通过不同的具体泛型类型进行引用。
您也可以通过使用标准 <bean/> 定义来配置模板。
然后,要使用模板,你可以调用其方法。
当你使用带有 Message<?> 参数的方法时,会在消息头中提供主题、分区、键和时间戳信息,该消息头包括以下项目:
-
KafkaHeaders.TOPIC -
KafkaHeaders.PARTITION -
KafkaHeaders.KEY -
KafkaHeaders.TIMESTAMP
消息有效载荷就是数据。
可选地,你可以用 ProducerListener 配置 KafkaTemplate,以在发送结果(成功或失败)时异步回调,而不是等待 Future 完成。
以下代码示例显示了 ProducerListener 接口的定义:
public interface ProducerListener<K, V> {
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
}
}
默认情况下,模板配置为 LoggingProducerListener,这会记录错误,当发送成功时则不执行任何操作。
为了方便,如果您只想实现其中一个方法,已提供默认的方法实现。
请注意,send方法返回一个 CompletableFuture<SendResult>。
您可以使用监听器注册一个回调,以异步接收send的结果。
以下示例展示了如何实现:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult有兩個屬性,即ProducerRecord和RecordMetadata。
請參閱Kafka API文檔以了解有關這些物件的資訊。
The Throwable 可以被 cast 到一个 KafkaProducerException;它的 producerRecord 属性包含失败记录。
如果你希望阻塞发送线程以等待结果,可以调用 future 的 get() 方法;使用带超时的 linger.ms 方法是推荐的。
如果你设置了 flush(),可能希望在等待前调用 autoFlush,或者为了方便,模板提供一个带 flush() 参数的构造函数,该构造函数会在每次发送时 linger.ms。
刷新仅在设置了 6 生产者属性且希望立即发送部分批次时需要。
示例
本部分展示了向Kafka发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
注意 ExecutionException 的原因是 KafkaProducerException 通过 producerRecord 属性。
使用RoutingKafkaTemplate
从 2.5 版本开始,您可以使用一个 RoutingKafkaTemplate 来在运行时选择 producer,基于目的地 topic 的名称。
The routing template does not support transactions, execute, flush, or metrics operations because the topic is not known for those operations. |
该模板需要一个将 java.util.regex.Pattern 映射到 ProducerFactory<Object, Object> 的实例的映射。
该映射应有序(例如一个 LinkedHashMap),因为它将按顺序遍历;你应该在前面添加更具体的模式。
以下简单的 Spring Boot 应用程序通过使用相同的模板向不同的主题发送消息,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
对应的此示例中的@KafkaListener如在Annotation Properties所示。
另一种实现类似结果的技术,同时还具备将不同类型发送到同一主题的能力,请参见 委托序列化器和反序列化器。
使用DefaultKafkaProducerFactory
正如在使用KafkaTemplate中所述,使用ProducerFactory来创建生产者。
当不使用 Transactions 时,DefaultKafkaProducerFactory 默认会创建一个用于所有客户端的单例生产者,正如 KafkaProducer JavaDocs 所推荐的。
然而,如果你在 flush() 上调用模板方法,这可能会导致使用相同生产者的其他线程出现延迟。
从 2.3 版本开始,DefaultKafkaProducerFactory 引入了一个新属性 producerPerThread。
当设置为 true 时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免这个问题。
When producerPerThread is true, user code must call closeThreadBoundProducer() on the factory when the producer is no longer needed.
This will physically close the producer and remove it from the ThreadLocal.
Calling reset() or destroy() will not clean up these producers. |
也请参见 KafkaTemplate 事务和非事务发布。
当创建一个DefaultKafkaProducerFactory时,可以通过调用仅接受一个属性映射(属性)Map的构造函数从配置中获取key和/或value的Serializer类(参见使用KafkaTemplate示例),或者将Serializer实例传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有Producer共享相同的实例)。
或者,可以提供Supplier<Serializer>(从2.3版本开始)来获取每个Producer的单独Serializer实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本2.5.10开始,您可以在工厂创建后更新生产者属性。
这可能很有用,例如,如果您需要在凭据更改后更新SSL密钥/信任存储的位置。
更改不会影响现有的生产者实例;调用reset()以关闭任何现有的生产者,以便使用新属性创建新的生产者。
| 你不能将事务性生产者工厂更改为非事务性,反之亦然。 |
两个新的方法现在已提供:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从 2.8 版本开始,如果你以对象形式提供序列化器(通过构造函数或 setter 提供),工厂将调用 configure() 方法来使用配置属性配置它们。
使用ReplyingKafkaTemplate
版本 2.1.3 引入了一个 KafkaTemplate 的子类,以提供请求/回复语义。
该类命名为 ReplyingKafkaTemplate,并具有两个额外的方法;以下显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(也参见 使用 Message<?>s 的请求/回复)。
结果是一个result,它会异步地填充结果(或在超时时抛出异常)。
结果还有一个property属性,它是调用method的结果。
你可以使用这个future来确定发送操作的结果。
如果使用第一个方法,或者replyTimeout参数为null时,将使用模板的defaultReplyTimeout属性(默认为5秒)。
从 2.8.8 版本开始,模板新增了一个方法 waitForAssignment。
这在回复容器配置为 auto.offset.reset=latest 以避免在容器初始化之前先发送一个请求和回复时特别有用。
在使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的 pollTimeout 属性,因为通知将在第一次轮询完成后才发送。 |
以下这个 Spring Boot 应用程序展示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 自动配置的容器工厂来创建回复容器。
如果用于回复的反序列化器并非不重要,请考虑使用一个ErrorHandlingDeserializer,该反序列化器会委托给您配置的反序列化器。
这样配置后,RequestReplyFuture将异常完成,您可以捕获到ExecutionException,其中DeserializationException位于它的cause属性中。
从2.6.7版本开始,除了检测DeserializationException之外,模板还会调用replyErrorChecker函数(如果提供了的话)。如果该函数返回异常,则未来将完成为异常状态。
这是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause() instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
该模板设置了一个标题(默认命名为KafkaHeaders.CORRELATION_ID),服务器端必须将其回显。
在这种情况下,以下@KafkaListener应用程序会响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
基础设施会回显关联ID并确定回复主题。
有关发送回复的更多信息,请参阅使用@SendTo转发监听器结果。 模板使用默认标头KafKaHeaders.REPLY_TOPIC来指示回复的主题。
从版本 2.2 开始,模板会尝试检测回复主题或分区。如果容器被配置为监听单个主题或者单个 TopicPartitionOffset,则使用它来设置回复头信息。如果容器以其他方式配置,则用户必须手动设置回复头信息。在这种情况下,在初始化期间会写入一条 INFO 级别的日志消息。以下示例使用了 KafkaHeaders.REPLY_TOPIC:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当您使用单个回复 TopicPartitionOffset 进行配置时,只要每个实例监听不同的分区,就可以为多个模板使用相同的回复主题。
在使用单个回复主题进行配置时,每个实例必须使用不同的 group.id。
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到关联ID。
这可能对自动扩展有用,但会增加额外的网络流量开销,并且需要花费少量成本来丢弃每个不需要的回复。
当您使用此设置时,我们建议将模板的 sharedReplyTopic 设置为 true,这样可以将意外回复的日志级别从默认的 ERROR 降低到 DEBUG。
以下是一个配置回复容器以使用相同的共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
| 如果您有多个客户端实例,并且没有按照上一节所述进行配置,那么每个实例都需要一个专用的回复主题。 另一种方法是设置
服务器必须使用此标头将回复路由到正确的分区( 不过在这种情况下,回复容器不应使用Kafka的组管理功能,而应被配置为监听固定分区(通过在它的 |
如果 DefaultKafkaHeaderMapper 需要Jackson在类路径中(对于@KafkaListener)。 如果没有可用,消息转换器没有头映射器,因此您必须配置一个MessagingMessageConverter与SimpleKafkaHeaderMapper,如前所述。 |
默认情况下,使用3个header:
-
KafkaHeaders.CORRELATION_ID- 用于将回复与请求关联起来 -
KafkaHeaders.REPLY_TOPIC- 用于告诉服务器回复的位置 -
KafkaHeaders.REPLY_PARTITION- (可选)用于告诉服务器回复哪个分区
这些标题名称由@KafkaListener基础设施使用来路由回复。
从 2.3 版本开始,您可以自定义标题名称——模板有三个属性 correlationHeaderName、replyTopicHeaderName 和 replyPartitionHeaderName。如果您的服务器不是 Spring 应用程序(或不使用 @KafkaListener),这会很有用。
相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标题中,则从版本 3.0 开始,您可以在侦听器容器工厂上配置自定义 correlationHeaderName 并回显该标题。
以前,监听器必须回显自定义的关联标头。 |
请求/回复与Message<?>s
版本 2.7 在 ReplyingKafkaTemplate 上添加了方法,用于发送和接收 spring-messaging 的 Message<?> 抽象:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认replyTimeout,还有可以接受超时时间作为方法调用参数的重载版本。
如果消费者Deserializer或模板的MessageConverter可以在没有附加信息的情况下通过配置或回复消息中的类型元数据转换负载,则使用第一种方法。
如果需要为返回类型提供类型信息以协助消息转换器,请使用第二种方法。这种方法还允许相同的模板接收不同类型,即使回复中没有类型元数据(例如服务器端不是Spring应用程序时)。以下是后一种情况的示例:
-
Java
-
Kotlin
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
-
Java
-
Kotlin
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型消息<?>
当 @KafkaListener 返回 Message<?> 时,在2.5版本之前的版本中,必须填充回复主题和相关id标头。在此示例中,我们使用请求中的回复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这还展示了如何在回复记录上设置一个键。
从版本 2.5 开始,如果检测到这些标题缺失,框架将使用主题填充它们——无论是从 @SendTo 值确定的主题还是传入的 KafkaHeaders.REPLY_TOPIC 标题(如果存在的话)。它还会回显传入的 KafkaHeaders.CORRELATION_ID 和 KafkaHeaders.REPLY_PARTITION,如果它们存在。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
聚合多个回复
模板在使用ReplyingKafkaTemplate中仅适用于单个请求/回复场景。对于单一消息的多个接收者都返回一个回复的情况,可以使用AggregatingReplyingKafkaTemplate。这是散射-聚集企业集成模式的客户端实现。
与 ReplyingKafkaTemplate 类似,AggregatingReplyingKafkaTemplate 构造函数需要一个生产者工厂和一个监听容器来接收回复;它还有一个第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次接收到回复时都会被调用;当该谓词返回 true 时,使用 ConsumerRecord 的集合来完成由 sendAndReceive 方法返回的 Future。
有一个额外的属性 returnPartialOnTimeout(默认为 false)。 当将其设置为 true 时,与使用 KafkaReplyTimeoutException 完成未来不同,部分结果在至少收到一条回复记录的情况下会正常完成未来。
从版本 2.3.5 开始,如果超时(returnPartialOnTimeout 是 true),也会调用谓词。 第一个参数是当前记录列表;第二个参数为 true 表示此调用是由超时引起的。 谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是 ConsumerRecord,其值是一个包含多个 ConsumerRecord 的集合。 ConsumerRecord 是模板合成的“外部”记录,并非真正的记录,用于持有实际回复记录。
当正常发布发生时(发布策略返回 true),主题设置为 aggregatedResults;如果 returnPartialOnTimeout 为 true 并且超时发生(并且至少收到一条回复记录),则主题设置为 partialResultsAfterTimeout。
该模板提供了这些“主题”名称的常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
实际的 ConsumerRecord 包含从其中接收回复的实际主题。
回复的侦听器容器必须配置为AckMode.MANUAL或AckMode.MANUAL_IMMEDIATE;消费者属性enable.auto.commit必须是false(自版本2.3以来的默认值)。为了避免任何消息丢失的可能性,模板仅在没有待处理请求时提交偏移量,即当最后一个待处理请求被释放策略释放时。 重新平衡后可能会出现重复的回复传递;对于任何进行中的请求都会忽略这些重复的回复;当接收到已释放回复的重复回复时,您可能会看到错误日志消息。 |
如果您在此聚合模板中使用 ErrorHandlingDeserializer,框架将不会自动检测DeserializationExceptions。相反,记录(带有一个 建议应用程序调用实用程序方法 有关更多详细信息,请参阅其JavaDoc。
|