4. 参考资料

参考文档的这一部分详细介绍了构成 Spring for Apache Kafka 的各种组件。 主要章节介绍了使用 Spring 开发 Kafka 应用程序的核心类。spring-doc.cadn.net.cn

4.1. 将 Spring 用于 Apache Kafka

本节详细说明了影响使用 Spring for Apache Kafka 的各种问题。有关快速但不太详细的介绍,请参阅快速导览spring-doc.cadn.net.cn

4.1.1. 连接到 Kafka

从 2.5 版开始,其中每一个都扩展了KafkaResourceFactory. 这允许在运行时通过添加Supplier<String>到他们的配置:setBootstrapServersSupplier(() → …​). 将为所有新连接调用此命令以获取服务器列表。 消费者和生产者通常是长寿的。 要关闭现有生产者,请调用reset()DefaultKafkaProducerFactory. 要关闭现有使用者,请调用stop()(然后start()) 在KafkaListenerEndpointRegistry和/或stop()start()在任何其他侦听器容器 Bean 上。spring-doc.cadn.net.cn

为方便起见,该框架还提供了一个ABSwitchCluster支持两组引导服务器;其中一个随时处于活动状态。 配置ABSwitchCluster并将其添加到生产者和消费者工厂,然后KafkaAdmin,通过调用setBootstrapServersSupplier(). 当你想切换时,请致电primary()secondary()并调用reset()在生产工厂建立新的连接;对于消费者来说,stop()start()所有侦听器容器。 使用时@KafkaListeners,stop()start()KafkaListenerEndpointRegistry豆。spring-doc.cadn.net.cn

有关更多信息,请参阅 Javadocs。spring-doc.cadn.net.cn

工厂监听器

从 2.5 版开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory可以配置为Listener在创建或关闭生产者或使用者时接收通知。spring-doc.cadn.net.cn

生产者工厂监听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消费者工厂监听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id是通过将client-id属性(从metrics()创建后)到工厂beanName属性,分隔为..spring-doc.cadn.net.cn

例如,这些侦听器可用于创建和绑定千分尺KafkaClientMetrics实例,当创建新客户端时(并在客户端关闭时关闭它)。spring-doc.cadn.net.cn

该框架提供的侦听器正是这样做的;请参阅 Micrometer Native Metricsspring-doc.cadn.net.cn

4.1.2. 配置主题

如果定义KafkaAdminbean 中,它可以自动将主题添加到代理中。 为此,您可以添加一个NewTopic @Bean对于应用程序上下文中的每个主题。 2.3 版引入了一个新类TopicBuilder使此类豆的创建更加方便。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

Java
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, Arrays.asList(0, 1))
            .assignReplicas(1, Arrays.asList(1, 2))
            .assignReplicas(2, Arrays.asList(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
Kotlin
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

从 2.6 版开始,您可以省略.partitions()和/或replicas()并且代理默认值将应用于这些属性。 代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464spring-doc.cadn.net.cn

Java
@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
Kotlin
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

从 2.7 版开始,您可以声明多个NewTopics 在单个KafkaAdmin.NewTopics豆子定义:spring-doc.cadn.net.cn

Java
@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
Kotlin
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
使用 Spring Boot 时,一个KafkaAdminbean 会自动注册,因此您只需要NewTopic(和/或NewTopics) @Beans.

默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。 可以以编程方式调用管理员的initialize()稍后重试的方法。 如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable属性设置为true. 然后上下文无法初始化。spring-doc.cadn.net.cn

如果代理支持它(1.0.0 或更高版本),如果发现现有主题的分区少于NewTopic.numPartitions.

从 2.7 版开始,KafkaAdmin提供了在运行时创建和检查主题的方法。spring-doc.cadn.net.cn

对于更高级的功能,您可以使用AdminClient径直。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

4.1.3. 发送消息

本节介绍如何发送消息。spring-doc.cadn.net.cn

KafkaTemplate

本节介绍如何使用KafkaTemplate发送消息。spring-doc.cadn.net.cn

概述

KafkaTemplate包装生产者,并提供向 Kafka 主题发送数据的便捷方法。 以下列表显示了来自KafkaTemplate:spring-doc.cadn.net.cn

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

有关更多详细信息,请参阅 Javadocspring-doc.cadn.net.cn

sendDefaultAPI 要求已向模板提供默认主题。spring-doc.cadn.net.cn

API 会接收timestamp作为参数,并将此时间戳存储在记录中。 用户提供的时间戳的存储方式取决于在 Kafka 主题上配置的时间戳类型。 如果主题配置为CREATE_TIME,则记录用户指定的时间戳(如果未指定,则生成)。 如果主题配置为LOG_APPEND_TIME,则忽略用户指定的时间戳,代理会添加本地代理时间。spring-doc.cadn.net.cn

metricspartitionsFor方法委托给基础Producer. 这execute方法提供对底层Producer.spring-doc.cadn.net.cn

要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从 2.5 版开始,您现在可以覆盖工厂的ProducerConfig属性,以从同一工厂创建具有不同生产者配置的模板。spring-doc.cadn.net.cn

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,类型为ProducerFactory<?, ?>(例如由 Spring Boot 自动配置的那个)可以使用不同的缩小泛型类型进行引用。spring-doc.cadn.net.cn

您还可以使用标准配置模板<bean/>定义。spring-doc.cadn.net.cn

然后,要使用该模板,您可以调用其方法之一。spring-doc.cadn.net.cn

当您将方法Message<?>参数,则主题、分区和键信息在消息头中提供,其中包含以下项目:spring-doc.cadn.net.cn

消息有效负载是数据。spring-doc.cadn.net.cn

或者,您可以配置KafkaTemplate使用ProducerListener获取具有发送结果(成功或失败)的异步回调,而不是等待Future完成。 以下列表显示了ProducerListener接口:spring-doc.cadn.net.cn

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下,模板配置了LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何作。spring-doc.cadn.net.cn

为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。spring-doc.cadn.net.cn

请注意,send 方法返回一个ListenableFuture<SendResult>. 您可以向监听器注册回调,以异步接收发送结果。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

    @Override
    public void onSuccess(SendResult<Integer, String> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }

});

SendResult有两个属性,一个ProducerRecordRecordMetadata. 有关这些对象的信息,请参阅 Kafka API 文档。spring-doc.cadn.net.cn

ThrowableonFailure可以转换为KafkaProducerException;其failedProducerRecord属性包含失败的记录。spring-doc.cadn.net.cn

从 2.5 版开始,您可以使用KafkaSendCallback而不是ListenableFutureCallback,更容易提取失败的ProducerRecord,避免了需要Throwable:spring-doc.cadn.net.cn

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {

    @Override
    public void onSuccess(SendResult<Integer, String> result) {
        ...
    }

    @Override
    public void onFailure(KafkaProducerException ex) {
        ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
        ...
    }

});

您还可以使用一对 lambda:spring-doc.cadn.net.cn

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
        ...
    }, (KafkaFailureCallback<Integer, String>) ex -> {
            ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
            ...
    });

如果您希望阻止发送线程以等待结果,您可以调用未来的get()方法;建议使用带超时的方法。 您可能希望调用flush()在等待之前,或者,为了方便起见,模板有一个构造函数,其中有一个autoFlush导致模板flush()每次发送。 仅当您已将linger.msproducer 属性,并希望立即发送部分批次。spring-doc.cadn.net.cn

例子

本节展示了向 Kafka 发送消息的示例:spring-doc.cadn.net.cn

示例 5.非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new KafkaSendCallback<Integer, String>() {

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(data);
        }

        @Override
        public void onFailure(KafkaProducerException ex) {
            handleFailure(data, record, ex);
        }

    });
}
阻止(同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,原因ExecutionExceptionKafkaProducerException使用failedProducerRecord财产。spring-doc.cadn.net.cn

RoutingKafkaTemplate

从 2.5 版开始,您可以使用RoutingKafkaTemplate在运行时根据目标选择生产者topic名字。spring-doc.cadn.net.cn

路由模板支持事务,execute,flushmetrics作,因为这些作的主题是未知的。

该模板需要java.util.regex.PatternProducerFactory<Object, Object>实例。 此映射应按顺序排列(例如LinkedHashMap)因为它是按顺序遍历的;您应该在开始时添加更具体的模式。spring-doc.cadn.net.cn

以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化器。spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

相应的@KafkaListeners 显示在注释属性中spring-doc.cadn.net.cn

有关实现类似结果的另一种技术,但具有将不同类型发送到同一主题的附加功能,请参阅委托序列化器和反序列化器spring-doc.cadn.net.cn

DefaultKafkaProducerFactory

KafkaTemplate一个ProducerFactory用于创建生产者。spring-doc.cadn.net.cn

当不使用 Transactions 时,默认情况下,DefaultKafkaProducerFactory创建所有客户端使用的单例生产者,如KafkaProducerjavadocs。 但是,如果您调用flush()在模板上,这可能会导致使用同一生产者的其他线程出现延迟。 从 2.3 版本开始,DefaultKafkaProducerFactory有一个新属性producerPerThread. 当设置为true,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。spring-doc.cadn.net.cn

什么时候producerPerThreadtrue,用户代码必须调用closeThreadBoundProducer()当不再需要生产者时,在工厂。 这将物理关闭生产者并将其从ThreadLocal. 叫reset()destroy()不会清理这些生产者。

创建DefaultKafkaProducerFactory、键和/或值Serializer可以通过调用仅接受属性 Map 的构造函数从配置中获取类(请参阅中的示例KafkaTemplate),或Serializer实例可以传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有Producers 共享相同的实例)。 或者,您可以提供Supplier<Serializer>s(从 2.3 版开始),将用于获取单独的Serializer每个实例Producer:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从版本 2.5.10 开始,您现在可以在创建工厂后更新生产者属性。 例如,如果必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。 这些更改不会影响现有的生产者实例;叫reset()关闭任何现有生产者,以便使用新属性创建新生产者。 注意:您不能将事务性生产者工厂更改为非事务性工厂,反之亦然。spring-doc.cadn.net.cn

现在提供了两种新方法:spring-doc.cadn.net.cn

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从版本 2.8 开始,如果您将序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用configure()使用配置属性配置它们的方法。spring-doc.cadn.net.cn

ReplyingKafkaTemplate

2.1.3 版本引入了一个子类KafkaTemplate提供请求/回复语义。 该类名为ReplyingKafkaTemplate并且有两种附加方法;下面显示了方法签名:spring-doc.cadn.net.cn

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是ListenableFuture异步填充结果(或超时的异常)。 结果还有一个sendFuture属性,这是调用KafkaTemplate.send(). 您可以使用此未来来确定发送作的结果。spring-doc.cadn.net.cn

如果使用第一种方法,或者replyTimeout参数是null,模板的defaultReplyTimeout属性(默认为 5 秒)。spring-doc.cadn.net.cn

从 2.8.8 版本开始,模板有一个新方法waitForAssignment. 如果回复容器配置了auto.offset.reset=latest以避免在容器初始化之前发送请求和回复。spring-doc.cadn.net.cn

使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的pollTimeout属性,因为在第一次轮询完成之前不会发送通知。

以下 Spring Boot 应用程序显示了如何使用该功能的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

请注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。spring-doc.cadn.net.cn

如果将重要的反序列化器用于回复,请考虑使用ErrorHandlingDeserializer委托给配置的反序列化程序。 如此配置后,RequestReplyFuture将异常完成,您可以捕获ExecutionException,替换为DeserializationException在其cause财产。spring-doc.cadn.net.cn

从 2.6.7 版本开始,除了检测DeserializationExceptions,模板将调用replyErrorChecker函数(如果提供)。 如果返回异常,则未来将异常完成。spring-doc.cadn.net.cn

这是一个例子:spring-doc.cadn.net.cn

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

该模板设置一个标头(名为KafkaHeaders.CORRELATION_ID默认情况下),必须由服务器端回显。spring-doc.cadn.net.cn

在这种情况下,以下内容@KafkaListener应用程序响应:spring-doc.cadn.net.cn

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListenerinfrastructure 回显关联 ID 并确定回复主题。spring-doc.cadn.net.cn

转发侦听器结果@SendTo了解有关发送回复的更多信息。 模板使用默认标头KafKaHeaders.REPLY_TOPIC以指示回复所指向的主题。spring-doc.cadn.net.cn

从 2.2 版开始,模板会尝试从配置的回复容器中检测回复主题或分区。 如果容器配置为侦听单个主题或单个TopicPartitionOffset,用于设置回复标头。 如果容器以其他方式配置,则用户必须设置回复标头。 在这种情况下,一个INFO日志消息在初始化期间写入。 以下示例使用KafkaHeaders.REPLY_TOPIC:spring-doc.cadn.net.cn

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

当您配置单个回复时TopicPartitionOffset,您可以对多个模板使用相同的回复主题,只要每个实例在不同的分区上监听即可。 使用单个回复主题进行配置时,每个实例必须使用不同的group.id. 在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关 ID。 这对于自动缩放可能很有用,但会产生额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。 使用此设置时,建议将模板的sharedReplyTopictrue,这降低了对 DEBUG 的意外回复的日志记录级别,而不是默认的 ERROR。spring-doc.cadn.net.cn

以下是配置回复容器以使用同一共享回复主题的示例:spring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多个客户端实例,并且未按照上一段中所述配置它们,则每个实例都需要一个专用的回复主题。 另一种方法是将KafkaHeaders.REPLY_PARTITION并为每个实例使用专用分区。 这Header包含一个四字节的 int(大端)。 服务器必须使用此标头将回复路由到正确的分区 (@KafkaListener这样做)。 但是,在这种情况下,回复容器不得使用 Kafka 的组管理功能,并且必须配置为在固定分区上监听(通过使用TopicPartitionOffset在其ContainerProperties构造函数)。
DefaultKafkaHeaderMapper要求 Jackson 在类路径上(对于@KafkaListener). 如果它不可用,则消息转换器没有标头映射器,因此您必须配置MessagingMessageConverter使用SimpleKafkaHeaderMapper,如前所述。

默认情况下,使用 3 个标头:spring-doc.cadn.net.cn

这些标头名称由@KafkaListener用于路由回复的基础结构。spring-doc.cadn.net.cn

从 2.3 版开始,您可以自定义标头名称 - 模板有 3 个属性correlationHeaderName,replyTopicHeaderNamereplyPartitionHeaderName. 如果您的服务器不是 Spring 应用程序(或不使用@KafkaListener).spring-doc.cadn.net.cn

请求/回复Message<?>s

2.7 版将方法 添加到ReplyingKafkaTemplate发送和接收spring-messagingMessage<?>抽象化:spring-doc.cadn.net.cn

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些将使用模板的默认值replyTimeout,还有一些重载版本可能会在方法调用中超时。spring-doc.cadn.net.cn

如果消费者的Deserializer或模板的MessageConverter可以在没有任何其他信息的情况下转换有效负载,无论是通过配置还是回复消息中的类型元数据。spring-doc.cadn.net.cn

如果需要为返回类型提供类型信息,请使用第二种方法来协助消息转换器。 这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:spring-doc.cadn.net.cn

示例 6.模板 Bean
Java
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
Kotlin
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
示例 7.使用模板
Java
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
Kotlin
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型 Message<?>

@KafkaListener返回一个Message<?>,对于 2.5 之前的版本,有必要填充回复主题和相关 ID 标头。 在此示例中,我们使用请求中的回复主题标头:spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这也显示了如何在回复记录上设置键。spring-doc.cadn.net.cn

从 2.5 版开始,框架将检测这些标头是否丢失,并使用主题填充它们 - 从@SendTo值或传入的KafkaHeaders.REPLY_TOPIC标头(如果存在)。 它还将呼应即将到来的KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION,如果存在。spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .build();
}
聚合多个回复

中的模板ReplyingKafkaTemplate严格用于单个请求/回复场景。 对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafkaTemplate. 这是 Scatter-Gather 企业集成模式的客户端实现。spring-doc.cadn.net.cn

ReplyingKafkaTemplateAggregatingReplyingKafkaTemplate构造函数采用生产者工厂和监听器容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy每次收到回复时都会查阅;当谓词返回时true、集合ConsumerRecords 用于完成FuturesendAndReceive方法。spring-doc.cadn.net.cn

还有一个额外的属性returnPartialOnTimeout(默认为 false)。 当将其设置为true,而不是用KafkaReplyTimeoutException,部分结果正常完成未来(只要至少收到一条回复记录)。spring-doc.cadn.net.cn

从版本 2.3.5 开始,谓词也会在超时后调用(如果returnPartialOnTimeouttrue). 第一个参数是当前记录列表;二是true如果此调用是由于超时造成的。 谓词可以修改记录列表。spring-doc.cadn.net.cn

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

请注意,返回类型是ConsumerRecord其值是ConsumerRecords. “外层”ConsumerRecord不是“真实”记录,它由模板合成,作为请求收到的实际回复记录的持有者。 当发生正常发布(发布策略返回 true)时,主题设置为aggregatedResults;如果returnPartialOnTimeout为 true,并且发生超时(并且至少已收到一条回复记录),则该主题设置为partialResultsAfterTimeout. 该模板为这些“主题”名称提供常量静态变量:spring-doc.cadn.net.cn

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

真实的ConsumerRecords 在Collection包含接收回复的实际主题。spring-doc.cadn.net.cn

回复的侦听器容器必须配置为AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消费者属性enable.auto.commit必须是false(自 2.3 版以来的默认值)。 为了避免丢失消息的可能性,模板仅在未完成的请求为零时提交偏移量,即当发布策略释放最后一个未完成的请求时。 重新平衡后,可能会重复回复传递;对于任何机上请求,这些请求将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。
如果您使用ErrorHandlingDeserializer使用此聚合模板,框架不会自动检测DeserializationExceptions. 相反,记录(带有nullvalue) 将完整返回,标头中存在反序列化异常。 建议应用程序调用实用工具方法ReplyingKafkaTemplate.checkDeserialization()方法来确定是否发生反序列化异常。 有关更多信息,请参阅其 javadocs。 这replyErrorChecker也没有调用此聚合模板;您应该对回复的每个元素执行检查。

4.1.4. 接收消息

您可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注解。spring-doc.cadn.net.cn

消息侦听器

使用消息侦听器容器时,必须提供侦听器来接收数据。 目前有八个支持消息侦听器的接口。 以下列表显示了这些接口:spring-doc.cadn.net.cn

public interface MessageListener<K, V> { (1)

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { (5)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { (6)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
1 使用此接口处理单个ConsumerRecord从 Kafka 消费者收到的实例poll()使用自动提交或容器管理的提交方法之一时的作。
2 使用此接口处理单个ConsumerRecord从 Kafka 消费者收到的实例poll()使用手动提交方法之一时的作。
3 使用此接口处理单个ConsumerRecord从 Kafka 消费者收到的实例poll()使用自动提交或容器管理的提交方法之一时的作。 访问Consumer对象。
4 使用此接口处理单个ConsumerRecord从 Kafka 消费者收到的实例poll()使用手动提交方法之一时的作。 访问Consumer对象。
5 使用此接口处理所有ConsumerRecord从 Kafka 消费者收到的实例poll()使用自动提交或容器管理的提交方法之一时的作。AckMode.RECORD使用此接口时不支持,因为侦听器被赋予了完整的批处理。
6 使用此接口处理所有ConsumerRecord从 Kafka 消费者收到的实例poll()使用手动提交方法之一时的作。
7 使用此接口处理所有ConsumerRecord从 Kafka 消费者收到的实例poll()使用自动提交或容器管理的提交方法之一时的作。AckMode.RECORD使用此接口时不支持,因为侦听器被赋予了完整的批处理。 访问Consumer对象。
8 使用此接口处理所有ConsumerRecord从 Kafka 消费者收到的实例poll()使用手动提交方法之一时的作。 访问Consumer对象。
Consumer对象不是线程安全的。 只能在调用侦听器的线程上调用其方法。
您不应执行任何Consumer<?, ?>影响消费者在侦听器中的位置和/或提交的偏移量的方法;容器需要管理此类信息。
消息侦听器容器

MessageListenerContainer提供实现:spring-doc.cadn.net.cn

KafkaMessageListenerContainer接收来自单个线程上所有主题或分区的所有消息。 这ConcurrentMessageListenerContainer一个或多个委托KafkaMessageListenerContainer实例来提供多线程消费。spring-doc.cadn.net.cn

从 2.2.7 版本开始,您可以添加RecordInterceptor到侦听器容器;在调用侦听器之前,将调用它,允许检查或修改记录。 如果拦截器返回 null,则不会调用侦听器。 从 2.7 版开始,它有额外的方法,这些方法在侦听器退出后调用(通常,或通过抛出异常)。 此外,从 2.7 版本开始,现在有一个BatchInterceptor,为批处理侦听器提供类似的功能。 此外,ConsumerAwareRecordInterceptor(和BatchInterceptor) 提供对Consumer<?, ?>. 例如,这可用于访问拦截器中的使用者指标。spring-doc.cadn.net.cn

您不应执行任何影响这些拦截器中消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。
如果拦截器更改记录(通过创建新记录),则topic,partitionoffset必须保持不变,以避免意外副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor可用于调用多个拦截器。spring-doc.cadn.net.cn

默认情况下,从 2.8 版开始,在使用事务时,在事务开始之前调用拦截器。 您可以设置侦听器容器的interceptBeforeTx属性设置为false在事务开始后调用拦截器。spring-doc.cadn.net.cn

从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer现在支持并发大于 1 时的静态成员身份。 这group.instance.id后缀为-nn起价1. 这,加上增加的session.timeout.ms,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。spring-doc.cadn.net.cn

KafkaMessageListenerContainer

以下构造函数可用:spring-doc.cadn.net.cn

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它收到一个ConsumerFactory以及有关主题和分区以及其他配置的信息,在ContainerProperties对象。ContainerProperties具有以下构造函数:spring-doc.cadn.net.cn

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数采用TopicPartitionOffset参数来显式指示容器使用哪些分区(使用 consumerassign()方法),并具有可选的初始偏移量。 默认情况下,正值是绝对偏移量。 默认情况下,负值相对于分区内的当前最后一个偏移量。 的构造函数TopicPartitionOffset这需要额外的boolean参数。 如果这是true,初始偏移量(正或负)相对于此使用者的当前位置。 偏移量在容器启动时应用。 第二个采用主题数组,Kafka 根据group.idproperty — 在组中分配分区。第三个使用正则表达式Pattern以选择主题。spring-doc.cadn.net.cn

要分配一个MessageListener到容器,您可以使用ContainerProps.setMessageListener方法。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建DefaultKafkaConsumerFactory,使用仅采用上述属性的构造函数意味着键和值Deserializer类是从配置中选取的。 或者Deserializer实例可以传递给DefaultKafkaConsumerFactory构造函数,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>s(从 2.3 版开始),将用于获取单独的Deserializer每个实例Consumer:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请参阅 JavadocContainerProperties有关您可以设置的各种属性的更多信息。spring-doc.cadn.net.cn

从 2.1.1 版本开始,一个名为logContainerConfig可用。 什么时候trueINFO启用日志记录时,每个侦听器容器都会写入一条日志消息,汇总其配置属性。spring-doc.cadn.net.cn

默认情况下,主题偏移量提交的日志记录在DEBUG日志记录级别。 从 2.1.2 版开始,中的属性ContainerPropertiescommitLogLevel允许您指定这些消息的日志级别。 例如,要将日志级别更改为INFO,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.spring-doc.cadn.net.cn

从 2.2 版开始,一个名为missingTopicsFatal已添加(默认:false自 2.3.4 起)。 如果代理上不存在任何配置的主题,这可以防止容器启动。 如果容器配置为侦听主题模式 (regex),则不适用。 以前,容器线程在consumer.poll()方法等待主题出现,同时记录许多消息。 除了日志之外,没有迹象表明存在问题。spring-doc.cadn.net.cn

从 2.8 版本开始,新的容器属性authExceptionRetryInterval已被引入。 这会导致容器在获取任何消息后重试获取消息AuthenticationExceptionAuthorizationExceptionKafkaConsumer. 例如,当配置的用户被拒绝访问读取某个主题或凭据不正确时,就会发生这种情况。 定义authExceptionRetryInterval允许容器在授予适当的权限时恢复。spring-doc.cadn.net.cn

默认情况下,不配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。

从 2.8 版开始,在创建消费者工厂时,如果您将反序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用configure()使用配置属性配置它们的方法。spring-doc.cadn.net.cn

ConcurrentMessageListenerContainer

单个构造函数类似于KafkaListenerContainer构造 函数。 以下列表显示了构造函数的签名:spring-doc.cadn.net.cn

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个concurrency财产。 例如container.setConcurrency(3)创建三个KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

对于第一个构造函数,Kafka 使用其组管理功能将分区分布在使用者之间。spring-doc.cadn.net.cn

当监听多个主题时,默认的分区分布可能不是你所期望的。 例如,如果您有三个主题,每个主题有五个分区,并且您想使用concurrency=15,则只看到 5 个活动使用者,每个使用者从每个主题分配一个分区,其他 10 个使用者处于空闲状态。 这是因为默认的 KafkaPartitionAssignorRangeAssignor(参见其 Javadoc)。 对于这种情况,您可能需要考虑使用RoundRobinAssignor相反,它将分区分布在所有使用者之间。 然后,为每个使用者分配一个主题或分区。 要更改PartitionAssignor,您可以将partition.assignment.strategy消费者属性 (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) 在提供给DefaultKafkaConsumerFactory.spring-doc.cadn.net.cn

使用 Spring Boot 时,您可以分配设置策略,如下所示:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置为TopicPartitionOffsets,则ConcurrentMessageListenerContainer分发TopicPartitionOffset跨委托的实例KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

如果,比如说,六个TopicPartitionOffset实例,并且concurrency3;每个容器都有两个分区。 五人份TopicPartitionOffset实例中,两个容器获得两个分区,第三个容器获得一个分区。 如果concurrency大于TopicPartitionsconcurrency向下调整,使每个容器都有一个分区。spring-doc.cadn.net.cn

client.id属性(如果设置)附加为-n哪里n是与并发相对应的使用者实例。 启用 JMX 时,这是为 MBean 提供唯一名称所必需的。

从 1.3 版本开始,MessageListenerContainer提供对基础指标的访问KafkaConsumer. 在以下情况下ConcurrentMessageListenerContainermetrics()方法返回所有目标的指标KafkaMessageListenerContainer实例。 这些指标被分组到Map<MetricName, ? extends Metric>通过client-id为标的KafkaConsumer.spring-doc.cadn.net.cn

从 2.3 版本开始,ContainerProperties提供idleBetweenPolls选项,让监听器容器中的主循环在KafkaConsumer.poll()调用。 从提供的选项中选择实际睡眠间隔作为最小值,并且max.poll.interval.ms消费者配置和当前记录的批处理时间。spring-doc.cadn.net.cn

提交偏移量

提供了几个用于提交偏移的选项。 如果enable.auto.commit消费者属性是true,Kafka 会根据其配置自动提交偏移量。 如果是false,容器支持多个AckMode设置(在下一个列表中描述)。 默认值AckModeBATCH. 从 2.3 版开始,框架集enable.auto.commitfalse除非在配置中显式设置。 以前,Kafka 默认值 (true如果未设置属性,则使用 )。spring-doc.cadn.net.cn

消费者poll()方法返回一个或多个ConsumerRecords. 这MessageListener为每条记录调用。 以下列表描述了容器对每个AckMode(当事务未被使用时):spring-doc.cadn.net.cn

使用事务时,偏移量被发送到事务,语义等同于RECORDBATCH,具体取决于侦听器类型(记录或批处理)。spring-doc.cadn.net.cn

MANUALMANUAL_IMMEDIATE要求监听器是AcknowledgingMessageListenerBatchAcknowledgingMessageListener. 请参阅消息侦听器

根据syncCommitscontainer 属性,则commitSync()commitAsync()方法。syncCommitstrue默认情况下;另请参阅setSyncCommitTimeout. 看setCommitCallback获取异步提交的结果;默认回调是LoggingCommitCallback它记录错误(并在调试级别成功)。spring-doc.cadn.net.cn

因为侦听器容器有自己的提交偏移量机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG成为false. 从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。spring-doc.cadn.net.cn

Acknowledgment有以下方法:spring-doc.cadn.net.cn

public interface Acknowledgment {

    void acknowledge();

}

此方法使侦听器能够控制何时提交偏移量。spring-doc.cadn.net.cn

从 2.3 版本开始,Acknowledgment接口有两个附加方法nack(long sleep)nack(int index, long sleep). 第一个用于记录侦听器,第二个用于批处理侦听器。 为侦听器类型调用错误的方法将抛出IllegalStateException.spring-doc.cadn.net.cn

如果要提交部分批处理,请使用nack(),使用事务时,将AckModeMANUAL;调用nack()将成功处理的记录的偏移量发送到交易。
nack()只能在调用侦听器的使用者线程上调用。
nack()使用无序提交时不允许。

使用记录侦听器时,当nack()调用时,将提交任何挂起的偏移量,丢弃上次轮询中的剩余记录,并在其分区上执行搜索,以便在下一个分区重新传递失败的记录和未处理的记录poll(). 消费者可以在重新交付之前暂停,方法是将sleep论点。 这类似于在容器配置了DefaultErrorHandler.spring-doc.cadn.net.cn

使用批处理侦听器时,可以在发生故障的批处理中指定索引。 什么时候nack()调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行搜索,以便在下一个记录时重新传递它们poll().spring-doc.cadn.net.cn

有关更多信息,请参阅容器错误处理程序spring-doc.cadn.net.cn

消费者在睡眠期间暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际睡眠时间及其分辨率取决于容器的pollTimeout默认为 5 秒。最短睡眠时间等于pollTimeout并且所有睡眠时间都将是它的倍数。对于较短的睡眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout.
侦听器容器自动启动

侦听器容器实现SmartLifecycleautoStartuptrue默认情况下。 容器在后期阶段启动 (Integer.MAX-VALUE - 100). 实现的其他组件SmartLifecycle,以处理来自侦听器的数据,应在较早的阶段启动。 这- 100为后续阶段留出了空间,使组件能够在容器之后自动启动。spring-doc.cadn.net.cn

手动提交偏移

通常,当使用AckMode.MANUALAckMode.MANUAL_IMMEDIATE,则必须按顺序确认,因为 Kafka 不维护每条记录的状态,仅维护每个组/分区的已提交偏移量。 从 2.8 版开始,您现在可以设置 container 属性asyncAcks,这允许按任何顺序确认轮询返回的记录的确认。 侦听器容器将推迟无序提交,直到收到缺少的确认。 使用者将暂停(不传递新记录),直到提交上一个轮询的所有偏移量。spring-doc.cadn.net.cn

虽然此功能允许应用程序异步处理记录,但应理解,它增加了失败后重复传递的可能性。
@KafkaListener注解

@KafkaListenerComments 用于将 Bean 方法指定为侦听器容器的侦听器。 豆子被包裹在一个MessagingMessageListenerAdapter配置了各种功能,例如转换器,以便在必要时转换数据以匹配方法参数。spring-doc.cadn.net.cn

您可以使用 SpEL 配置注释上的大多数属性#{…​}或属性占位符 (${…​}). 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

录制听众

@KafkaListener注解为简单的 POJO 侦听器提供了一种机制。 以下示例演示如何使用它:spring-doc.cadn.net.cn

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

此机制需要@EnableKafka注释@Configuration类和监听器容器工厂,用于配置底层ConcurrentMessageListenerContainer. 默认情况下,名称为kafkaListenerContainerFactory是意料之中的。 以下示例演示如何使用ConcurrentMessageListenerContainer:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

请注意,要设置容器属性,必须使用getContainerProperties()工厂的方法。 它用作注入容器的实际属性的模板。spring-doc.cadn.net.cn

从 2.1.1 版本开始,您现在可以将client.id属性。 这clientIdPrefix后缀为-n哪里n是表示使用并发时容器编号的整数。spring-doc.cadn.net.cn

从版本 2.2 开始,您现在可以覆盖容器工厂的concurrencyautoStartup属性,方法是在注释本身上使用属性。属性可以是简单值、属性占位符或 SpEL 表达式。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
显式分区分配

您还可以使用显式主题和分区(以及可选的初始偏移量)配置 POJO 侦听器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

您可以在partitionspartitionOffsets属性,但不能同时使用两者。spring-doc.cadn.net.cn

与大多数注释属性一样,您可以使用 SpEL 表达式;有关如何生成大型分区列表的示例,请参阅手动分配所有分区spring-doc.cadn.net.cn

从 2.5.5 版开始,您可以将初始偏移量应用于所有分配的分区:spring-doc.cadn.net.cn

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

通配符表示*partitions属性。 必须只有一个@PartitionOffset每个@TopicPartition.spring-doc.cadn.net.cn

此外,当监听器实现ConsumerSeekAware,onPartitionsAssigned现在调用,即使在使用手动分配时也是如此。 例如,这允许当时进行任何任意寻道作。spring-doc.cadn.net.cn

从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:spring-doc.cadn.net.cn

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

范围是包容的;上面的示例将分配分区0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15.spring-doc.cadn.net.cn

指定初始偏移量时可以使用相同的技术:spring-doc.cadn.net.cn

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

初始偏移量将应用于所有 6 个分区。spring-doc.cadn.net.cn

手动确认

使用手动时AckMode,您还可以为侦听器提供Acknowledgment. 以下示例还演示了如何使用不同的容器工厂。spring-doc.cadn.net.cn

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
消费者记录元数据

最后,有关记录的元数据可从消息标头获得。 可以使用以下标头名称来检索邮件的标头:spring-doc.cadn.net.cn

从 2.5 版本开始,RECEIVED_MESSAGE_KEY如果传入记录具有null钥匙;以前,标头填充了null价值。 这一更改是为了使框架与spring-messaging约定,其中null值标头不存在。spring-doc.cadn.net.cn

以下示例演示如何使用标头:spring-doc.cadn.net.cn

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

从 2.5 版开始,您可以在ConsumerRecordMetadata参数。spring-doc.cadn.net.cn

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

这包含来自ConsumerRecord除了键和值。spring-doc.cadn.net.cn

批处理侦听器

从 1.1 版开始,您可以配置@KafkaListener方法来接收从消费者轮询收到的整批消费者记录。要将侦听器容器工厂配置为创建批量侦听器,您可以将batchListener财产。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
从 2.8 版开始,您可以覆盖工厂的batchListenerpropery 使用batch属性@KafkaListener注解。 这与对容器错误处理程序的更改一起,允许将同一工厂用于记录和批处理侦听器。

以下示例演示如何接收有效负载列表:spring-doc.cadn.net.cn

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主题、分区、偏移量等在并行有效负载的标头中可用。以下示例演示如何使用标头:spring-doc.cadn.net.cn

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您可以收到ListMessage<?>对象,每个消息中的每个偏移量和其他详细信息,但它必须是唯一的参数(除了可选的Acknowledgment,使用手动提交时,和/或Consumer<?, ?>parameters)定义。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种情况下,不会对有效负载执行转换。spring-doc.cadn.net.cn

如果BatchMessagingMessageConverter配置了RecordMessageConverter,您还可以将泛型类型添加到Message参数,并转换有效负载。有关更多信息,请参阅使用批处理侦听器的有效负载转换spring-doc.cadn.net.cn

您还可以收到以下列表ConsumerRecord<?, ?>对象,但它必须是唯一的参数(除了可选的Acknowledgment,当使用手动提交时,以及Consumer<?, ?>parameters)定义。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从 2.2 版本开始,侦听器可以接收完整的ConsumerRecords<?, ?>对象返回poll()方法,允许侦听器访问其他方法,例如partitions()(返回TopicPartition列表中的实例)和records(TopicPartition)(获取选择性记录)。 同样,这必须是唯一的参数(除了可选的Acknowledgment,当使用手动提交或Consumer<?, ?>parameters) 的方法。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
如果容器工厂有RecordFilterStrategy已配置,则忽略ConsumerRecords<?, ?>listeners,带有WARN发出的日志消息。 只有在以下情况下,才能使用批处理侦听器过滤记录<List<?>>使用监听器的形式。 默认情况下,一次过滤一条记录;从 2.8 版开始,您可以覆盖filterBatch在一次调用中筛选整个批处理。
注释属性

从 2.0 版开始,id属性(如果存在)用作 Kafka 消费者group.id属性,覆盖使用者工厂中配置的属性(如果存在)。 您还可以设置groupId显式或设置idIsGroup设置为 false 以恢复之前使用消费者工厂的行为group.id.spring-doc.cadn.net.cn

您可以在大多数注释属性中使用属性占位符或 SpEL 表达式,如以下示例所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

从 2.1.2 版开始,SpEL 表达式支持一个特殊的标记:__listener. 它是一个伪 Bean 名称,表示存在此注释的当前 Bean 实例。spring-doc.cadn.net.cn

请考虑以下示例:spring-doc.cadn.net.cn

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

给定上一个示例中的 bean,然后我们可以使用以下内容:spring-doc.cadn.net.cn

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

如果,万一你有一个名为__listener,您可以通过使用beanRef属性。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
    groupId = "#{__x.topic}.group")

从 2.2.4 版开始,您可以直接在 Comments 上指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的具有相同名称的任何属性。您不能指定group.idclient.id属性;它们将被忽略;使用groupIdclientIdPrefix注释属性。spring-doc.cadn.net.cn

这些属性被指定为具有普通 Java 的单个字符串Properties文件格式:foo:bar,foo=barfoo bar.spring-doc.cadn.net.cn

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}
获取消费者group.id

当在多个容器中运行相同的侦听器代码时,能够确定哪个容器(由其group.id消费者属性),记录来自。spring-doc.cadn.net.cn

您可以调用KafkaUtils.getConsumerGroupId()在侦听器线程上执行此作。或者,您可以在方法参数中访问组 ID。spring-doc.cadn.net.cn

@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
        @Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
这在接收List<?>的记录。 它在接收ConsumerRecords<?, ?>论点。 使用KafkaUtils在这种情况下的机制。
容器线程命名

侦听器容器当前使用两个任务执行器,一个用于调用消费者,另一个用于在 kafka 消费者属性enable.auto.commitfalse. 您可以通过设置consumerExecutorlistenerExecutor容器的属性ContainerProperties. 使用池执行器时,请确保有足够的线程可用于处理使用它们的所有容器的并发性。 使用ConcurrentMessageListenerContainer,每个使用者 (concurrency).spring-doc.cadn.net.cn

如果您不提供消费者执行程序,则SimpleAsyncTaskExecutor被使用。 此执行器创建名称类似于<beanName>-C-1(消费者线程)。 对于ConcurrentMessageListenerContainer<beanName>线程名称的一部分变为<beanName>-m哪里m表示使用者实例。n每次启动容器时递增。 因此,bean 名称为container,则此容器中的线程将被命名为container-0-C-1,container-1-C-1等,容器第一次启动后;container-0-C-2,container-1-C-2等等,在停止和随后启动之后。spring-doc.cadn.net.cn

@KafkaListener作为元注释

从 2.2 版开始,您现在可以使用@KafkaListener作为元注释。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics();

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";

}

您必须至少对以下一个topics,topicPatterntopicPartitions(而且,通常,idgroupId除非您指定了group.id在消费者工厂配置中)。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
    ...
}
@KafkaListener在类上

当您使用@KafkaListener在类级别,必须指定@KafkaHandler在方法层面。 传递消息时,转换后的消息有效负载类型用于确定要调用的方法。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从 2.1.3 版开始,您可以指定@KafkaHandlermethod 作为默认方法,如果其他方法没有匹配项,则调用该方法。 最多只能指定一种方法。 使用时@KafkaHandler方法,有效负载必须已经转换为域对象(以便可以执行匹配)。 使用自定义解序列化器,JsonDeserializerJsonMessageConverter与其TypePrecedence设置为TYPE_ID. 有关详细信息,请参阅序列化、反序列化和消息转换。spring-doc.cadn.net.cn

由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler无法接收离散标头;它必须使用ConsumerRecordMetadata消费者记录元数据中所述。
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果对象是String;这topic参数还将获得对object.spring-doc.cadn.net.cn

如果需要默认方法中有关记录的元数据,请使用以下命令:spring-doc.cadn.net.cn

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}
@KafkaListener属性修改

从 V2.7.2 开始,您现在可以在创建容器之前以编程方式修改注释属性。 为此,请添加一个或多个KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer到应用程序上下文。AnnotationEnhancer是一个BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>并且必须返回属性映射。 属性值可以包含 SpEL 和/或属性占位符;在执行任何解析之前调用增强器。 如果存在多个增强器,并且它们实现Ordered,它们将按顺序被调用。spring-doc.cadn.net.cn

AnnotationEnhancer必须声明 bean 定义static因为它们在应用程序上下文的生命周期中非常早期就需要。

示例如下:spring-doc.cadn.net.cn

@Bean
public static AnnotationEnhancer groupIdEnhancer() {
    return (attrs, element) -> {
        attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
                ? ((Class<?>) element).getSimpleName()
                : ((Method) element).getDeclaringClass().getSimpleName()
                        +  "." + ((Method) element).getName()));
        return attrs;
    };
}
@KafkaListener生命周期管理

@KafkaListener注解不是应用程序上下文中的 bean。 相反,它们使用KafkaListenerEndpointRegistry. 该 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何具有autoStartup设置为true. 所有容器工厂创建的所有容器必须位于同一容器中phase. 有关更多信息,请参阅侦听器容器自动启动。 可以使用注册表以编程方式管理生命周期。 启动或停止注册表将启动或停止所有已注册的容器。 或者,您可以使用其id属性。 您可以设置autoStartup,这将覆盖配置到容器工厂中的默认设置。 您可以从应用程序上下文(例如自动连接)获取对 bean 的引用,以管理其已注册的容器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表仅维护其管理的容器的生命周期;声明为 Bean 的容器不受注册表管理,可以从应用程序上下文中获取。 可以通过调用注册表的getListenerContainers()方法。 2.2.5 版本添加了一个方便的方法getAllListenerContainers(),它返回所有容器的集合,包括由注册表管理的容器和声明为 bean 的容器。 返回的集合将包括任何已初始化的原型 Bean,但它不会初始化任何惰性 Bean 声明。spring-doc.cadn.net.cn

刷新应用程序上下文后注册的终结点将立即启动,无论其autoStartup属性,以符合SmartLifecyclecontract,其中autoStartup仅在应用程序上下文初始化期间考虑。 延迟注册的一个示例是具有@KafkaListener在原型范围内,在初始化上下文后创建实例。 从 2.8.7 版开始,您可以将注册表的alwaysStartAfterRefresh属性设置为false然后是容器的autoStartup属性将定义容器是否启动。
@KafkaListener @Payload验证

从 2.2 版开始,现在可以更轻松地添加Validator验证@KafkaListener @Payload参数。 以前,您必须配置自定义DefaultMessageHandlerMethodFactory并将其添加到注册商。 现在,您可以将验证器添加到注册商本身。 以下代码显示了如何执行此作:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }

}
当您将 Spring Boot 与验证Starters一起使用时,一个LocalValidatorFactoryBean是自动配置的,如以下示例所示:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

以下示例演示如何验证:spring-doc.cadn.net.cn

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

从版本 2.5.11 开始,验证现在适用于@KafkaHandler类级监听器中的方法。 看@KafkaListener在类上.spring-doc.cadn.net.cn

重新平衡监听器

ContainerProperties有一个名为consumerRebalanceListener,它采用 Kafka 客户端的ConsumerRebalanceListener接口。 如果未提供此属性,则容器将配置一个日志记录侦听器,该侦听器在INFO水平。 该框架还添加了一个子接口ConsumerAwareRebalanceListener. 以下列表显示了ConsumerAwareRebalanceListener接口定义:spring-doc.cadn.net.cn

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,撤销分区时有两个回调。 第一个立即调用。 第二个在提交任何挂起的偏移量后调用。 如果您希望在某个外部存储库中维护偏移量,这很有用,如以下示例所示:spring-doc.cadn.net.cn

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
            store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
            consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从 2.4 版本开始,一种新的方法onPartitionsLost()已添加(类似于ConsumerRebalanceLister). 默认实现ConsumerRebalanceLister简单调用onPartionsRevoked. 默认实现ConsumerAwareRebalanceListener什么都不做。 当为侦听器容器提供自定义侦听器(任一类型)时,重要的是您的实现不要调用onPartitionsRevokedonPartitionsLost. 如果实现ConsumerRebalanceListener您应该覆盖默认方法。 这是因为侦听器容器将调用自己的onPartitionsRevoked从其实施onPartitionsLost在实现上调用该方法之后。 如果实现委托给默认行为,onPartitionsRevoked每次调用Consumer在容器的侦听器上调用该方法。
转发侦听器结果@SendTo

从 2.0 版开始,如果您还注释了@KafkaListener使用@SendTo注释,方法调用返回结果,则结果将转发到@SendTo.spring-doc.cadn.net.cn

@SendTovalue 可以有多种形式:spring-doc.cadn.net.cn

从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo值。spring-doc.cadn.net.cn

表达式求值的结果必须是String表示主题名称。 以下示例显示了各种使用@SendTo:spring-doc.cadn.net.cn

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}
为了支持@SendTo,则必须为侦听器容器工厂提供KafkaTemplate(在其replyTemplate属性),用于发送回复。 这应该是一个KafkaTemplate而不是ReplyingKafkaTemplate用于客户端的请求/回复处理。 使用 Spring Boot 时,boot 会自动将模板配置到出厂;配置自己的工厂时,必须按照以下示例所示进行设置。

从 2.2 版开始,您可以添加ReplyHeadersConfigurer到侦听器容器工厂。 参考此选项以确定要在回复消息中设置的标头。 以下示例演示如何添加ReplyHeadersConfigurer:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果您愿意,您还可以添加更多标题。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

当您使用@SendTo,则必须配置ConcurrentKafkaListenerContainerFactory使用KafkaTemplate在其replyTemplate属性来执行发送。spring-doc.cadn.net.cn

除非你使用请求/回复语义,否则只有简单的send(topic, value)方法,因此您可能希望创建一个子类来生成分区或键。 以下示例显示了如何执行此作:
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果侦听器方法返回Message<?>Collection<Message<?>>,侦听器方法负责设置回复的消息头。 例如,当处理来自ReplyingKafkaTemplate,您可以执行以下作:spring-doc.cadn.net.cn

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

使用请求/回复语义时,发送方可以请求目标分区。spring-doc.cadn.net.cn

您可以注释@KafkaListener方法@SendTo即使没有返回任何结果。 这是为了允许配置errorHandler可以将有关失败消息传递的信息转发到某个主题。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有关详细信息,请参阅处理异常spring-doc.cadn.net.cn

如果侦听器方法返回Iterable,默认情况下,每个元素的记录在发送值时。 从 2.3.5 版本开始,将splitIterables属性@KafkaListenerfalse整个结果将作为单个ProducerRecord. 这需要在回复模板的生产者配置中有一个合适的序列化器。 但是,如果回复是Iterable<Message<?>>该属性将被忽略,并且每个消息都单独发送。
过滤消息

在某些情况下,例如重新平衡,可能会重新传递已处理的消息。 框架无法知道此类消息是否已被处理。 这是一个应用程序级功能。 这被称为幂等接收器模式,Spring Integration 提供了它的实现spring-doc.cadn.net.cn

Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter类,它可以将MessageListener. 此类采用RecordFilterStrategy在其中实现filter方法来表示消息是重复的,应该丢弃。 这有一个名为ackDiscarded,指示适配器是否应确认已丢弃的记录。 是的false默认情况下。spring-doc.cadn.net.cn

当您使用@KafkaListener,将RecordFilterStrategy(并且可选ackDiscarded) ,以便侦听器被包装在适当的过滤适配器中。spring-doc.cadn.net.cn

此外,一个FilteringBatchMessageListenerAdapter,用于使用批处理消息侦听器时。spring-doc.cadn.net.cn

FilteringBatchMessageListenerAdapter如果您的@KafkaListener收到一个ConsumerRecords<?, ?>而不是List<ConsumerRecord<?, ?>>因为ConsumerRecords是不可变的。

从 2.8.4 版本开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy通过使用filter属性。spring-doc.cadn.net.cn

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}
重试交付

请参阅DefaultErrorHandler处理异常中spring-doc.cadn.net.cn

开始@KafkaListeners 在序列中

一个常见的用例是在另一个侦听器使用了主题中的所有记录后启动侦听器。 例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。 从 2.7.3 版本开始,一个新组件ContainerGroupSequencer已被引入。 它使用@KafkaListener containerGroup属性将容器分组在一起,并在当前组中的所有容器都空闲时启动下一组中的容器。spring-doc.cadn.net.cn

最好用一个例子来说明。spring-doc.cadn.net.cn

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

在这里,我们有 4 个听众,分为两组,g1g2.spring-doc.cadn.net.cn

在应用程序上下文初始化期间,排序器将autoStartup属性设置为false. 它还设置了idleEventInterval对于任何容器(尚未设置一个容器)到提供的值(在本例中为 5000 毫秒)。 然后,当应用程序上下文启动排序器时,将启动第一组中的容器。 如ListenerContainerIdleEvent收到 s,则每个容器中的每个单独的子容器都将停止。 当ConcurrentMessageListenerContainer停止时,父容器将停止。 当一个组中的所有容器都已停止时,将启动下一个组中的容器。 组中的组或容器数没有限制。spring-doc.cadn.net.cn

默认情况下,最后一组中的容器 (g2以上)在空闲时不会停止。 要修改该行为,请将stopLastGroupWhenIdletrue在音序器上。spring-doc.cadn.net.cn

顺便说一句;以前,每个组中的容器都添加到类型为Collection<MessageListenerContainer>其中 bean 名称是containerGroup. 这些集合现在已被弃用,取而代之的是类型为ContainerGroup替换为 bean 名称,该名称是组名称,后缀为.group;在上面的示例中,将有 2 个 beang1.groupg2.group. 这Collectionbeans 将在将来的版本中删除。spring-doc.cadn.net.cn

KafkaTemplate接收

本节介绍如何使用KafkaTemplate接收消息。spring-doc.cadn.net.cn

从 2.8 版开始,该模板有四个receive()方法:spring-doc.cadn.net.cn

ConsumerRecord<K, V> receive(String topic, int partition, long offset);

ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);

如您所见,您需要知道需要检索的记录的分区和偏移量;一个新的Consumer为每个作创建(和关闭)。spring-doc.cadn.net.cn

使用最后两种方法,将单独检索每条记录,并将结果组装成ConsumerRecords对象。 创建TopicPartitionOffsets 对于请求,则仅支持正的绝对偏移量。spring-doc.cadn.net.cn

4.1.5. 侦听器容器属性

表 1.ContainerProperties性能
属性 默认值 描述

ackCountspring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

提交待处理偏移量之前的记录数,当ackModeCOUNTCOUNT_TIME.spring-doc.cadn.net.cn

adviceChainspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

一条链Advice对象(例如MethodInterceptor围绕建议)包装消息侦听器,按顺序调用。spring-doc.cadn.net.cn

ackModespring-doc.cadn.net.cn

Batchspring-doc.cadn.net.cn

控制偏移量的提交频率 - 请参阅提交偏移量。spring-doc.cadn.net.cn

ackTimespring-doc.cadn.net.cn

5000spring-doc.cadn.net.cn

ackModeTIMECOUNT_TIME.spring-doc.cadn.net.cn

assignmentCommitOptionspring-doc.cadn.net.cn

LATEST_ONLY _NO_TXspring-doc.cadn.net.cn

是否承诺分配的初始职位;默认情况下,只有在ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest即使存在事务管理器,它也不会在事务中运行。 请参阅 javadocsContainerProperties.AssignmentCommitOption有关可用选项的更多信息。spring-doc.cadn.net.cn

authExceptionRetryIntervalspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

如果不是空,则Duration在轮投票之间休眠AuthenticationExceptionAuthorizationException由 Kafka 客户端抛出。 当 null 时,此类异常被视为致命异常,容器将停止。spring-doc.cadn.net.cn

clientIdspring-doc.cadn.net.cn

(空字符串)spring-doc.cadn.net.cn

的前缀client.id消费者财产。 覆盖消费者工厂client.id财产;在并发容器中,-n作为每个消费者实例的后缀添加。spring-doc.cadn.net.cn

checkDeserExWhenKeyNullspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

设置为true始终检查DeserializationException标头,当null key收到。 当消费者代码无法确定ErrorHandlingDeserializer已配置,例如在使用委托反序列化程序时。spring-doc.cadn.net.cn

checkDeserExWhenValueNullspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

设置为true始终检查DeserializationException标头,当null value收到。 当消费者代码无法确定ErrorHandlingDeserializer已配置,例如在使用委托反序列化程序时。spring-doc.cadn.net.cn

commitCallbackspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

当存在时和syncCommitsfalse提交完成后调用的回调。spring-doc.cadn.net.cn

offsetAndMetadataProviderspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

的提供程序OffsetAndMetadata;默认情况下,提供程序会创建偏移量和元数据,其中包含空元数据。提供程序提供了一种自定义元数据的方法。spring-doc.cadn.net.cn

commitLogLevelspring-doc.cadn.net.cn

调试spring-doc.cadn.net.cn

与提交偏移量相关的日志的日志记录级别。spring-doc.cadn.net.cn

consumerRebalanceListenerspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

重新平衡的听众;请参阅重新平衡侦听器spring-doc.cadn.net.cn

consumerStartTimoutspring-doc.cadn.net.cn

30多岁spring-doc.cadn.net.cn

在记录错误之前等待使用者启动的时间;例如,如果您使用线程不足的任务执行器,则可能会发生这种情况。spring-doc.cadn.net.cn

consumerTaskExecutorspring-doc.cadn.net.cn

SimpleAsyncTaskExecutorspring-doc.cadn.net.cn

用于运行使用者线程的任务执行器。 默认执行器创建名为<name>-C-n;使用KafkaMessageListenerContainer,名称是 bean 名称;使用ConcurrentMessageListenerContainer名称是后缀为-n其中,每个子容器的 n 递增。spring-doc.cadn.net.cn

deliveryAttemptHeaderspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

请参阅 Delivery Attempts 标头spring-doc.cadn.net.cn

eosModespring-doc.cadn.net.cn

V2spring-doc.cadn.net.cn

恰好一次语义模式;请参阅 Exactly Once 语义spring-doc.cadn.net.cn

fixTxOffsetsspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

当使用事务生产者生成的记录时,并且使用者位于分区的末尾,由于用于指示事务提交/回滚的伪记录,并且可能存在回滚记录,因此滞后可能会错误地报告为大于零。 这在功能上不会影响消费者,但一些用户表示担心“滞后”不是零。 将此属性设置为true并且容器将纠正此类错误报告的偏移量。 在下一次轮询之前执行检查,以避免增加提交处理的显着复杂性。 在撰写本文时,只有当消费者配置了isolation.level=read_committedmax.poll.records大于 1。 有关更多信息,请参阅 KAFKA-10683spring-doc.cadn.net.cn

groupIdspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

覆盖消费者group.id财产;由@KafkaListener idgroupId财产。spring-doc.cadn.net.cn

idleBeforeDataMultiplierspring-doc.cadn.net.cn

5.0spring-doc.cadn.net.cn

乘数idleEventInterval在收到任何记录之前应用。 收到记录后,将不再应用乘数。 从 2.8 版开始可用。spring-doc.cadn.net.cn

idleBetweenPollsspring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

用于通过在轮询之间休眠线程来减慢传递速度。 处理一批记录的时间加上此值必须小于max.poll.interval.ms消费者财产。spring-doc.cadn.net.cn

idleEventIntervalspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

设置后,启用发布ListenerContainerIdleEvents,请参阅应用程序事件检测空闲和无响应使用者。 另请参阅idleBeforeDataMultiplier.spring-doc.cadn.net.cn

idlePartitionEventIntervalspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

设置后,启用发布ListenerContainerIdlePartitionEvents,请参阅应用程序事件检测空闲和无响应使用者。spring-doc.cadn.net.cn

kafkaConsumerPropertiesspring-doc.cadn.net.cn

没有spring-doc.cadn.net.cn

用于覆盖在消费者工厂上配置的任何任意消费者属性。spring-doc.cadn.net.cn

logContainerConfigspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

设置为 true 以在 INFO 级别记录所有容器属性。spring-doc.cadn.net.cn

messageListenerspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

消息侦听器。spring-doc.cadn.net.cn

micrometerEnabledspring-doc.cadn.net.cn

truespring-doc.cadn.net.cn

是否为使用者线程维护千分尺计时器。spring-doc.cadn.net.cn

missingTopicsFatalspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

如果代理上不存在确认的主题,则为 true 将阻止容器启动。spring-doc.cadn.net.cn

monitorIntervalspring-doc.cadn.net.cn

30多岁spring-doc.cadn.net.cn

检查使用者线程状态的频率NonResponsiveConsumerEvents. 看noPollThresholdpollTimeout.spring-doc.cadn.net.cn

noPollThresholdspring-doc.cadn.net.cn

3.0spring-doc.cadn.net.cn

乘以pollTimeOut以确定是否发布NonResponsiveConsumerEvent. 看monitorInterval.spring-doc.cadn.net.cn

onlyLogRecordMetadataspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

设置为 false 以记录完整的使用者记录(错误、调试日志等),而不仅仅是topic-partition@offset. 荒废的。 替换为KafkaUtils.setConsumerRecordFormatter.spring-doc.cadn.net.cn

pollTimeoutspring-doc.cadn.net.cn

5000spring-doc.cadn.net.cn

超时传递到Consumer.poll().spring-doc.cadn.net.cn

schedulerspring-doc.cadn.net.cn

ThreadPoolTaskSchedulerspring-doc.cadn.net.cn

用于运行使用者监视任务的调度程序。spring-doc.cadn.net.cn

shutdownTimeoutspring-doc.cadn.net.cn

10000spring-doc.cadn.net.cn

阻止stop()方法,直到所有使用者停止,并在发布容器停止事件之前。spring-doc.cadn.net.cn

stopContainerWhenFencedspring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

如果出现以下情况,请停止侦听器容器ProducerFencedException被抛出。 有关详细信息,请参阅回滚后处理器spring-doc.cadn.net.cn

stopImmediatespring-doc.cadn.net.cn

falsespring-doc.cadn.net.cn

当容器停止时,在当前记录之后停止处理,而不是在处理上一次轮询中的所有记录之后停止处理。spring-doc.cadn.net.cn

subBatchPerPartitionspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

使用批处理侦听器时,如果这是true,则调用侦听器,轮询结果拆分为子批次,每个分区一个。 违约false除非使用EOSMode.ALPHA- 参见 Exactly Once 语义spring-doc.cadn.net.cn

syncCommitTimeoutspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

在以下情况下使用的超时syncCommitstrue. 如果未设置,容器将尝试确定default.api.timeout.ms消费者属性并使用它;否则它将使用 60 秒。spring-doc.cadn.net.cn

syncCommitsspring-doc.cadn.net.cn

truespring-doc.cadn.net.cn

是否使用同步或异步提交进行偏移;看commitCallback.spring-doc.cadn.net.cn

topics topicPattern topicPartitionsspring-doc.cadn.net.cn

不适用spring-doc.cadn.net.cn

配置的主题、主题模式或显式分配的主题/分区。 互斥;必须至少提供一个;强制执行者ContainerProperties构造 函数。spring-doc.cadn.net.cn

transactionManagerspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

请参阅事务。spring-doc.cadn.net.cn

表 2.AbstractListenerContainer性能
属性 默认值 描述

afterRollbackProcessorspring-doc.cadn.net.cn

DefaultAfterRollbackProcessorspring-doc.cadn.net.cn

AfterRollbackProcessor在回滚事务后调用。spring-doc.cadn.net.cn

applicationEventPublisherspring-doc.cadn.net.cn

应用程序上下文spring-doc.cadn.net.cn

事件发布者。spring-doc.cadn.net.cn

batchErrorHandlerspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

已弃用 - 请参阅commonErrorHandler.spring-doc.cadn.net.cn

batchInterceptorspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

设置一个BatchInterceptor在调用批处理侦听器之前调用;不适用于唱片侦听器。 另请参阅interceptBeforeTx.spring-doc.cadn.net.cn

beanNamespring-doc.cadn.net.cn

豆子名称spring-doc.cadn.net.cn

容器的 bean 名称;后缀为-n用于子容器。spring-doc.cadn.net.cn

commonErrorHandlerspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

DefaultErrorHandlernulltransactionManagerDefaultAfterRollbackProcessor被使用。 请参阅容器错误处理程序spring-doc.cadn.net.cn

containerPropertiesspring-doc.cadn.net.cn

ContainerPropertiesspring-doc.cadn.net.cn

容器属性实例。spring-doc.cadn.net.cn

errorHandlerspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

已弃用 - 请参阅commonErrorHandler.spring-doc.cadn.net.cn

genericErrorHandlerspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

已弃用 - 请参阅commonErrorHandler.spring-doc.cadn.net.cn

groupIdspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

containerProperties.groupId,如果存在,否则group.id来自消费工厂的财产。spring-doc.cadn.net.cn

interceptBeforeTxspring-doc.cadn.net.cn

truespring-doc.cadn.net.cn

确定recordInterceptor在事务开始之前或之后调用。spring-doc.cadn.net.cn

listenerIdspring-doc.cadn.net.cn

参见描述。spring-doc.cadn.net.cn

用户配置容器的 Bean 名称或id属性@KafkaListeners.spring-doc.cadn.net.cn

listenerInfospring-doc.cadn.net.cn

spring-doc.cadn.net.cn

要填充的值KafkaHeaders.LISTENER_INFO页眉。 跟@KafkaListener,则该值是从info属性。 此标头可用于各种位置,例如RecordInterceptor,RecordFilterStrategy以及侦听器代码本身。spring-doc.cadn.net.cn

pauseRequestedspring-doc.cadn.net.cn

(只读)spring-doc.cadn.net.cn

如果已请求使用者暂停,则为 True。spring-doc.cadn.net.cn

recordInterceptorspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

设置一个RecordInterceptor在调用记录侦听器之前调用;不适用于批处理侦听器。另请参阅interceptBeforeTx.spring-doc.cadn.net.cn

topicCheckTimeoutspring-doc.cadn.net.cn

30多岁spring-doc.cadn.net.cn

missingTopicsFatalcontainer 属性为true,等待多长时间(以秒为单位),以describeTopics作完成。spring-doc.cadn.net.cn

表 3.KafkaMessageListenerContainer性能
属性 默认值 描述

assignedPartitionsspring-doc.cadn.net.cn

(只读)spring-doc.cadn.net.cn

当前分配给此容器的分区(显式或非显式)。spring-doc.cadn.net.cn

assignedPartitionsByClientIdspring-doc.cadn.net.cn

(只读)spring-doc.cadn.net.cn

当前分配给此容器的分区(显式或非显式)。spring-doc.cadn.net.cn

clientIdSuffixspring-doc.cadn.net.cn

nullspring-doc.cadn.net.cn

由并发容器用于为每个子容器的消费者提供唯一的client.id.spring-doc.cadn.net.cn

containerPausedspring-doc.cadn.net.cn

不适用spring-doc.cadn.net.cn

如果已请求暂停并且使用者实际上已暂停,则为 True。spring-doc.cadn.net.cn

表 4.ConcurrentMessageListenerContainer性能
属性 默认值 描述

alwaysClientIdSuffixspring-doc.cadn.net.cn

truespring-doc.cadn.net.cn

设置为 false 以禁止向client.idconsumer 属性,当concurrency只有 1。spring-doc.cadn.net.cn

assignedPartitionsspring-doc.cadn.net.cn

(只读)spring-doc.cadn.net.cn

当前分配给此容器子级的分区聚合KafkaMessageListenerContainers(显式或非显式)。spring-doc.cadn.net.cn

assignedPartitionsByClientIdspring-doc.cadn.net.cn

(只读)spring-doc.cadn.net.cn

当前分配给此容器子级的分区KafkaMessageListenerContainers(显式或非显式),由子容器的消费者的client.id财产。spring-doc.cadn.net.cn

concurrencyspring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

子数KafkaMessageListenerContainers 来管理。spring-doc.cadn.net.cn

containerPausedspring-doc.cadn.net.cn

不适用spring-doc.cadn.net.cn

如果已请求暂停并且所有子容器的使用者实际上都已暂停,则为 true。spring-doc.cadn.net.cn

containersspring-doc.cadn.net.cn

不适用spring-doc.cadn.net.cn

对所有子项的引用KafkaMessageListenerContainers.spring-doc.cadn.net.cn

4.1.6. 动态创建容器

有几种技术可用于在运行时创建侦听器容器。 本节探讨其中的一些技术。spring-doc.cadn.net.cn

MessageListener 实现

如果您直接实现自己的监听器,则只需使用容器工厂为该监听器创建一个原始容器:spring-doc.cadn.net.cn

示例 8.用户监听器
Java
public class MyListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // ...
    }

}

private ConcurrentMessageListenerContainer<String, String> createContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
    container.getContainerProperties().setMessageListener(new MyListener());
    container.getContainerProperties().setGroupId(group);
    container.setBeanName(group);
    container.start();
    return container;
}
Kotlin
class MyListener : MessageListener<String?, String?> {

    override fun onMessage(data: ConsumerRecord<String?, String?>) {
        // ...
    }

}

private fun createContainer(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
    val container = factory.createContainer(topic)
    container.containerProperties.messageListener = MyListener()
    container.containerProperties.groupId = group
    container.beanName = group
    container.start()
    return container
}
原型豆

@KafkaListener可以通过将 bean 声明为原型来动态创建:spring-doc.cadn.net.cn

示例 9.原型
Java
public class MyPojo {

    private final String id;

    private final String topic;

    public MyPojo(String id, String topic) {
        this.id = id;
        this.topic = topic;
    }

    public String getId() {
        return this.id;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
	return new MyPojo(id, topic);
}

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
Kotlin
class MyPojo(id: String?, topic: String?) {

    @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
    fun listen(`in`: String?) {
        println(`in`)
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
    return MyPojo(id, topic)
}

applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
侦听器必须具有唯一的 ID。 从 2.8.9 版本开始,KafkaListenerEndpointRegistry有了新方法unregisterListenerContainer(String id)以允许您重复使用 ID。 取消注册容器不会stop()容器,你必须自己做。

4.1.7. 应用程序事件

以下 Spring 应用程序事件由侦听器容器及其使用者发布:spring-doc.cadn.net.cn

  • ConsumerStartingEvent- 在使用者线程首次启动时发布,在开始轮询之前。spring-doc.cadn.net.cn

  • ConsumerStartedEvent- 当使用者即将开始轮询时发布。spring-doc.cadn.net.cn

  • ConsumerFailedToStartEvent- 如果没有,则已发布ConsumerStartingEventconsumerStartTimeoutcontainer 属性。 此事件可能表示配置的任务执行器没有足够的线程来支持使用它的容器及其并发性。 发生这种情况时,还会记录一条错误消息。spring-doc.cadn.net.cn

  • ListenerContainerIdleEvent:在未收到任何消息时发布idleInterval(如果已配置)。spring-doc.cadn.net.cn

  • ListenerContainerNoLongerIdleEvent:在之前发布记录后使用记录时发布ListenerContainerIdleEvent.spring-doc.cadn.net.cn

  • ListenerContainerPartitionIdleEvent:当未从该分区收到消息时发布idlePartitionEventInterval(如果已配置)。spring-doc.cadn.net.cn

  • ListenerContainerPartitionNoLongerIdleEvent:当从先前发布过ListenerContainerPartitionIdleEvent.spring-doc.cadn.net.cn

  • NonResponsiveConsumerEvent:当消费者似乎在poll方法。spring-doc.cadn.net.cn

  • ConsumerPartitionPausedEvent:在分区暂停时由每个使用者发布。spring-doc.cadn.net.cn

  • ConsumerPartitionResumedEvent:由每个使用者在恢复分区时发布。spring-doc.cadn.net.cn

  • ConsumerPausedEvent:在容器暂停时由每个使用者发布。spring-doc.cadn.net.cn

  • ConsumerResumedEvent:在容器恢复时由每个使用者发布。spring-doc.cadn.net.cn

  • ConsumerStoppingEvent:由每个消费者在停止前发布。spring-doc.cadn.net.cn

  • ConsumerStoppedEvent:在使用者关闭后发布。 请参阅线程安全spring-doc.cadn.net.cn

  • ContainerStoppedEvent:当所有消费者都停止时发布。spring-doc.cadn.net.cn

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。 如果将多播程序更改为使用异步执行器,则不得调用任何Consumer当事件包含对消费者的引用时的方法。

ListenerContainerIdleEvent具有以下属性:spring-doc.cadn.net.cn

ListenerContainerNoLongerIdleEvent具有相同的属性,但idleTimepaused.spring-doc.cadn.net.cn

ListenerContainerPartitionIdleEvent具有以下属性:spring-doc.cadn.net.cn

ListenerContainerPartitionNoLongerIdleEvent具有相同的属性,但idleTimepaused.spring-doc.cadn.net.cn

NonResponsiveConsumerEvent具有以下属性:spring-doc.cadn.net.cn

ConsumerPausedEvent,ConsumerResumedEventConsumerStopping事件具有以下属性:spring-doc.cadn.net.cn

ConsumerPartitionPausedEvent,ConsumerPartitionResumedEvent事件具有以下属性:spring-doc.cadn.net.cn

ConsumerStartingEvent,ConsumerStartingEvent,ConsumerFailedToStartEvent,ConsumerStoppedEventContainerStoppedEvent事件具有以下属性:spring-doc.cadn.net.cn

所有容器(无论是子容器还是父容器)发布ContainerStoppedEvent. 对于父容器,源和容器属性相同。spring-doc.cadn.net.cn

此外,ConsumerStoppedEvent具有以下附加属性:spring-doc.cadn.net.cn

您可以使用此事件在出现此类情况后重新启动容器:spring-doc.cadn.net.cn

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者

异步使用者虽然高效,但一个问题是检测它们何时处于空闲状态。 如果一段时间内没有邮件到达,您可能需要采取一些措施。spring-doc.cadn.net.cn

您可以将侦听器容器配置为发布ListenerContainerIdleEvent当一段时间过去没有消息传递时。 当容器处于空余状态时,每隔一次就会发布一个事件idleEventInterval毫秒。spring-doc.cadn.net.cn

要配置此功能,请将idleEventInterval在容器上。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
    return container;
}

以下示例演示如何设置idleEventInterval对于一个@KafkaListener:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。spring-doc.cadn.net.cn

如果由于某种原因,消费者poll()方法不退出,没有收到任何消息,也无法生成空闲事件(这是早期版本的kafka-clients当无法联系到经纪人时)。 在这种情况下,容器会发布一个NonResponsiveConsumerEvent如果轮询未在3xpollTimeout财产。 默认情况下,每个容器中每 30 秒执行一次此检查。 您可以通过将monitorInterval(默认 30 秒)和noPollThreshold(默认 3.0)属性ContainerProperties配置侦听器容器时。 这noPollThreshold应大于1.0以避免由于竞争条件而获得虚假事件。 接收此类事件可以停止容器,从而唤醒使用者以便它可以停止。spring-doc.cadn.net.cn

从 2.6.2 版开始,如果容器发布了ListenerContainerIdleEvent,它将发布一个ListenerContainerNoLongerIdleEvent随后收到记录时。spring-doc.cadn.net.cn

事件消耗

您可以通过实现ApplicationListener— 要么是普通侦听器,要么是缩小为仅接收此特定事件的侦听器。 您还可以使用@EventListener,在 Spring Framework 4.2 中引入。spring-doc.cadn.net.cn

下一个示例将@KafkaListener@EventListener到单个类中。 您应该了解,应用程序侦听器会获取所有容器的事件,因此,如果要根据空闲的容器执行特定作,则可能需要检查侦听器 ID。 您还可以使用@EventListener condition为此目的。spring-doc.cadn.net.cn

有关事件属性的信息,请参阅应用程序事件。spring-doc.cadn.net.cn

该事件通常发布在消费者线程上,因此可以安全地与Consumer对象。spring-doc.cadn.net.cn

以下示例同时使用@KafkaListener@EventListener:spring-doc.cadn.net.cn

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件侦听器可以看到所有容器的事件。因此,在前面的示例中,我们根据侦听器 ID 缩小了接收到的事件范围。由于为@KafkaListener支持并发,实际容器命名为id-n其中n是每个实例的唯一值,以支持并发。这就是我们使用startsWith在条件下。
如果您希望使用 idle 事件来停止列表器容器,则不应调用container.stop()在调用侦听器的线程上。这样做会导致延迟和不必要的日志消息。相反,您应该将事件移交给可以停止容器的其他线程。此外,您不应该stop()容器实例(如果它是子容器)。 您应该停止并发容器。
空闲时的当前位置

请注意,您可以通过实现ConsumerSeekAware在您的听众中。 看onIdleContainer()寻求特定的抵消spring-doc.cadn.net.cn

4.1.8. 主题/分区初始偏移量

有几种方法可以设置分区的初始偏移量。spring-doc.cadn.net.cn

手动分配分区时,您可以在配置的TopicPartitionOffset参数(参见消息侦听器容器)。 您还可以随时查找特定的偏移量。spring-doc.cadn.net.cn

当您使用组管理时,代理分配分区:spring-doc.cadn.net.cn

  • 对于新的group.id,则初始偏移量由auto.offset.reset消费者属性 (earliestlatest).spring-doc.cadn.net.cn

  • 对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。 但是,可以在初始化期间(或之后的任何时间)查找特定偏移量。spring-doc.cadn.net.cn

4.1.9. 寻求特定偏移量

为了寻求,您的监听器必须实现ConsumerSeekAware,它有以下方法:spring-doc.cadn.net.cn

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

registerSeekCallback在启动容器时以及每当分配分区时调用。 在初始化后的某个任意时间进行搜索时,应使用此回调。 您应该保存对回调的引用。 如果您在多个容器(或在ConcurrentMessageListenerContainer),您应该将回调存储在ThreadLocal或由侦听器键控的其他结构Thread.spring-doc.cadn.net.cn

使用组管理时,onPartitionsAssigned在分配分区时调用。 例如,您可以使用此方法通过调用回调来设置分区的初始偏移量。 您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。 您必须使用回调参数,而不是传递给registerSeekCallback. 从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。spring-doc.cadn.net.cn

onPartitionsRevoked在容器停止或 Kafka 撤销分配时调用。您应该丢弃此线程的回调并删除与已撤销分区的任何关联。spring-doc.cadn.net.cn

回调具有以下方法:spring-doc.cadn.net.cn

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection=<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection=<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

seekRelative在 2.3 版中添加了,以执行相对搜索。spring-doc.cadn.net.cn

seekToTimestamp2.3 版中还添加了方法。spring-doc.cadn.net.cn

onIdleContaineronPartitionsAssigned方法,第二种方法是首选,因为在对消费者的offsetsForTimes方法。 从其他位置调用时,容器将收集所有时间戳查找请求,并对offsetsForTimes.

您还可以从onIdleContainer()检测到空闲容器时。 有关如何启用空闲容器检测,请参阅检测空闲和无响应的使用者spring-doc.cadn.net.cn

seekToBeginning方法很有用,例如,在处理压缩主题并且您希望每次启动应用程序时都搜索到开头:
public class MyListener implements ConsumerSeekAware {

...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

要在运行时任意查找,请使用registerSeekCallback对于适当的线程。spring-doc.cadn.net.cn

这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;打<Enter>导致所有分区从头开始搜索。spring-doc.cadn.net.cn

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

为了让事情变得简单,2.3 版本添加了AbstractConsumerSeekAware类,它跟踪要用于主题/分区的回调。 以下示例演示如何在每次容器空闲时查找每个分区中处理的最后一条记录。 它还具有允许任意外部调用按一条记录倒带分区的方法。spring-doc.cadn.net.cn

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

2.6 版为抽象类添加了便利方法:spring-doc.cadn.net.cn

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
    }

}

4.1.10. 集装箱工厂

@KafkaListener注解一个ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器。spring-doc.cadn.net.cn

从 2.2 版开始,您可以使用相同的工厂创建任何ConcurrentMessageListenerContainer. 如果您想创建多个具有相似属性的容器,或者您希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。 创建容器后,您可以进一步修改其属性,其中许多属性是通过使用container.getContainerProperties(). 以下示例配置ConcurrentMessageListenerContainer:spring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
以这种方式创建的容器不会添加到终结点注册表中。 它们应该创建为@Bean定义,以便它们向应用程序上下文注册。

从 2.3.4 版本开始,您可以添加ContainerCustomizer到工厂,以便在创建和配置每个容器后进一步配置它。spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

4.1.11. 线程安全

使用并发消息侦听器容器时,将在所有使用者线程上调用单个侦听器实例。 因此,侦听器需要是线程安全的,并且最好使用无状态侦听器。 如果无法使侦听器成为线程安全的,或者添加同步会显着降低添加并发的好处,则可以使用以下几种技术之一:spring-doc.cadn.net.cn

为了便于清理线程状态(对于前面列表中的第二项和第三项),从 2.2 版开始,侦听器容器发布ConsumerStoppedEvent当每个线程退出时。您可以使用ApplicationListener@EventListener删除方法ThreadLocal<?>instances 或remove()线程作用域的 bean。请注意,SimpleThreadScope不会销毁具有销毁接口(例如DisposableBean),所以你应该destroy()实例自己。spring-doc.cadn.net.cn

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果将多播器更改为使用异步执行器,则线程清理无效。

4.1.12. 监控

监控侦听器性能

从 2.3 版本开始,监听器容器将自动创建和更新 MicrometerTimers 表示侦听器,如果Micrometer在类路径上检测到单个MeterRegistry存在于应用程序上下文中。可以通过设置ContainerProperty micrometerEnabledfalse.spring-doc.cadn.net.cn

维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。spring-doc.cadn.net.cn

计时器被命名为spring.kafka.listener并具有以下标签:spring-doc.cadn.net.cn

您可以使用ContainerProperties micrometerTags财产。spring-doc.cadn.net.cn

使用并发容器,为每个线程创建计时器,并且name标签后缀为-n其中 n 是0concurrency-1.
监控 KafkaTemplate 性能

从 2.5 版开始,模板将自动创建和更新 MicrometerTimers 用于发送作,如果Micrometer在类路径上检测到单个MeterRegistry存在于应用程序上下文中。 可以通过设置模板的micrometerEnabled属性设置为false.spring-doc.cadn.net.cn

维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。spring-doc.cadn.net.cn

计时器被命名为spring.kafka.template并具有以下标签:spring-doc.cadn.net.cn

您可以使用模板的micrometerTags财产。spring-doc.cadn.net.cn

微米原生指标

从 2.5 版本开始,该框架提供了工厂监听器来管理千分尺KafkaClientMetrics每当创建和关闭生产者和消费者时,实例。spring-doc.cadn.net.cn

要启用此功能,只需将监听器添加到生产者和消费者工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

消费者/生产者id传递给侦听器的标记将添加到带有标记名称的仪表的标记中spring.id.spring-doc.cadn.net.cn

获取 Kafka 指标之一的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count()

StreamsBuilderFactoryBean- 请参阅 KafkaStreams 千分尺支持spring-doc.cadn.net.cn

4.1.13. 事务

本节介绍 Spring for Apache Kafka 如何支持事务。spring-doc.cadn.net.cn

概述

0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加了支持:spring-doc.cadn.net.cn

通过提供DefaultKafkaProducerFactory使用transactionIdPrefix. 在这种情况下,而不是管理单个共享Producer,工厂维护事务生产者的缓存。 当用户调用close()在生产者上,它被返回到缓存中以供重用,而不是实际关闭。 这transactional.id每个生产者的属性是transactionIdPrefix + n哪里n开头为0并且会为每个新生产者递增,除非事务是由具有基于记录的侦听器的侦听器容器启动的。 在这种情况下,transactional.id<transactionIdPrefix>.<group.id>.<topic>.<partition>. 这是为了正确支持击剑僵尸,如此处所述。 此新行为已在版本 1.3.7、2.0.6、2.1.10 和 2.2.0 中添加。 如果要恢复到以前的行为,可以将producerPerConsumerPartition属性DefaultKafkaProducerFactoryfalse.spring-doc.cadn.net.cn

虽然批处理侦听器支持事务,但默认情况下,不支持僵尸隔离,因为批处理可能包含来自多个主题或分区的记录。 但是,从版本 2.3.2 开始,如果将 container 属性设置为subBatchPerPartition设置为 true。 在这种情况下,从上次轮询接收的每个分区都会调用一次批处理侦听器,就好像每个轮询只返回单个分区的记录一样。 这是true默认情况下,从 2.5 版开始,当事务启用时EOSMode.ALPHA;将其设置为false如果您正在使用事务但不关心僵尸围栏。 另请参阅 Exactly Once 语义

使用 Spring Boot,只需将spring.kafka.producer.transaction-id-prefixproperty - Boot 将自动配置KafkaTransactionManagerbean 并将其连接到侦听器容器中。spring-doc.cadn.net.cn

从 2.5.8 版开始,您现在可以配置maxAge生产者工厂的财产。 当使用可能为代理的transactional.id.expiration.ms. 与电流kafka-clients,这可能会导致ProducerFencedException无需重新平衡。 通过设置maxAge小于transactional.id.expiration.ms,如果生产者超过最大使用年限,工厂将刷新生产者。
KafkaTransactionManager

KafkaTransactionManager是 Spring Framework 的PlatformTransactionManager. 在其构造函数中提供了对生产者工厂的引用。 如果您提供自定义生产者工厂,则它必须支持事务。 看ProducerFactory.transactionCapable().spring-doc.cadn.net.cn

您可以使用KafkaTransactionManager具有正常的 Spring 事务支持(@Transactional,TransactionTemplate等)。 如果事务处于活动状态,则任何KafkaTemplate在事务范围内执行的作使用事务的Producer. 管理器根据成功或失败提交或回滚事务。 您必须配置KafkaTemplate使用相同的ProducerFactory作为事务管理器。spring-doc.cadn.net.cn

事务同步

本节涉及仅生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用使用者发起的事务。spring-doc.cadn.net.cn

如果你想将记录发送到 kafka 并执行一些数据库更新,你可以使用普通的 Spring 事务管理,比如说,一个DataSourceTransactionManager.spring-doc.cadn.net.cn

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

的拦截器@Transactional注释启动事务,并且KafkaTemplate将事务与该事务管理器同步;每次发送都会参与该交易。 当该方法退出时,数据库事务将提交,然后是 Kafka 事务。 如果您希望以相反的顺序执行提交(首先是 Kafka),请使用 嵌套@Transactional方法,外部方法配置为使用DataSourceTransactionManager,并且配置为使用KafkaTransactionManager.spring-doc.cadn.net.cn

有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅 Kafka 事务与其他事务管理器的示例spring-doc.cadn.net.cn

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务上提交失败(在主事务提交之后),则将向调用者抛出异常。 以前,这被静默忽略(在调试时记录)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。
使用使用者发起的事务

ChainedKafkaTransactionManager现在已弃用,从 2.7 版开始;请参阅 JavaDocs 了解其超类ChainedTransactionManager了解更多信息。 相反,请使用KafkaTransactionManager启动 Kafka 事务,并使用@Transactional以启动另一笔事务。spring-doc.cadn.net.cn

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅 Kafka 事务与其他事务管理器的示例spring-doc.cadn.net.cn

KafkaTemplate本地交易

您可以使用KafkaTemplate在本地事务中执行一系列作。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身(this). 如果回调正常退出,则事务被提交。如果抛出异常,则事务将回滚。spring-doc.cadn.net.cn

如果有KafkaTransactionManager(或同步)事务,则不使用它。相反,使用新的“嵌套”事务。
transactionIdPrefix

概述中所述,生产者工厂配置了此属性来构建生产者transactional.id财产。 指定此属性时存在二分法,即当使用EOSMode.ALPHA,在侦听器容器线程上生成记录时,它必须在所有实例上都相同,以满足围栏僵尸(在概述中也提到)。但是,当使用是由侦听器容器启动的事务生成记录时,每个实例上的前缀必须不同。版本 2.3 使配置更简单,尤其是在 Spring Boot 应用程序中。在以前的版本中,您必须创建两个生产者工厂和KafkaTemplates - 一个用于在侦听器容器线程上生成记录,另一个用于由kafkaTemplate.executeInTransaction()或由事务拦截器在@Transactional方法。spring-doc.cadn.net.cn

现在,您可以覆盖工厂的transactionalIdPrefixKafkaTemplateKafkaTransactionManager.spring-doc.cadn.net.cn

将事务管理器和模板用于侦听器容器时,通常会将其默认为生产者工厂的属性。 使用EOSMode.ALPHA. 跟EOSMode.BETA不再需要使用相同的transactional.id,即使是消费者发起的交易;事实上,它在每个实例上必须是唯一的,就像生产者发起的事务一样。 对于由模板(或@Transaction)应分别在模板和事务管理器上设置属性。 此属性在每个应用程序实例上必须具有不同的值。spring-doc.cadn.net.cn

这个问题(不同的规则transactional.id) 在EOSMode.BETA正在使用(代理版本 >= 2.5);请参阅 Exactly Once 语义
KafkaTemplate事务性和非事务性发布

通常,当KafkaTemplate是事务性的(配置了支持事务的生产者工厂),则需要事务。 事务可以通过TransactionTemplate一个@Transactional方法, 调用executeInTransaction,或通过侦听器容器,当配置了KafkaTransactionManager. 任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException. 从 2.4.3 版开始,您可以将模板的allowNonTransactional属性设置为true. 在这种情况下,模板将允许作在没有事务的情况下运行,方法是调用ProducerFactorycreateNonTransactionalProducer()方法;生产者将像往常一样被缓存或线程绑定,以便重用。 看DefaultKafkaProducerFactory.spring-doc.cadn.net.cn

与批处理侦听器的交易

当侦听器在使用事务时失败时,将AfterRollbackProcessor在回滚发生后调用以执行某些作。 使用默认值AfterRollbackProcessor使用记录侦听器时,将执行搜索,以便重新传递失败的记录。 但是,使用批处理侦听器时,将重新传递整个批处理,因为框架不知道批处理中的哪条记录失败了。 有关详细信息,请参阅回滚后处理器spring-doc.cadn.net.cn

使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障;这BatchToRecordAdapter. 当集装箱工厂batchListener设置为 true 配置了BatchToRecordAdapter,则一次调用一条记录的侦听器。这允许在批处理中进行错误处理,同时仍然可以停止处理整个批处理,具体取决于异常类型。默认BatchToRecordAdapter提供,可以配置标准ConsumerRecordRecoverer例如DeadLetterPublishingRecoverer. 以下测试用例配置片段说明了如何使用此功能:spring-doc.cadn.net.cn

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}

4.1.14. 恰好一次语义

您可以为侦听器容器提供KafkaAwareTransactionManager实例。 如此配置后,容器会在调用侦听器之前启动事务。 任何KafkaTemplate侦听器执行的作参与事务。 如果侦听器成功处理了记录(或多条记录,当使用BatchMessageListener),容器使用producer.sendOffsetsToTransaction()),然后再提交事务。 如果侦听器抛出异常,则事务将回滚,使用者将重新定位,以便在下一次轮询时可以检索回滚的记录。 请参阅回滚后处理器,了解更多信息以及处理反复失败的记录。spring-doc.cadn.net.cn

使用事务可以启用 Exactly Once 语义 (EOS)。spring-doc.cadn.net.cn

这意味着,对于read→process-writesequence,则保证序列恰好完成一次。 (读取和进程至少具有一次语义)。spring-doc.cadn.net.cn

Spring for Apache Kafka 版本 2.5 及更高版本支持两种 EOS 模式:spring-doc.cadn.net.cn

带模式V1,如果另一个实例具有相同的transactional.id已启动。 Spring 通过使用Producer对于每个group.id/topic/partition;发生重新平衡时,新实例将使用相同的transactional.id老生产者被围起来。spring-doc.cadn.net.cn

带模式V2,则不必为每个group.id/topic/partition因为消费者元数据与偏移量一起发送到事务,并且代理可以使用该信息来确定生产者是否被隔离。spring-doc.cadn.net.cn

从 2.6 版开始,默认的EOSModeV2.spring-doc.cadn.net.cn

将容器配置为使用ALPHA,将容器属性EOSModeALPHA,以恢复到以前的行为。spring-doc.cadn.net.cn

V2(默认),您的代理必须是 2.5 或更高版本;kafka-clients3.0 版本,生产者将不再回退到V1;如果经纪人不支持V2,则引发异常。 如果您的代理早于 2.5,则必须将EOSModeV1,将DefaultKafkaProducerFactory producerPerConsumerPartition设置为true而且,如果您使用的是批处理监听器,则应将subBatchPerPartitiontrue.

当您的代理升级到 2.5 或更高版本时,您应该将模式切换到V2,但生产商数量将保持原样。 然后,您可以使用producerPerConsumerPartition设置为false减少生产商的数量;您也不应再将subBatchPerPartitioncontainer 属性。spring-doc.cadn.net.cn

如果您的代理已经是 2.5 或更高版本,您应该将DefaultKafkaProducerFactory producerPerConsumerPartition属性设置为false,以减少所需的生产者数量。spring-doc.cadn.net.cn

使用时EOSMode.V2producerPerConsumerPartition=falsetransactional.id在所有应用程序实例中必须是唯一的。

使用时V2模式,则不再需要将subBatchPerPartitiontrue;它将默认为falseEOSModeV2.spring-doc.cadn.net.cn

有关更多信息,请参阅 KIP-447spring-doc.cadn.net.cn

V1V2以前是ALPHABETA; 它们已被更改以使框架与 KIP-732 保持一致。spring-doc.cadn.net.cn

4.1.15. 将 Spring Bean 连接到生产者/消费者拦截器

Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 而不是 Spring 管理,因此正常的 Spring 依赖注入不适用于依赖 Spring Bean 中的连接。但是,您可以使用拦截器手动连接这些依赖项config()方法。 以下 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来执行此作。spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
        Map<String, Object> consumerProperties = new HashMap<>();
        // consumerProperties.put(..., ...)
        // ...
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        consumerProperties.put("some.bean", someBean);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
        Map<String, Object> producerProperties = new HashMap<>();
        // producerProperties.put(..., ...)
        // ...
        Map<String, Object> producerProperties = properties.buildProducerProperties();
        producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
        producerProperties.put("some.bean", someBean);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
        return factory;
    }

    @Bean
    public SomeBean someBean() {
        return new SomeBean();
    }

    @KafkaListener(id = "kgk897", topics = "kgh897")
    public void listen(String in) {
        System.out.println("Received " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("kgh897", "test");
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kgh897")
            .partitions(1)
            .replicas(1)
            .build();
    }

}
public class SomeBean {

    public void someMethod(String what) {
        System.out.println(what + " in my foo bean");
    }

}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.bean.someMethod("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

}
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test

4.1.16. 暂停和恢复侦听器容器

添加了 2.1.3 版本pause()resume()方法到监听器容器。 以前,您可以在ConsumerAwareMessageListener并通过监听ListenerContainerIdleEvent,它提供了对Consumer对象。 虽然可以使用事件侦听器暂停空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为不能保证在使用者线程上调用事件侦听器。 要安全地暂停和恢复消费者,您应该使用pauseresume侦听器容器上的方法。 一个pause()在下一个之前生效poll();一个resume()在当前之后生效poll()返回。 当容器暂停时,它将继续poll()使用者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。 有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

从 2.1.5 版开始,您可以调用isPauseRequested()看看是否pause()被叫了。 然而,消费者可能还没有真正停下来。isConsumerPaused()如果全部Consumer实例实际上已经暂停了。spring-doc.cadn.net.cn

此外(也是从 2.1.5 开始),ConsumerPausedEventConsumerResumedEvent实例与容器一起发布为source属性和TopicPartition涉及的实例partitions财产。spring-doc.cadn.net.cn

以下简单的 Spring Boot 应用程序演示了使用容器注册表获取对@KafkaListener方法的容器并暂停或恢复其消费者以及接收相应的事件:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表显示了上述示例的结果:spring-doc.cadn.net.cn

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2

4.1.17. 暂停和恢复侦听器容器上的分区

从 2.7 版开始,您可以使用pausePartition(TopicPartition topicPartition)resumePartition(TopicPartition topicPartition)监听器容器中的方法。 暂停和恢复分别发生在poll()类似于pause()resume()方法。 这isPartitionPauseRequested()如果已请求该分区的暂停,则方法返回 true。 这isPartitionPaused()如果该分区实际上已暂停,则方法返回 true。spring-doc.cadn.net.cn

同样从 2.7 版本开始ConsumerPartitionPausedEventConsumerPartitionResumedEvent实例与容器一起发布为source属性和TopicPartition实例。spring-doc.cadn.net.cn

4.1.18. 序列化、反序列化和消息转换

概述

Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。 它与org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T>抽象与一些内置实现。 同时,我们可以使用ProducerConsumer配置属性。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或更特殊的情况,请执行KafkaConsumer(因此,KafkaProducer) 提供过载 要接受的构造函数SerializerDeserializer实例keysvalues分别。spring-doc.cadn.net.cn

使用此 API 时,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory还提供属性(通过构造函数或 setter 方法)来注入自定义SerializerDeserializer实例到目标中ProducerConsumer. 此外,您可以传入Supplier<Serializer>Supplier<Deserializer>实例通过构造函数 - 这些Suppliers 在创建每个ProducerConsumer.spring-doc.cadn.net.cn

字符串序列化

从 2.5 版开始,Spring for Apache Kafka 提供了ToStringSerializerParseStringDeserializer使用实体的字符串表示的类。 他们依赖于方法toString和一些Function<String>BiFunction<String, Headers>以解析 String 并填充实例的属性。 通常,这会在类上调用一些静态方法,例如parse:spring-doc.cadn.net.cn

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,ToStringSerializer配置为传达有关记录中序列化实体的类型信息Headers. 您可以通过将addTypeInfo属性设置为 false。 此信息可供ParseStringDeserializer在接收方。spring-doc.cadn.net.cn

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true):您可以将其设置为false要在ToStringSerializer(将addTypeInfo属性)。spring-doc.cadn.net.cn

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以配置Charset用于转换String往返byte[]默认值为UTF-8.spring-doc.cadn.net.cn

您可以使用解析器方法的名称配置反序列化器,方法是使用ConsumerConfig性能:spring-doc.cadn.net.cn

属性必须包含类的完全限定名称,后跟方法名称,用句点分隔.. 该方法必须是静态的,并且签名为(String, Headers)(String).spring-doc.cadn.net.cn

一个ToFromStringSerde还提供了用于 Kafka Streams 的。spring-doc.cadn.net.cn

JSON

Spring for Apache Kafka 还提供了JsonSerializerJsonDeserializer基于 Jackson JSON 对象映射器。 这JsonSerializer允许将任何 Java 对象写为 JSONbyte[]. 这JsonDeserializer需要额外的Class<?> targetType参数,以允许对已使用的byte[]到适当的目标对象。 以下示例演示如何创建JsonDeserializer:spring-doc.cadn.net.cn

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以自定义两者JsonSerializerJsonDeserializer使用ObjectMapper. 您还可以扩展它们以在configure(Map<String, ?> configs, boolean isKey)方法。spring-doc.cadn.net.cn

从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了JacksonUtils.enhancedObjectMapper()实例,它附带了MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES功能已禁用。 此外,此类实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。 看JacksonUtils.enhancedObjectMapper()JavaDocs 了解更多信息。 此方法还注册一个org.springframework.kafka.support.JacksonMimeTypeModuleorg.springframework.util.MimeType对象序列化为纯字符串,以实现网络上的平台间兼容性。 一个JacksonMimeTypeModule可以在应用程序上下文中注册为 bean,它将自动配置到弹簧靴ObjectMapper实例.spring-doc.cadn.net.cn

同样从 2.3 版本开始,JsonDeserializer提供TypeReference基于构造函数,以便更好地处理目标通用容器类型。spring-doc.cadn.net.cn

从 2.1 版开始,您可以在记录中传达类型信息Headers,允许处理多种类型。 此外,您可以使用以下 Kafka 属性配置序列化器和反序列化程序。 如果您提供了SerializerDeserializer实例KafkaConsumerKafkaProducer分别。spring-doc.cadn.net.cn

配置属性

从版本 2.2 开始,类型信息标头(如果由序列化程序添加)将由反序列化程序删除。 您可以通过将removeTypeHeaders属性设置为false,直接在反序列化程序上或使用前面所述的配置属性。spring-doc.cadn.net.cn

从版本 2.8 开始,如果您以编程方式构造序列化程序或反序列化程序,如编程构造中所示,只要您没有显式设置任何属性(使用set*()方法或使用 Fluent API)。 以前,在以编程方式创建时,从未应用配置属性;如果直接在对象上显式设置属性,情况仍然如此。
映射类型

从版本 2.2 开始,使用 JSON 时,现在可以使用前面列表中的属性提供类型映射。 以前,必须在序列化程序和反序列化程序中自定义类型映射器。 映射由逗号分隔的列表组成token:className对。 在出站时,有效负载的类名将映射到相应的Tokens。 入站时,类型标头中的Tokens映射到相应的类名。spring-doc.cadn.net.cn

以下示例创建一组映射:spring-doc.cadn.net.cn

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
相应的对象必须兼容。

如果您使用 Spring Boot,则可以在application.properties(或 yaml)文件。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

只能使用属性执行简单配置。 对于更高级的配置(例如使用自定义ObjectMapper在序列化程序和反序列化程序中),应使用接受预生成序列化程序和反序列化程序的生产者和使用者工厂构造函数。 以下 Spring Boot 示例覆盖了默认工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {

    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

还提供了 setter,作为使用这些构造函数的替代方法。spring-doc.cadn.net.cn

从版本 2.2 开始,您可以使用具有布尔值的重载构造函数之一显式将反序列化程序配置为使用提供的目标类型并忽略标头中的类型信息useHeadersIfPresent(即true默认情况下)。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型

从版本 2.5 开始,您现在可以通过属性配置反序列化程序,以调用方法来确定目标类型。 如果存在,这将覆盖上面讨论的任何其他技术。 如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。 将这些属性设置为方法名称 - 完全限定的类名,后跟方法名称,用句点分隔.. 该方法必须声明为public static,具有三个签名之一(String topic, byte[] data, Headers headers),(byte[] data, Headers headers)(byte[] data)并返回JacksonJavaType.spring-doc.cadn.net.cn

您可以使用任意标头或检查数据以确定类型。spring-doc.cadn.net.cn

示例
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,请考虑使用JsonPath或类似,但是,确定类型的测试越简单,过程就越有效。spring-doc.cadn.net.cn

下面是以编程方式创建反序列化程序的示例(在构造函数中向使用者工厂提供反序列化程序时):spring-doc.cadn.net.cn

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}
程序化施工

从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

要以编程方式提供类型映射,类似于使用方法确定类型,请使用typeFunction财产。spring-doc.cadn.net.cn

示例
JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要不使用 fluent API 来配置属性,或使用set*()方法,工厂将使用配置属性配置序列化器/反序列化器;请参阅配置属性spring-doc.cadn.net.cn

委派序列化器和解序列化器
使用标题

2.3 版引入了DelegatingSerializerDelegatingDeserializer,允许生成和使用具有不同键和/或值类型的记录。 生产者必须设置标头DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR设置为用于选择要用于该值的序列化程序的选择器值,以及DelegatingSerializer.KEY_SERIALIZATION_SELECTOR钥匙;如果未找到匹配项,则IllegalStateException被抛出。spring-doc.cadn.net.cn

对于传入记录,反序列化程序使用相同的标头来选择要使用的反序列化程序;如果未找到匹配项或标头不存在,则原始byte[]被返回。spring-doc.cadn.net.cn

您可以将选择器的映射配置为Serializer / Deserializer通过构造函数,或者您可以通过 Kafka 生产者/消费者属性使用密钥进行配置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG. 对于序列化程序,生产者属性可以是Map<String, Object>其中键是选择器,值是Serializer实例,一个序列化程序Class或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。spring-doc.cadn.net.cn

对于反序列化程序,使用者属性可以是Map<String, Object>其中键是选择器,值是Deserializer实例,一个解序列化器Class或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。spring-doc.cadn.net.cn

要配置使用属性,请使用以下语法:spring-doc.cadn.net.cn

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

然后,生产者将设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTORheader 到thing1thing2.spring-doc.cadn.net.cn

此技术支持将不同类型发送到同一主题(或不同主题)。spring-doc.cadn.net.cn

从版本 2.5.1 开始,如果类型(键或值)是Serdes (Long,Integer,等)。相反,序列化程序会将标头设置为类型的类名。无需为这些类型配置序列化程序或反序列化程序,它们将动态创建(一次)。

有关将不同类型发送到不同主题的另一种技术,请参阅RoutingKafkaTemplate.spring-doc.cadn.net.cn

按类型

2.8 版本引入了DelegatingByTypeSerializer.spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,您可以配置序列化器以检查映射键是否可从目标对象分配,这在委托序列化器可以序列化子类时很有用。在这种情况下,如果存在 amiguous 匹配项,则有序的Map,例如LinkedHashMap应该提供。spring-doc.cadn.net.cn

按主题

从 2.8 版本开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer允许根据主题名称选择序列化器/反序列化器。正则表达式Patterns 用于查找要使用的实例。可以使用构造函数或通过属性(逗号分隔的列表pattern:serializer).spring-doc.cadn.net.cn

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

KEY_SERIALIZATION_TOPIC_CONFIG将其用于键时。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null,
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

您可以使用以下命令指定在没有模式匹配时使用的默认序列化器/反序列化器DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.spring-doc.cadn.net.cn

附加属性DelegatingByTopicSerialization.CASE_SENSITIVE(默认true),当设置为false使主题查找不区分大小写。spring-doc.cadn.net.cn

重试解序列化程序

RetryingDeserializer使用委托DeserializerRetryTemplate当委托在反序列化期间可能出现暂时性错误(例如网络问题)时重试反序列化。spring-doc.cadn.net.cn

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

请参阅 spring-retry 项目以配置RetryTemplate具有重试策略、退避策略等。spring-doc.cadn.net.cn

Spring Messaging 消息转换

尽管SerializerDeserializerAPI 与低级 Kafka 相比非常简单灵活ConsumerProducer透视,当使用时,您可能需要在 Spring Messaging 级别上具有更大的灵活性@KafkaListenerSpring Integration 的 Apache Kafka 支持。 让您轻松地进行转换org.springframework.messaging.Message,Spring for Apache Kafka 提供了一个MessageConverter抽象与MessagingMessageConverter实现及其JsonMessageConverter(和子类)自定义。 您可以注入MessageConverter变成一个KafkaTemplate实例直接并使用AbstractKafkaListenerContainerFactorybean 定义@KafkaListener.containerFactory()财产。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用 Spring Boot 时,只需将转换器定义为@BeanSpring Boot 自动配置会将其连接到自动配置的模板和容器工厂中。spring-doc.cadn.net.cn

当您使用@KafkaListener,则将参数类型提供给消息转换器以协助转换。spring-doc.cadn.net.cn

只有当@KafkaListener注释是在方法级别声明的。 具有类级@KafkaListener,则有效负载类型用于选择哪个@KafkaHandler方法调用,因此必须先转换后才能选择该方法。spring-doc.cadn.net.cn

在消费者端,您可以配置JsonMessageConverter;它可以处理ConsumerRecord类型的值byte[],BytesStringso 应与ByteArrayDeserializer,BytesDeserializerStringDeserializer. (byte[]Bytes效率更高,因为它们避免了不必要的byte[]String转换)。 您还可以配置JsonMessageConverter对应于解序列化器,如果你愿意的话。spring-doc.cadn.net.cn

在生产者端,当您使用 Spring Integration 或KafkaTemplate.send(Message<?> message)方法(参见KafkaTemplate),您必须配置与配置的 Kafka 兼容的消息转换器Serializer.spring-doc.cadn.net.cn

同样,使用byte[]Bytes效率更高,因为它们避免了Stringbyte[]转换。spring-doc.cadn.net.cn

为方便起见,从 2.3 版本开始,该框架还提供了StringOrBytesSerializer它可以序列化所有三种值类型,以便它可以与任何消息转换器一起使用。spring-doc.cadn.net.cn

从版本 2.7.1 开始,消息有效负载转换可以委托给spring-messaging SmartMessageConverter;例如,这使得转换可以基于MessageHeaders.CONTENT_TYPE页眉。spring-doc.cadn.net.cn

KafkaMessageConverter.fromMessage()方法被调用以将出站转换为ProducerRecordProducerRecord.value()财产。 这KafkaMessageConverter.toMessage()方法被调用用于从ConsumerRecord有效载荷为ConsumerRecord.value()财产。 这SmartMessageConverter.toMessage()方法被调用来创建一个新的出站Message<?>Message传递给'fromMessage()'(通常由KafkaTemplate.send(Message<?> msg)). 同样,在KafkaMessageConverter.toMessage()方法,在转换器创建新的Message<?>ConsumerRecordSmartMessageConverter.fromMessage()方法,然后使用新转换的有效负载创建最终入站消息。 无论哪种情况,如果SmartMessageConverter返回null,则使用原始消息。

当默认转换器在KafkaTemplate和侦听器容器工厂,则配置SmartMessageConverter通过调用setMessagingConverter()在模板上并通过contentMessageConverter属性@KafkaListener方法。spring-doc.cadn.net.cn

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}
使用 Spring 数据投影接口

从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,可以将以下接口定义为消息有效负载类型:spring-doc.cadn.net.cn

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于将属性名称查找为接收的 JSON 文档中的字段。 这@JsonPathexpression 允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。spring-doc.cadn.net.cn

要启用此功能,请使用ProjectingMessageConverter配置了适当的委托转换器(用于出站转换和转换非投影接口)。 您还必须添加spring-data:spring-data-commonscom.jayway.jsonpath:json-path到类路径。spring-doc.cadn.net.cn

当用作参数时@KafkaListener方法,接口类型会正常自动传递给转换器。spring-doc.cadn.net.cn

ErrorHandlingDeserializer

当反序列化程序无法反序列化消息时,Spring 无法处理该问题,因为它发生在poll()返回。 为了解决这个问题,请ErrorHandlingDeserializer已被引入。 此反序列化程序委托给实际的反序列化程序(键或值)。 如果委托未能反序列化记录内容,则ErrorHandlingDeserializer返回一个nullvalue 和DeserializationException在包含原因和原始字节的标头中。 当您使用记录级别MessageListener,如果ConsumerRecord包含一个DeserializationException标头,容器的ErrorHandler使用 failedConsumerRecord. 记录不会传递给侦听器。spring-doc.cadn.net.cn

或者,您可以配置ErrorHandlingDeserializer要通过提供failedDeserializationFunction,这是一个Function<FailedDeserializationInfo, T>. 调用此函数以创建T,以通常的方式传递给听众。 类型FailedDeserializationInfo,其中包含向函数提供的所有上下文信息。 您可以找到DeserializationException(作为序列化的 Java 对象)在标头中。 请参阅 Javadoc 中的ErrorHandlingDeserializer了解更多信息。spring-doc.cadn.net.cn

您可以使用DefaultKafkaConsumerFactory接受键和值的构造函数Deserializer适当的物体和电线ErrorHandlingDeserializer已配置了适当委托的实例。 或者,您可以使用使用者配置属性(由ErrorHandlingDeserializer) 来实例化委托。 属性名称是ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. 属性值可以是类或类名称。 以下示例演示如何设置这些属性:spring-doc.cadn.net.cn

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用failedDeserializationFunction.spring-doc.cadn.net.cn

public class BadFoo extends Foo {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {

  @Override
  public Foo apply(FailedDeserializationInfo info) {
    return new BadFoo(info);
  }

}

前面的示例使用以下配置:spring-doc.cadn.net.cn

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果消费者配置了ErrorHandlingDeserializer配置KafkaTemplate及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[]值,这是由反序列化异常产生的。 模板的泛型值类型应为Object. 一种技术是使用DelegatingByTypeSerializer;下面是一个例子:
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

使用ErrorHandlingDeserializer使用批处理侦听器时,必须检查消息标头中的反序列化异常。 当与DefaultBatchErrorHandler,您可以使用该标头来确定异常失败的记录,并通过BatchListenerFailedException.spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
                    (byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
            if (deserEx != null) {
                logger.error(deserEx, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

ListenerUtils.byteArrayToDeserializationException()可用于将标头转换为DeserializationException.spring-doc.cadn.net.cn

食用时List<ConsumerRecord<?, ?>,ListenerUtils.getExceptionFromHeader()代替:spring-doc.cadn.net.cn

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您还使用DeadLetterPublishingRecoverer,则为DeserializationException将有一个record.value()类型byte[];这不应该序列化。 考虑使用DelegatingByTypeSerializer配置为使用ByteArraySerializerbyte[]以及所有其他类型的普通序列化程序(Json、Avro 等)。
使用批量侦听器的有效负载转换

您还可以使用JsonMessageConverterBatchMessagingMessageConverter在使用批处理侦听器容器工厂时转换批处理消息。 有关更多信息,请参阅序列化,反序列化和消息转换Spring Messaging Message Conversionspring-doc.cadn.net.cn

默认情况下,转换的类型是从 listener 参数推断出来的。 如果将JsonMessageConverter使用DefaultJackson2TypeMapper它有它的TypePrecedence设置为TYPE_ID(而不是默认的INFERRED),转换器会改用标头中的类型信息(如果存在)。 例如,这允许使用接口而不是具体类声明侦听器方法。 此外,类型转换器支持映射,因此反序列化可以与源不同(只要数据兼容)。 当您使用类级@KafkaListener实例其中有效负载必须已转换,以确定要调用的方法。 以下示例创建使用此方法的 bean:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

请注意,要实现此目的,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

请注意,您仍然可以访问批处理标头。spring-doc.cadn.net.cn

如果批处理转换器具有支持它的记录转换器,您还可以接收根据泛型类型转换有效负载的消息列表。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
    ...
}
ConversionService定制

从 2.1.1 版本开始,org.springframework.core.convert.ConversionService默认使用o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory为了解析调用监听器的参数,方法随实现以下任何接口的所有 Bean 一起提供:spring-doc.cadn.net.cn

这使您可以进一步自定义侦听器反序列化,而无需更改ConsumerFactoryKafkaListenerContainerFactory.spring-doc.cadn.net.cn

设置自定义MessageHandlerMethodFactoryKafkaListenerEndpointRegistrar通过KafkaListenerConfigurerbean 禁用此功能。
添加自定义HandlerMethodArgumentResolver@KafkaListener

从 2.4.2 版开始,您可以添加自己的HandlerMethodArgumentResolver并解析自定义方法参数。 您所需要做的就是实现KafkaListenerConfigurer和使用方法setCustomMethodArgumentResolvers()从课堂上KafkaListenerEndpointRegistrar.spring-doc.cadn.net.cn

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您还可以通过添加自定义MessageHandlerMethodFactoryKafkaListenerEndpointRegistrar豆。 如果执行此作,并且应用程序需要处理逻辑删除记录,则使用null value()(例如,从压缩的主题),您应该添加一个KafkaNullAwarePayloadArgumentResolver到工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以在没有@Payload注解。 如果您正在使用DefaultMessageHandlerMethodFactory,将此解析器设置为最后一个自定义解析器;工厂将确保该旋转转换器在标准之前使用PayloadMethodArgumentResolver,它不知道KafkaNull负载。spring-doc.cadn.net.cn

4.1.19. 消息头

0.11.0.0 客户端引入了对消息中标头的支持。 从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到和映射spring-messaging MessageHeaders.spring-doc.cadn.net.cn

映射的先前版本ConsumerRecordProducerRecord到 spring-messagingMessage<?>,其中 value 属性映射到payload和其他属性 (topic,partition,依此类推)被映射到标头。情况仍然如此,但现在可以映射其他(任意)标头。

Apache Kafka 标头有一个简单的 API,如以下接口定义所示:spring-doc.cadn.net.cn

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper提供策略来在 Kafka 之间映射标头条目HeadersMessageHeaders. 其接口定义如下:spring-doc.cadn.net.cn

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper将原始标头映射为byte[],具有用于转换为String值。spring-doc.cadn.net.cn

DefaultKafkaHeaderMapper将键映射到MessageHeaders标头名称,为了支持出站消息的丰富标头类型,执行 JSON 转换。“特殊”标头(键为spring_json_header_types) 包含<key>:<type>. 此标头用于入站端,以提供每个标头值到原始类型的适当转换。spring-doc.cadn.net.cn

在入站方面,所有 KafkaHeader实例映射到MessageHeaders. 在出站端,默认情况下,所有MessageHeaders映射,但id,timestamp,以及映射到ConsumerRecord性能。spring-doc.cadn.net.cn

您可以通过向映射器提供模式来指定要为出站消息映射的标头。以下列表显示了一些示例映射:spring-doc.cadn.net.cn

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用默认的 JacksonObjectMapper并映射大多数标头,如示例之前所述。
2 使用提供的JacksonObjectMapper并映射大多数标头,如示例之前所述。
3 使用默认的 JacksonObjectMapper并根据提供的模式映射标头。
4 使用提供的JacksonObjectMapper并根据提供的模式映射标头。

模式相当简单,可以包含前导通配符 ()、尾随通配符或两者(例如,.cat.*). 您可以使用前导!. 与标头名称匹配的第一个模式(无论是正数还是负数)获胜。spring-doc.cadn.net.cn

当您提供自己的模式时,我们建议将!id!timestamp,因为这些标头在入站端是只读的。spring-doc.cadn.net.cn

默认情况下,映射器仅反序列化java.langjava.util. 您可以通过添加带有addTrustedPackages方法。 如果您收到来自不受信任来源的消息,您可能希望仅添加您信任的包。要信任所有包,您可以使用mapper.addTrustedPackages("*").
映射String原始形式的标头值在与不知道映射器的 JSON 格式的系统通信时非常有用。

从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应映射到原始byte[]. 这AbstractKafkaHeaderMapper有新的属性;mapAllStringsOut当设置为 true 时,所有字符串值标头都将转换为byte[]使用charset属性(默认UTF-8). 此外,还有一个属性rawMappedHeaders,这是header name : boolean; 如果映射包含标头名称,并且标头包含String值,它将被映射为 rawbyte[]使用字符集。此映射还用于映射原始传入byte[]headers 到String使用字符集当且仅当 map 值中的布尔值为true. 如果布尔值为false,或者标头名称不在地图中,并带有true值,则传入标头仅映射为原始未映射标头。spring-doc.cadn.net.cn

以下测试用例说明了此机制。spring-doc.cadn.net.cn

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,两个标头映射器都映射所有入站标头。从版本 2.8.8 开始,这些模式也可以应用于入站映射。要为入站映射创建映射器,请在相应的映射器上使用静态方法之一:spring-doc.cadn.net.cn

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以abc并包括所有其他。spring-doc.cadn.net.cn

默认情况下,DefaultKafkaHeaderMapper用于MessagingMessageConverterBatchMessagingMessageConverter,只要Jackson在阶级道路上。spring-doc.cadn.net.cn

使用批处理转换器,转换后的标头可在KafkaHeaders.BATCH_CONVERTED_HEADERS作为List<Map<String, Object>>其中,列表中某个位置的映射对应于有效负载中的数据位置。spring-doc.cadn.net.cn

如果没有转换器(因为 Jackson 不存在,或者它被显式设置为null),消费者记录中的标头在KafkaHeaders.NATIVE_HEADERS页眉。 此标头是一个Headers对象(或List<Headers>在批处理转换器的情况下),其中列表中的位置对应于有效负载中的数据位置)。spring-doc.cadn.net.cn

某些类型不适合 JSON 序列化,并且简单的toString()序列化可能是这些类型的首选。 这DefaultKafkaHeaderMapper有一个名为addToStringClasses()这使您可以提供应以这种方式处理的类的名称,以便进行出站映射。 在入站映射期间,它们映射为String. 默认情况下,只有org.springframework.util.MimeTypeorg.springframework.http.MediaType以这种方式映射。
从 2.3 版开始,简化了字符串值标头的处理。 默认情况下,此类标头不再采用 JSON 编码(即它们没有封闭"…​"添加)。 该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String(从byte[]). 映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本的记录。"
要与早期版本兼容,请将encodeStringstrue,如果使用 2.3 版本生成的记录可能被使用早期版本的应用程序使用。 当所有应用程序都使用 2.3 或更高版本时,您可以将属性保留为默认值false.
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它会自动将此转换器 bean 配置为 auto-configuredKafkaTemplate;否则,您应该将此转换器添加到模板中。spring-doc.cadn.net.cn

4.1.20. “墓碑”记录的空有效负载和日志压缩

使用日志压缩时,可以使用null有效负载来标识密钥的删除。spring-doc.cadn.net.cn

您还可以收到null值,例如Deserializer可能会返回null当它无法反序列化值时。spring-doc.cadn.net.cn

要发送null有效负载,使用KafkaTemplate,您可以将 null 传递到send()方法。 一个例外是send(Message<?> message)变体。 因为spring-messaging Message<?>不能有null有效负载,您可以使用名为KafkaNull,框架发送null. 为方便起见,静态KafkaNull.INSTANCE被提供。spring-doc.cadn.net.cn

当您使用消息侦听器容器时,收到的ConsumerRecord有一个null value().spring-doc.cadn.net.cn

要配置@KafkaListener处理null有效负载,则必须使用@Payload注释required = false. 如果它是压缩日志的逻辑删除消息,则通常还需要该密钥,以便应用程序可以确定“删除”了哪个密钥。 以下示例显示了这样的配置:spring-doc.cadn.net.cn

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // value == null represents key deletion
}

当您使用类级别@KafkaListener与多个@KafkaHandler方法,需要一些额外的配置。 具体来说,您需要一个@KafkaHandler方法与KafkaNull有效载荷。 以下示例演示如何配置一个:spring-doc.cadn.net.cn

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

请注意,参数是nullKafkaNull.spring-doc.cadn.net.cn

此功能需要使用KafkaNullAwarePayloadArgumentResolver框架在使用默认MessageHandlerMethodFactory. 使用自定义MessageHandlerMethodFactory添加自定义HandlerMethodArgumentResolver@KafkaListener.

4.1.21. 处理异常

本节介绍如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。spring-doc.cadn.net.cn

侦听器错误处理程序

从 2.0 版开始,@KafkaListenerannotation 有一个新属性:errorHandler.spring-doc.cadn.net.cn

您可以使用errorHandler提供KafkaListenerErrorHandler实现。 此功能接口具有一种方法,如以下列表所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问 spring-messagingMessage<?>消息转换器生成的对象和侦听器抛出的异常,该异常包装在ListenerExecutionFailedException. 错误处理程序可以抛出原始异常或新异常,这些异常会抛出到容器。 错误处理程序返回的任何内容都将被忽略。spring-doc.cadn.net.cn

从 2.7 版开始,您可以将rawRecordHeader属性MessagingMessageConverterBatchMessagingMessageConverter这导致原始的ConsumerRecord添加到转换后的Message<?>KafkaHeaders.RAW_DATA页眉。 例如,如果您希望使用DeadLetterPublishingRecoverer在侦听器错误处理程序中。 它可用于请求/回复方案,在该方案中,您希望在捕获死信主题中的失败记录后,在重试一定次数后将失败结果发送给发件人。spring-doc.cadn.net.cn

@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(ConsumerAwareListenerErrorHandler) 通过以下方法访问消费者对象:spring-doc.cadn.net.cn

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

例如,如果错误处理程序实现了此接口,则可以相应地调整偏移量。 例如,若要重置偏移量以重播失败的消息,可以执行如下作:spring-doc.cadn.net.cn

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}

同样,您可以对批处理侦听器执行如下作:spring-doc.cadn.net.cn

@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
    return (m, e, c) -> {
        this.listen10Exception = e;
        MessageHeaders headers = m.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
        }
        offsetsToReset.forEach((k, v) -> c.seek(k, v));
        return null;
    };
}

这会将批处理中的每个主题/分区重置为批处理中的最低偏移量。spring-doc.cadn.net.cn

前面两个示例是简单的实现,您可能需要在错误处理程序中进行更多检查。
容器错误处理程序

从 2.8 版开始,旧版ErrorHandlerBatchErrorHandler接口已被新的CommonErrorHandler. 这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。CommonErrorHandler提供了替换大多数旧版框架错误处理程序实现的实现,并且弃用了旧版错误处理程序。 监听器容器和监听器容器工厂仍然支持旧接口;它们将在将来的版本中被弃用。spring-doc.cadn.net.cn

将自定义旧版错误处理程序实现迁移到CommonErrorHandler有关将自定义错误处理程序迁移到CommonErrorHandler.spring-doc.cadn.net.cn

使用事务时,默认情况下不配置错误处理程序,以便异常将回滚事务。 事务容器的错误处理由AfterRollbackProcessor. 如果您在使用事务时提供自定义错误处理程序,则如果要回滚事务,它必须引发异常。spring-doc.cadn.net.cn

此接口具有默认方法isAckAfterHandle()容器调用它来确定如果错误处理程序返回而不引发异常,是否应该提交偏移量;默认情况下,它返回 true。spring-doc.cadn.net.cn

通常,当错误未被“处理”时(例如,在执行查找作之后),框架提供的错误处理程序将抛出异常。 默认情况下,此类异常由容器记录在ERROR水平。 所有框架错误处理程序都扩展了KafkaExceptionLogLevelAware它允许您控制记录这些异常的级别。spring-doc.cadn.net.cn

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理程序,用于容器工厂中的所有侦听器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带 Comments 的监听器方法抛出异常,则将其抛给容器,并根据容器配置处理消息。spring-doc.cadn.net.cn

容器在调用错误处理程序之前提交任何挂起的偏移量提交。spring-doc.cadn.net.cn

如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean并且 Boot 会将其添加到自动配置的工厂中。spring-doc.cadn.net.cn

默认错误处理程序

这个新的错误处理程序将SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们现在一直是多个版本的默认错误处理程序。 一个区别是批处理侦听器的回退行为(当BatchListenerFailedException引发)等效于重试完整批次spring-doc.cadn.net.cn

错误处理程序可以恢复(跳过)不断失败的记录。 默认情况下,在十次失败后,将记录失败的记录(在ERROR水平)。 您可以使用自定义恢复器 (BiConsumer) 和BackOff控制每个之间的交付尝试和延迟。 使用FixedBackOffFixedBackOff.UNLIMITED_ATTEMPTS导致(实际上)无限重试。 以下示例配置三次尝试后的恢复:spring-doc.cadn.net.cn

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。spring-doc.cadn.net.cn

例如,使用@KafkaListener集装箱工厂,可以添加DefaultErrorHandler如下:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录侦听器,这将重试最多 2 次(3 次投放尝试),回退 1 秒,而不是默认配置(FixedBackOff(0L, 9)). 重试用尽后,只会记录失败。spring-doc.cadn.net.cn

举个例子;如果poll返回六条记录(每个分区 0、1、2 中的两条),侦听器在第四条记录上抛出异常,则容器通过提交前三条消息的偏移量来确认前三条消息。 这DefaultErrorHandler寻求偏移分区 1 的偏移量 1 和分区 2 的偏移量 0。 下一个poll()返回三条未处理的记录。spring-doc.cadn.net.cn

如果AckModeBATCH,容器在调用错误处理程序之前提交前两个分区的偏移量。spring-doc.cadn.net.cn

对于批处理侦听器,侦听器必须抛出BatchListenerFailedException指示批处理中的哪些记录失败。spring-doc.cadn.net.cn

事件的顺序为:spring-doc.cadn.net.cn

  • 在索引之前提交记录的偏移量。spring-doc.cadn.net.cn

  • 如果重试未用尽,请执行搜索,以便重新传递所有剩余记录(包括失败的记录)。spring-doc.cadn.net.cn

  • 如果重试用尽,请尝试恢复失败的记录(仅限默认日志)并执行搜索,以便重新传递剩余的记录(不包括失败的记录)。 已提交已恢复记录的偏移量spring-doc.cadn.net.cn

  • 如果重试用尽且恢复失败,则执行搜索,就像重试未用尽一样。spring-doc.cadn.net.cn

默认恢复程序在重试用尽后记录失败的记录。 您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如DeadLetterPublishingRecoverer.spring-doc.cadn.net.cn

当使用 POJO 批处理监听器时(例如List<Thing>),并且您没有要添加到异常的完整使用者记录,您可以只添加失败记录的索引:spring-doc.cadn.net.cn

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为AckMode.MANUAL_IMMEDIATE,可以将错误处理程序配置为提交恢复记录的偏移量;将commitRecovered属性设置为true.spring-doc.cadn.net.cn

使用事务时,类似的功能由DefaultAfterRollbackProcessor. 请参阅回滚后处理器spring-doc.cadn.net.cn

DefaultErrorHandler将某些异常视为致命异常,并且跳过此类异常的重试;恢复器在第一次失败时调用。默认情况下,被视为致命的异常包括:spring-doc.cadn.net.cn

因为这些异常不太可能在重试投放时得到解决。spring-doc.cadn.net.cn

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications()更多信息,以及spring-retry BinaryExceptionClassifier.spring-doc.cadn.net.cn

这是一个示例,它添加了IllegalArgumentException到不可重试的异常:spring-doc.cadn.net.cn

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

错误处理程序可以配置一个或多个RetryListeners,接收重试和恢复进度的通知。 从 2.8.10 版本开始,添加了批处理侦听器的方法。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

	default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
	}

	default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
	}

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参阅 javadocs。spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 如果恢复器失败,则BackOff将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。 要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailurefalse.

您可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>以确定BackOff使用,基于失败的记录和/或异常:spring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回null,则处理程序的默认值BackOff将被使用。spring-doc.cadn.net.cn

设置resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

使用批处理错误处理程序的转换错误

从 2.8 版开始,批处理监听器现在可以正确处理转换错误,当使用MessageConverter使用ByteArrayDeserializer一个BytesDeserializerStringDeserializer,以及DefaultErrorHandler. 当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer. 列表ConversionExceptions 在侦听器中可用,因此侦听器可以抛出BatchListenerFailedException指示发生转换异常的第一个索引。spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}
重试完整批次

现在,这是DefaultErrorHandler对于批处理侦听器,其中侦听器抛出的异常不是BatchListenerFailedException.spring-doc.cadn.net.cn

无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录按相同的顺序。 因此,不可能轻松维护批处理的重试状态。 这FallbackBatchErrorHandler采取以下方法。 如果批处理侦听器抛出的异常不是BatchListenerFailedException,则从内存中的记录批次执行重试。 为了避免在扩展重试序列期间重新平衡,错误处理程序会暂停使用者,在休眠之前轮询它以进行回退,每次重试,然后再次调用侦听器。 如果/当重试用尽时,则ConsumerRecordRecoverer为批处理中的每条记录调用。 如果恢复器抛出异常,或者线程在其睡眠期间中断,则将在下一次轮询时重新传递这批记录。 在退出之前,无论结果如何,消费者都会恢复。spring-doc.cadn.net.cn

此机制不能用于事务。

在等待BackOffinterval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()而不是造成延误。spring-doc.cadn.net.cn

容器停止错误处理程序

CommonContainerStoppingErrorHandler如果侦听器抛出异常,则停止容器。 对于记录侦听器,当AckModeRECORD,则提交已处理记录的偏移量。 对于记录侦听器,当AckMode是任何手动值,则提交已确认记录的偏移量。 对于唱片侦听器,w当AckModeBATCH,或者对于批处理侦听器,当容器重新启动时,将重播整个批处理。spring-doc.cadn.net.cn

容器停止后,将ListenerExecutionFailedException被抛出。 这是为了导致事务回滚(如果启用了事务)。spring-doc.cadn.net.cn

委托错误处理程序

CommonDelegatingErrorHandler可以委托给不同的错误处理程序,具体取决于异常类型。 例如,您可能希望调用DefaultErrorHandler对于大多数例外情况,或CommonContainerStoppingErrorHandler对于其他人来说。spring-doc.cadn.net.cn

日志记录错误处理程序

CommonLoggingErrorHandler简单地记录异常;使用记录侦听器时,上一次轮询的剩余记录将传递给侦听器。 对于批处理侦听器,将记录批处理中的所有记录。spring-doc.cadn.net.cn

对记录侦听器和批处理侦听器使用不同的常见错误处理程序

如果您希望对记录和批处理侦听器使用不同的错误处理策略,请CommonMixedErrorHandler允许为每种侦听器类型配置特定的错误处理程序。spring-doc.cadn.net.cn

常见错误处理程序 Summery
旧版错误处理程序及其替换程序
旧版错误处理程序 更换

LoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

BatchLoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingBatchErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingBatchErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentBatchErrorHandlerspring-doc.cadn.net.cn

无需更换,使用DefaultErrorHandler与无限BackOff.spring-doc.cadn.net.cn

RecoveringBatchErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

RetryingBatchErrorHandlerspring-doc.cadn.net.cn

无更换 - 使用DefaultErrorHandler并抛出BatchListenerFailedException.spring-doc.cadn.net.cn

将自定义旧版错误处理程序实现迁移到CommonErrorHandler

请参阅中的 javadocsCommonErrorHandler.spring-doc.cadn.net.cn

要将ErrorHandlerConsumerAwareErrorHandler实现,你应该实现handleRecord()并离开remainingRecords()返回false(默认值)。 您还应该实现handleOtherException()- 处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

要将RemainingRecordsErrorHandler实现,你应该实现handleRemaining()并覆盖remainingRecords()返回true. 您还应该实现handleOtherException()- 处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

要将任何BatchErrorHandler实现,你应该实现handleBatch()您还应该实现handleOtherException()- 处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

后回滚处理器

使用事务时,如果侦听器抛出异常(并且错误处理程序(如果存在)抛出异常),则事务将回滚。 默认情况下,任何未处理的记录(包括失败的记录)都会在下一次轮询时重新提取。 这是通过执行seek作中的DefaultAfterRollbackProcessor. 使用批处理侦听器,将重新处理整批记录(容器不知道批处理中的哪条记录失败)。 要修改此行为,您可以使用自定义AfterRollbackProcessor. 例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试了几次后放弃,也许可以通过将其发布到死信主题。spring-doc.cadn.net.cn

从 2.2 版开始,DefaultAfterRollbackProcessor现在可以恢复(跳过)不断失败的记录。 默认情况下,在十次失败后,将记录失败的记录(在ERROR水平)。 您可以使用自定义恢复器 (BiConsumer)和最大故障。 设置maxFailures属性设置为负数会导致无限重试。 以下示例配置三次尝试后的恢复:spring-doc.cadn.net.cn

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,您可以通过配置DefaultErrorHandler. 请参阅容器错误处理程序spring-doc.cadn.net.cn

使用批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。

从 2.2.5 版本开始,DefaultAfterRollbackProcessor可以在新事务中调用(在失败的事务回滚后启动)。 然后,如果您使用DeadLetterPublishingRecoverer要发布失败的记录,处理器会将恢复记录在原始主题/分区中的偏移量发送到事务。 要启用此功能,请将commitRecoveredkafkaTemplate属性DefaultAfterRollbackProcessor.spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从 2.5.5 版开始,如果恢复器出现故障,BackOff将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。 对于早期版本,BackOff未重置,并在下一次故障时重新尝试恢复。 要恢复到以前的行为,请将处理器的resetStateOnRecoveryFailure属性设置为false.

从 2.6 版开始,您现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>以确定BackOff使用,基于失败的记录和/或异常:spring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回null,处理器的默认值BackOff将被使用。spring-doc.cadn.net.cn

从 2.6.3 版本开始,将resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从 2.3.1 版本开始,类似于DefaultErrorHandlerDefaultAfterRollbackProcessor将某些异常视为致命异常,并且跳过此类异常的重试;恢复器在第一次失败时调用。默认情况下,被视为致命的异常包括:spring-doc.cadn.net.cn

因为这些异常不太可能在重试投放时得到解决。spring-doc.cadn.net.cn

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultAfterRollbackProcessor.setClassifications()更多信息,以及spring-retry BinaryExceptionClassifier.spring-doc.cadn.net.cn

这是一个示例,它添加了IllegalArgumentException到不可重试的异常:spring-doc.cadn.net.cn

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
与电流kafka-clients,则容器无法检测到ProducerFencedException是由重新平衡引起的,或者如果生产者的transactional.id由于超时或过期而被吊销。因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用AfterRollbackProcessor(因为寻找分区是不合适的,因为我们不再被分配它们)。如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如,通过ListenerContainerIdleEvent) 可以避免因超时和过期而导致的隔离。或者,您可以将stopContainerWhenFencedcontainer 属性设置为true并且容器将停止,避免记录丢失。您可以使用ConsumerStoppedEvent并检查Reason属性FENCED以检测此条件。由于该事件还具有对容器的引用,因此您可以使用此事件重新启动容器。

从 2.7 版本开始,在等待BackOffinterval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()而不是造成延误。spring-doc.cadn.net.cn

从 2.7 版开始,处理器可以配置一个或多个RetryListeners,接收重试和恢复进度的通知。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参阅 javadocs。spring-doc.cadn.net.cn

Delivery Attempts 标头

以下内容仅适用于记录侦听器,不适用于批处理侦听器。spring-doc.cadn.net.cn

从 2.5 版开始,当使用ErrorHandlerAfterRollbackProcessor实现DeliveryAttemptAware,则可以启用添加KafkaHeaders.DELIVERY_ATTEMPT标头 (kafka_deliveryAttempt) 到记录。 此标头的值是从 1 开始的递增整数。 接收原始数据时ConsumerRecord<?, ?>整数位于byte[4].spring-doc.cadn.net.cn

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt()

使用时@KafkaListener使用DefaultKafkaHeaderMapperSimpleKafkaHeaderMapper,可以通过添加@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery作为监听器方法的参数。spring-doc.cadn.net.cn

要启用此标头的填充,请设置容器属性deliveryAttemptHeadertrue. 默认情况下,它是禁用的,以避免查找每条记录的状态和添加标头的(小)开销。spring-doc.cadn.net.cn

DefaultErrorHandlerDefaultAfterRollbackProcessor支持此功能。spring-doc.cadn.net.cn

侦听器信息标头

在某些情况下,能够知道侦听器在哪个容器中运行是很有用的。spring-doc.cadn.net.cn

从 2.8.4 版开始,您现在可以将listenerInfo属性,或将info属性@KafkaListener注解。 然后,容器会在KafkaListener.LISTENER_INFO所有传入消息的标头;然后它可以用于记录拦截器、过滤器等,或用于侦听器本身。spring-doc.cadn.net.cn

@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen2(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}

当用于RecordInterceptorRecordFilterStrategy实现时,标头作为字节数组在消费者记录中,使用KafkaListenerAnnotationBeanPostProcessorcharSet财产。spring-doc.cadn.net.cn

标头映射器还转换为String创建时MessageHeaders从使用者记录中,并且永远不要将此标头映射到出站记录上。spring-doc.cadn.net.cn

对于 POJO 批处理侦听器,从 2.8.6 版开始,标头被复制到批处理的每个成员中,并且也可以作为单个String参数。spring-doc.cadn.net.cn

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理侦听器具有筛选器,并且筛选器导致一个空批处理,则需要将required = false@Header参数,因为该信息不适用于空批次。

如果您收到List<Message<Thing>>信息在KafkaHeaders.LISTENER_INFO每个的标题Message<?>.spring-doc.cadn.net.cn

有关使用批处理的更多信息,请参阅批处理侦听器spring-doc.cadn.net.cn

发布死信记录

您可以配置DefaultErrorHandlerDefaultAfterRollbackProcessor当达到记录的最大故障数时,与记录恢复器一起使用。 该框架提供了DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。 恢复器需要一个KafkaTemplate<Object, Object>,用于发送记录。 您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,调用该函数来解析目标主题和分区。spring-doc.cadn.net.cn

默认情况下,死信记录被发送到名为<originalTopic>.DLT(原始主题名称后缀为.DLT) 并连接到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。

如果返回的TopicPartition有一个负分区,则该分区未在ProducerRecord,因此分区由 Kafka 选择。从 2.2.4 版本开始,任何ListenerExecutionFailedException(抛出,例如,当在@KafkaListener方法)通过groupId财产。 这允许目标解析器使用它,除了ConsumerRecord以选择死信主题。spring-doc.cadn.net.cn

以下示例显示了如何连接自定义目标解析器:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录使用以下标头进行增强:spring-doc.cadn.net.cn

关键异常仅由以下原因引起DeserializationExceptions 所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN.spring-doc.cadn.net.cn

有两种机制可以添加更多标头。spring-doc.cadn.net.cn

  1. 对恢复器进行子类化并覆盖createProducerRecord()-叫super.createProducerRecord()并添加更多标题。spring-doc.cadn.net.cn

  2. 提供一个BiFunction接收消费者记录和异常,返回一个Headers对象;来自那里的标头将被复制到最终的制作人记录中;另请参阅管理死信记录头。 用setHeadersFunction()BiFunction.spring-doc.cadn.net.cn

第二个更易于实现,但第一个有更多信息可用,包括已经组装的标准标头。spring-doc.cadn.net.cn

从 2.3 版开始,当与ErrorHandlingDeserializer,发布者将恢复记录value(),到无法反序列化的原始值。 以前,value()为 null,用户代码必须解码DeserializationException从邮件头。 此外,还可以提供多个KafkaTemplates 给出版商;例如,如果要发布byte[]DeserializationException,以及使用与成功反序列化的记录不同的序列化程序的值。 下面是配置发布者的示例KafkaTemplateStringbyte[]序列化器:spring-doc.cadn.net.cn

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {

    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来查找适合value()即将出版。 一个LinkedHashMap建议按顺序检查密钥。spring-doc.cadn.net.cn

发布时null值,当有多个模板时,恢复器将为Void类; 如果不存在,则values().iterator()将被使用。spring-doc.cadn.net.cn

从 2.7 开始,您可以使用setFailIfSendResultIsError方法,以便在消息发布失败时引发异常。您还可以使用setWaitForSendResultTimeout.spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从 2.5.5 版开始,如果恢复器出现故障,BackOff将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。 对于早期版本,BackOff未重置,并在下一次故障时重新尝试恢复。 要恢复到以前的行为,请将错误处理程序的resetStateOnRecoveryFailure属性设置为false.

从 2.6.3 版本开始,将resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从版本 2.3 开始,恢复器还可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复spring-doc.cadn.net.cn

ErrorHandlingDeserializer在标头中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。 默认情况下,这些标头不会保留在发布到死信主题的邮件中。 从 2.7 版开始,如果键和值都反序列化失败,则两者的原始值将填充到发送到 DLT 的记录中。spring-doc.cadn.net.cn

如果传入记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(一定次数)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题spring-doc.cadn.net.cn

以下错误处理程序配置将完全执行此作:spring-doc.cadn.net.cn

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从 V2.7 开始,恢复程序会检查目标解析器选择的分区是否实际存在。 如果分区不存在,则ProducerRecord设置为null,允许KafkaProducer以选择分区。 您可以通过将verifyPartition属性设置为false.spring-doc.cadn.net.cn

管理死信记录标题

参考上文发布死信记录,该DeadLetterPublishingRecoverer有两个属性,用于在标头已存在时管理这些标头(例如,在重新处理失败的死信记录时,包括使用非阻塞重试时)。spring-doc.cadn.net.cn

Apache Kafka 支持多个同名的标头;要获取“最新”值,您可以使用headers.lastHeader(headerName);要获取多个标头的迭代器,请使用headers.headers(headerName).iterator().spring-doc.cadn.net.cn

当重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为RecordTooLargeException);对于异常标头,尤其是堆栈跟踪标头,尤其如此。spring-doc.cadn.net.cn

使用这两个属性的原因是,虽然您可能只想保留最后一个异常信息,但您可能希望保留记录在每次失败中传递的主题的历史记录。spring-doc.cadn.net.cn

appendOriginalHeaders应用于所有名为ORIGINALstripPreviousExceptionHeaders应用于所有名为EXCEPTION.spring-doc.cadn.net.cn

从 2.8.4 版开始,您现在可以控制将哪些标准标头添加到输出记录中。 请参阅enum HeadersToAdd对于默认添加的(当前)10 个标准标头的通用名称(这些不是实际的标头名称,只是一个抽象;实际的标头名称由getHeaderNames()子类可以覆盖的方法。spring-doc.cadn.net.cn

要排除标头,请使用excludeHeaders()方法;例如,若要禁止在标头中添加异常堆栈跟踪,请使用:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加ExceptionHeadersCreator;这也会禁用所有标准异常标头。spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同样从版本 2.8.4 开始,您现在可以通过addHeadersFunction方法。 这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。spring-doc.cadn.net.cn

ExponentialBackOffWithMaxRetries实现

Spring Framework 提供了许多BackOff实现。 默认情况下,ExponentialBackOff将无限期重试;在多次重试尝试后放弃需要计算maxElapsedTime. 从版本 2.7.3 开始,Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries这是一个接收maxRetries属性并自动计算maxElapsedTime,这更方便一些。spring-doc.cadn.net.cn

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在1, 2, 4, 8, 10, 10秒,然后再调用恢复器。spring-doc.cadn.net.cn

4.1.22. JAAS 和 Kerberos

从 2.0 版开始,一个KafkaJaasLoginModuleInitializer已添加类以协助 Kerberos 配置。您可以将此 Bean 与所需的配置一起添加到应用程序上下文中。以下示例配置了这样的 Bean:spring-doc.cadn.net.cn

@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
    KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
    jaasConfig.setControlFlag("REQUIRED");
    Map<String, String> options = new HashMap<>();
    options.put("useKeyTab", "true");
    options.put("storeKey", "true");
    options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
    options.put("principal", "[email protected]");
    jaasConfig.setOptions(options);
    return jaasConfig;
}

4.1.23. 生产者和消费者记录日志记录

从版本 2.7.12、2.8.4 开始,您可以确定这些记录在调试日志等中的呈现方式。spring-doc.cadn.net.cn

KafkaUtils.setProducerRecordFormatter()KafkaUtils.setProducerRecordFormatter()了解更多信息。spring-doc.cadn.net.cn

4.2. Apache Kafka Streams 支持

从版本 1.1.4 开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。要从 Spring 应用程序使用它,请kafka-streamsjar 必须存在于类路径上。它是 Spring for Apache Kafka 项目的可选依赖项,不会传递下载。spring-doc.cadn.net.cn

4.2.1. 基础知识

参考 Apache Kafka Streams 文档建议使用该 API 的以下方式:spring-doc.cadn.net.cn

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件:spring-doc.cadn.net.cn

KStream暴露给KafkaStreams实例由单个StreamsBuilder同时启动和停止,即使它们具有不同的逻辑。 换句话说,由StreamsBuilder与单个生命周期控制相关联。 一次KafkaStreams实例已被streams.close(),则无法重新启动。 相反,新的KafkaStreams必须创建实例以重新启动流处理。

4.2.2. 弹簧管理

为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean. 这是一个AbstractFactoryBean实现以公开StreamsBuildersingleton 实例作为 bean。 以下示例创建这样的 bean:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration对象而不是StreamsConfig.

StreamsBuilderFactoryBean还实现SmartLifecycle管理内部KafkaStreams实例。 与 Kafka Streams API 类似,您必须定义KStream实例,然后启动KafkaStreams. 这也适用于 Kafka Streams 的 Spring API。因此,当您使用 defaultautoStartup = trueStreamsBuilderFactoryBean,您必须声明KStream实例StreamsBuilder在刷新应用程序上下文之前。 例如KStream可以是常规的 bean 定义,而使用 Kafka Streams API 时不会产生任何影响。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手动控制生命周期(例如,通过某些条件停止和启动),您可以引用StreamsBuilderFactoryBean直接使用工厂 bean () 前缀进行 bean。 因为&StreamsBuilderFactoryBean使用其内部KafkaStreams实例,可以安全地停止并重新启动它。一个新的KafkaStreams在每个start(). 您还可以考虑使用不同的StreamsBuilderFactoryBean实例,如果您想控制KStream实例。spring-doc.cadn.net.cn

您还可以指定KafkaStreams.StateListener,Thread.UncaughtExceptionHandlerStateRestoreListener选项StreamsBuilderFactoryBean,这些KafkaStreams实例。 此外,除了间接将这些选项设置为StreamsBuilderFactoryBean,从 2.1.5 版开始,您可以使用KafkaStreamsCustomizercallback 接口来配置内部KafkaStreams实例。 请注意KafkaStreamsCustomizer覆盖StreamsBuilderFactoryBean. 如果您需要执行一些KafkaStreams作,您可以直接访问该内部KafkaStreams实例,使用StreamsBuilderFactoryBean.getKafkaStreams(). 您可以自动布线StreamsBuilderFactoryBeanbean 的定义,但您应该确保在 bean 定义中使用完整类型,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,您可以添加@Qualifier如果使用接口 bean 定义,则按名称注入。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从 2.4.1 版开始,工厂 bean 有一个新属性infrastructureCustomizer带类型KafkaStreamsInfrastructureCustomizer; 这允许自定义StreamsBuilder(例如,添加状态存储)和/或Topology在创建流之前。spring-doc.cadn.net.cn

public interface KafkaStreamsInfrastructureCustomizer {

	void configureBuilder(StreamsBuilder builder);

	void configureTopology(Topology topology);

}

提供了默认的无作实现,以避免在不需要时必须实现这两种方法。spring-doc.cadn.net.cn

一个CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个定制器时。spring-doc.cadn.net.cn

4.2.3. KafkaStreams 千分尺支持

在 2.5.3 版本中引入,您可以配置KafkaStreamsMicrometerListener自动注册千分尺KafkaStreams由工厂 Bean 管理的对象:spring-doc.cadn.net.cn

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

4.2.4. Streams JSON 序列化和反序列化

为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个JsonSerde使用 JSON 的实现,委托给JsonSerializerJsonDeserializer序列化、反序列化和消息转换中所述。 这JsonSerde实现通过其构造函数(目标类型或ObjectMapper). 在下面的示例中,我们使用JsonSerde序列化和反序列化CatKafka 流的有效负载(JsonSerde可以在需要实例的地方以类似的方式使用):spring-doc.cadn.net.cn

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。spring-doc.cadn.net.cn

stream.through(new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

4.2.5. 使用KafkaStreamBrancher

KafkaStreamBrancherclass 引入了一种更方便的方法来构建条件分支KStream.spring-doc.cadn.net.cn

考虑以下不使用KafkaStreamBrancher:spring-doc.cadn.net.cn

KStream<String, String>[] branches = builder.stream("source").branch(
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
     );
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用KafkaStreamBrancher:spring-doc.cadn.net.cn

new KafkaStreamBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   //default branch should not necessarily be defined in the end of the chain!
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
   //onTopOf method returns the provided stream so we can continue with method chaining

4.2.6. 配置

要配置 Kafka Streams 环境,请StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。 有关所有可能的选项,请参阅 Apache Kafka 文档spring-doc.cadn.net.cn

从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration对象,而不是作为StreamsConfig.

为了避免在大多数情况下使用样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了@EnableKafkaStreams注释,您应该将其放在@Configuration类。 您只需要声明一个KafkaStreamsConfiguration名为 beandefaultKafkaStreamsConfig. 一个StreamsBuilderFactoryBeanbean,命名为defaultKafkaStreamsBuilder,在应用程序上下文中自动声明。 您可以声明和使用任何额外的StreamsBuilderFactoryBeanBeans也是如此。 您可以通过提供实现StreamsBuilderFactoryBeanConfigurer. 如果有多个这样的豆子,它们将根据它们的Ordered.order财产。spring-doc.cadn.net.cn

默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()方法被调用。 从 2.1.2 版开始,工厂 bean 具有额外的构造函数,将CleanupConfig对象,该对象具有属性,可让您控制cleanUp()方法在start()stop()或者两者都不是。 从 2.7 版开始,默认值是从不清理本地状态。spring-doc.cadn.net.cn

4.2.7. 标头扩充器

2.3 版添加了HeaderEnricher实现Transformer. 这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式求值的根对象有 3 个属性:spring-doc.cadn.net.cn

表达式必须返回byte[]String(将转换为byte[]UTF-8).spring-doc.cadn.net.cn

要在流中使用扩充器,请执行以下作:spring-doc.cadn.net.cn

.transform(() -> enricher)

转换器不会改变keyvalue;它只是添加标题。spring-doc.cadn.net.cn

如果您的流是多线程的,则需要为每条记录创建一个新实例。
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))

这是一个简单的示例,添加一个文字标头和一个变量:spring-doc.cadn.net.cn

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .transform(() -> enricher)
        .to(OUTPUT);

4.2.8.MessagingTransformer

2.3 版添加了MessagingTransformer这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)交互。 转换器需要实现MessagingFunction.spring-doc.cadn.net.cn

@FunctionalInterface
public interface MessagingFunction {

	Message<?> exchange(Message<?> message);

}

Spring Integration 使用其GatewayProxyFactoryBean. 它还需要一个MessagingMessageConverter将键、值和元数据(包括标头)转换为 Spring Messaging 或从 Spring Messaging 转换Message<?>. 看[从KStream] 以获取更多信息。spring-doc.cadn.net.cn

4.2.9. 从反序列化异常中恢复

2.3 版引入了RecoveringDeserializationExceptionHandler当发生反序列化异常时,它可以采取一些作。 请参阅 Kafka 文档DeserializationExceptionHandler,其中RecoveringDeserializationExceptionHandler是一个实现。 这RecoveringDeserializationExceptionHandler配置了ConsumerRecordRecoverer实现。 该框架提供了DeadLetterPublishingRecoverer将失败的记录发送到死信主题。 有关此恢复器的更多信息,请参阅发布死信记录spring-doc.cadn.net.cn

要配置恢复器,请将以下属性添加到流配置中:spring-doc.cadn.net.cn

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer()bean 可以是你自己的实现ConsumerRecordRecoverer.spring-doc.cadn.net.cn

4.2.10. Kafka Streams 示例

以下示例结合了本章中涵盖的所有主题:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}

4.3. 测试应用程序

spring-kafka-testjar 包含一些有用的实用程序来帮助测试您的应用程序。spring-doc.cadn.net.cn

4.3.1. 卡夫卡测试实用程序

o.s.kafka.test.utils.KafkaTestUtils提供了许多静态帮助程序方法来使用记录、检索各种记录偏移量等。 有关完整详细信息,请参阅其 Javadocsspring-doc.cadn.net.cn

4.3.2. JUnit

o.s.kafka.test.utils.KafkaTestUtils还提供了一些静态方法来设置生产者和消费者属性。 以下列表显示了这些方法签名:spring-doc.cadn.net.cn

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从 2.5 版开始,consumerProps方法将ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest. 这是因为,在大多数情况下,您希望使用者使用测试用例中发送的任何消息。 这ConsumerConfig默认值为latest这意味着在使用者开始之前,测试已经发送的消息将不会收到这些记录。 要恢复到以前的行为,请将属性设置为latest调用该方法后。spring-doc.cadn.net.cn

使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。 如果由于某种原因无法做到这一点,请注意consumeFromEmbeddedTopics方法的默认行为是在分配后将分配的分区查找到开头。 由于它无权访问使用者属性,因此您必须使用重载方法,该方法采用seekToEndboolean 参数来搜索到末尾而不是开头。spring-doc.cadn.net.cn

一个 JUnit 4@Rule包装器EmbeddedKafkaBroker用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。 (有关使用 @EmbeddedKafka 的信息,请参阅 注释@EmbeddedKafka与 JUnit 5)。 以下列表显示了这些方法的签名:spring-doc.cadn.net.cn

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

EmbeddedKafkaBrokerclass 有一个实用方法,可让您使用它创建的所有主题。 以下示例演示如何使用它:spring-doc.cadn.net.cn

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
        consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils有一些实用方法可以从消费者那里获取结果。 以下列表显示了这些方法签名:spring-doc.cadn.net.cn

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例演示如何使用KafkaTestUtils:spring-doc.cadn.net.cn

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由EmbeddedKafkaBroker,一个名为spring.embedded.kafka.brokers设置为 Kafka 代理的地址和名为spring.embedded.zookeeper.connect设置为 Zookeeper 的地址。 方便的常量 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) 为该物业提供。spring-doc.cadn.net.cn

使用EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供其他属性。 有关可能的代理属性的更多信息,请参阅 Kafka Configspring-doc.cadn.net.cn

4.3.3. 配置主题

以下示例配置创建名为cathat有五个分区,一个名为thing1有 10 个分区,以及一个名为thing2有 15 个分区:spring-doc.cadn.net.cn

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
      }

}

默认情况下,addTopics当出现问题时(例如添加已经存在的主题)将抛出异常。 版本 2.6 添加了该方法的新版本,该方法返回Map<String, Exception>;键是主题名称,值是null成功,或Exception失败。spring-doc.cadn.net.cn

4.3.4. 对多个测试类使用相同的代理

没有内置支持这样做,但您可以对多个测试类使用相同的代理,类似于以下内容:spring-doc.cadn.net.cn

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假设有一个 Spring Boot 环境,嵌入式代理替换了引导服务器属性。spring-doc.cadn.net.cn

然后,在每个测试类中,您可以使用类似于以下内容的内容:spring-doc.cadn.net.cn

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您不使用 Spring Boot,您可以使用以下命令获取引导服务器broker.getBrokersAsString().spring-doc.cadn.net.cn

前面的示例没有提供在所有测试完成后关闭代理的机制。 例如,如果您在 Gradle 守护进程中运行测试,这可能会成为问题。 在这种情况下,您不应该使用此技术,或者您应该使用一些东西来调用destroy()EmbeddedKafkaBroker当您的测试完成时。

4.3.5. @EmbeddedKafka注释

我们通常建议您将该规则用作@ClassRule以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。 从 2.0 版本开始,如果您使用 Spring 的测试应用程序上下文缓存,您还可以声明一个EmbeddedKafkaBrokerbean,因此单个代理可以跨多个测试类使用。 为方便起见,我们提供了一个名为@EmbeddedKafka以注册EmbeddedKafkaBroker豆。 以下示例演示如何使用它:spring-doc.cadn.net.cn

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从 2.2.4 版开始,您还可以使用@EmbeddedKafka注释来指定 Kafka 端口属性。spring-doc.cadn.net.cn

以下示例将topics,brokerPropertiesbrokerPropertiesLocation属性@EmbeddedKafka支持属性占位符解析:spring-doc.cadn.net.cn

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,属性占位符${kafka.topics.another-topic},${kafka.broker.logs-dir}${kafka.broker.port}从 Spring 解析Environment. 此外,代理属性是从broker.propertiesclasspath 资源由brokerPropertiesLocation. 属性占位符已解析为brokerPropertiesLocationURL 和资源中找到的任何属性占位符。 属性定义brokerProperties覆盖在brokerPropertiesLocation.spring-doc.cadn.net.cn

您可以使用@EmbeddedKafka使用 JUnit 4 或 JUnit 5 进行注释。spring-doc.cadn.net.cn

4.3.6. 使用 JUnit5 @EmbeddedKafka注释

从 2.3 版开始,有两种方法可以使用@EmbeddedKafka使用 JUnit5 进行注释。 当与@SpringJunitConfig注释,则将嵌入式代理添加到测试应用程序上下文中。 您可以在类或方法级别自动将代理连接到测试中,以获取代理地址列表。spring-doc.cadn.net.cn

使用 spring test 上下文时,EmbdeddedKafkaCondition创建经纪人;该条件包括一个参数解析器,因此您可以访问测试方法中的代理...spring-doc.cadn.net.cn

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

如果类注释为@EmbeddedBroker也没有ExtendedWith(SpringExtension.class).@SpringJunitConfig@SpringBootTest是如此元注释,并且当这些注释中的任何一个也存在时,将使用基于上下文的代理。spring-doc.cadn.net.cn

当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。

4.3.7. 嵌入式代理@SpringBootTest附注

Spring Initializr 现在会自动添加spring-kafka-test测试范围内对项目配置的依赖关系。spring-doc.cadn.net.cn

如果您的应用程序在spring-cloud-stream如果您想使用嵌入式代理进行测试,则必须删除spring-cloud-stream-test-support依赖项,因为它将真正的绑定程序替换为测试用例的测试绑定程序。 如果您希望某些测试使用测试绑定器,而某些测试使用嵌入式代理,则使用实际绑定器的测试需要通过在测试类中排除绑定器自动配置来禁用测试绑定器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

有几种方法可以在 Spring Boot 应用程序测试中使用嵌入式代理。spring-doc.cadn.net.cn

他们包括:spring-doc.cadn.net.cn

JUnit4 类规则

以下示例显示如何使用 JUnit4 类规则创建嵌入式代理:spring-doc.cadn.net.cn

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
        false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖代理列表属性以设置 Boot 的属性。spring-doc.cadn.net.cn

@EmbeddedKafka注释或EmbeddedKafkaBroker

以下示例演示如何使用@EmbeddedKafka用于创建嵌入式代理的注释:spring-doc.cadn.net.cn

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

4.3.8. Hamcrest 匹配器

o.s.kafka.test.hamcrest.KafkaMatchers提供以下匹配器:spring-doc.cadn.net.cn

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

4.3.9. AssertJ 条件

您可以使用以下 AssertJ 条件:spring-doc.cadn.net.cn

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

4.3.10. 示例

以下示例汇集了本章中涵盖的大多数主题:spring-doc.cadn.net.cn

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<Integer, String>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的示例使用 Hamcrest 匹配器。 跟AssertJ,最后一部分如下所示:spring-doc.cadn.net.cn

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

4.4. 非阻塞重试

这是一项实验性功能,在删除实验性指定之前,不进行重大 API 更改的通常规则不适用于此功能。 鼓励用户试用该功能并通过 GitHub 问题或 GitHub 讨论提供反馈。 这仅与 API 有关;该功能被认为是完整且可靠的。

使用 Kafka 实现非阻塞重试 / dlt 功能通常需要设置额外的主题并创建和配置相应的监听器。 从 2.7 开始,Spring for Apache Kafka 通过@RetryableTopicannotation 和RetryTopicConfiguration类来简化引导。spring-doc.cadn.net.cn

4.4.1. 模式的工作原理

如果消息处理失败,则消息将转发到具有回退时间戳的重试主题。 然后,重试主题使用者会检查时间戳,如果时间戳不是到期的,则会暂停该主题分区的消耗。 到期时,将恢复分区消耗,并再次使用消息。 如果消息处理再次失败,则消息将被转发到下一个重试主题,并重复该模式,直到成功处理发生,或者尝试用尽,并将消息发送到死信主题(如果已配置)。spring-doc.cadn.net.cn

举例来说,如果您有一个“main-topic”主题,并且想要设置指数退避为 1000 毫秒的非阻塞重试,乘数为 2 次和 4 次最大尝试,它将创建 main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt 主题并配置相应的使用者。 该框架还负责创建主题以及设置和配置侦听器。spring-doc.cadn.net.cn

使用此策略,您将失去 Kafka 对该主题的排序保证。
您可以将AckMode您喜欢的模式,但是RECORD建议。
目前,此功能不支持类级别@KafkaListener附注

4.4.2. 回退延迟精度

概述和保证

所有消息处理和回退都由使用者线程处理,因此,延迟精度在尽最大努力的基础上得到保证。 如果一条消息的处理时间长于该使用者的下一条消息的回退期,则下一条消息的延迟将高于预期。 此外,对于短延迟(大约 1 秒或更短),线程必须执行的维护工作(例如提交偏移量)可能会延迟消息处理的执行。 如果重试主题的使用者正在处理多个分区,精度也会受到影响,因为我们依赖于从轮询中唤醒使用者并具有完整的 pollTimeouts 来进行计时调整。spring-doc.cadn.net.cn

话虽这么说,对于处理单个分区的使用者来说,在大多数情况下,消息的处理应该在确切的到期时间后 100 毫秒内进行。spring-doc.cadn.net.cn

可以保证消息永远不会在到期时间之前得到处理。
调整延迟精度

消息的处理延迟精度依赖于两个ContainerProperties:ContainerProperties.pollTimeoutContainerProperties.idlePartitionEventInterval. 这两个属性都将在重试主题中自动设置,而 dlt 的ListenerContainerFactory到该主题的最小延迟值的四分之一,最小值为 250 毫秒,最大值为 5000 毫秒。 仅当属性具有默认值时才会设置这些值 - 如果您自己更改任一值,则不会覆盖您的更改。 这样,如果需要,可以调整重试主题的精度和性能。spring-doc.cadn.net.cn

您可以单独拥有ListenerContainerFactory主主题和重试主题的实例 - 这样,您可以进行不同的设置以更好地满足您的需求,例如为主要主题设置较高的轮询超时,为重试主题设置较低的轮询超时。

4.4.3. 配置

使用@RetryableTopic注解

配置重试主题和 dlt@KafkaListenerannotated 方法,您只需添加@RetryableTopic注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和消费者。spring-doc.cadn.net.cn

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
        // ... message processing
}

您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用@DltHandler注解。 如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录使用情况。spring-doc.cadn.net.cn

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您没有指定 kafkaTemplate 名称retryTopicDefaultKafkaTemplate将被查找。 如果未找到 bean,则抛出异常。
RetryTopicConfiguration

您还可以通过创建RetryTopicConfigurationbean 中的@Configuration带注释的类。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将使用默认配置为用“@KafkaListener”注释的方法中的所有主题创建重试主题和 dlt 以及相应的消费者。这KafkaTemplate实例是消息转发所必需的。spring-doc.cadn.net.cn

为了实现对如何处理每个主题的非阻塞重审的更细粒度的控制,不止一个RetryTopicConfiguration可以提供 bean。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(5)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
重试主题和 dlt 的消费者将被分配给一个消费者组,该组 ID 是您在groupId参数@KafkaListener带有主题后缀的注释。如果您不提供任何内容,它们都将属于同一组,并且重试主题的重新平衡将导致主要主题不必要的重新平衡。
如果消费者配置了ErrorHandlingDeserializer,要处理反真实化异常,配置KafkaTemplate及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[]值,这是由反序列化异常产生的。 模板的泛型值类型应为Object. 一种技术是使用DelegatingByTypeSerializer;下面是一个例子:
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener注释可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但给定主题将仅使用一种配置。 最好使用单个RetryTopicConfiguration用于配置此类主题的 bean;如果多个@RetryableTopic注解用于同一主题,它们都应该具有相同的值,否则其中一个注解将应用于该主题的所有侦听器,而其他注解的值将被忽略。

4.4.4. 特性

大多数功能都可用于@RetryableTopic注释和RetryTopicConfiguration豆。spring-doc.cadn.net.cn

回退配置

BackOff 配置依赖于BackOffPolicy接口从Spring Retry项目。spring-doc.cadn.net.cn

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(4)
            .build();
}

您还可以提供 Spring Retry 的SleepingBackOffPolicy接口:spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .build();
}
默认回退策略为FixedBackOffPolicy最多尝试 3 次,间隔为 1000 毫秒。
默认最大延迟为 30 秒ExponentialBackOffPolicy. 如果退避策略需要值大于该值的延迟,请相应地调整 maxDelay 属性。
第一次尝试计入maxAttempts,因此如果您提供maxAttempts值为 4,则将有原始尝试加上 3 次重试。
单主题固定延迟重试

如果您使用的是固定延迟策略,例如FixedBackOffPolicyNoBackOffPolicy您可以使用单个主题来完成非阻塞重试。 本主题将以提供的或默认后缀为后缀,并且不会附加索引或延迟值。spring-doc.cadn.net.cn

@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .build();
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、...
全局超时

您可以设置重试过程的全局超时。 如果达到该时间,则下次使用者抛出异常时,消息将直接转到 DLT,或者如果没有可用的 DLT,则仅结束处理。spring-doc.cadn.net.cn

@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2000)
            .timeoutAfter(5000)
            .build();
}
默认值是没有设置超时,这也可以通过提供 -1 作为 timout 值来实现。
异常分类器

您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。spring-doc.cadn.net.cn

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
默认行为是重试所有异常,而不是遍历原因。

从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录无需任何重试即可发送到 DLT。 有关致命异常的默认列表,请参阅 DefaultErrorHandler。 您可以使用以下命令在此列表中添加或删除例外:spring-doc.cadn.net.cn

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
                                               @Qualifier(RetryTopicInternalBeanNames
                                                       .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
    ddtr.addNotRetryableExceptions(MyFatalException.class);
    ddtr.removeNotRetryableException(ConversionException.class);
    return ddtr;
}
要禁用致命异常的分类,请使用setClassifications方法DefaultDestinationTopicResolver.
包含和排除主题

您可以决定哪些主题将由RetryTopicConfigurationbean 通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
默认行为是包含所有主题。
主题自动创建

除非另有说明,否则框架将使用NewTopicKafkaAdmin豆。 您可以指定将用于创建主题的分区数和复制因子,并且可以关闭此功能。spring-doc.cadn.net.cn

请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
默认情况下,主题会自动创建一个分区和 1 的复制因子。
故障标头管理

在考虑如何管理故障标头(原始标头和异常标头)时,框架将委托给DeadLetterPublishingRecover以决定是追加还是替换标头。spring-doc.cadn.net.cn

默认情况下,它显式地将appendOriginalHeadersfalse和叶子stripPreviousExceptionHeaders设置为DeadLetterPublishingRecover.spring-doc.cadn.net.cn

这意味着默认配置中仅保留第一个“原始”和最后一个异常标头。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。spring-doc.cadn.net.cn

有关详细信息,请参阅管理死信记录标题spring-doc.cadn.net.cn

若要重新配置框架以对这些属性使用不同的设置,请将标准DeadLetterPublishingRecovererFactorybean 通过添加一个recovererCustomizer:spring-doc.cadn.net.cn

@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
        dlpr.appendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
    return factory;
}

从 2.8.4 版本开始,如果您希望添加自定义标头(除了工厂添加的重试信息标头外,还可以添加headersFunction到工厂 -factory.setHeadersFunction((rec, ex) → { …​ })spring-doc.cadn.net.cn

4.4.5. 结合阻塞和非阻塞重试

从 2.8.4 开始,您可以将框架配置为同时使用阻塞和非阻塞重试。 例如,您可以有一组异常,这些异常也可能会在下一个记录上触发错误,例如DatabaseAccessException,因此您可以在将同一记录发送到重试主题之前重试几次,或者直接发送到 DLT。spring-doc.cadn.net.cn

要配置阻止重试,您只需通过addRetryableExceptions方法ListenerContainerFactoryConfigurerbean 如下。 默认策略为FixedBackOff,有九次重试,它们之间没有延迟。 或者,您可以提供自己的退避策略。spring-doc.cadn.net.cn

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                                               DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                                               @Qualifier(RetryTopicInternalBeanNames
                                                       .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
    return lcfc;
}

如果需要进一步调整异常分类,可以设置自己的异常分类Map的分类通过ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()方法,例如:spring-doc.cadn.net.cn

lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
结合全局可重试主题的致命异常分类,可以为所需的任何行为配置框架,例如让某些异常同时触发阻止和非阻止重试,仅触发一种或另一种类型,或者直接转到 DLT,而无需任何类型的重试。

下面是两种配置协同工作的示例:spring-doc.cadn.net.cn

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                                            DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                                            @Qualifier(RetryTopicInternalBeanNames
                                                    .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
    return lcfc;
}

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
                                            @Qualifier(RetryTopicInternalBeanNames
                                                    .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
    ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
    return ddtr;
}

在此示例中:spring-doc.cadn.net.cn

  • ShouldRetryOnlyBlockingException.class将仅通过阻止重试,如果所有重试都失败,将直接转到 DLT。spring-doc.cadn.net.cn

  • ShouldRetryViaBothException.class将通过阻止重试,如果所有阻止重试都失败,则将转发到下一个重试主题以进行另一组尝试。spring-doc.cadn.net.cn

  • ShouldSkipBothRetriesException.class永远不会以任何方式重试,如果第一次处理尝试失败,将直接转到 DLT。spring-doc.cadn.net.cn

请注意,阻止重试行为是允许列表 - 您添加您确实想要以这种方式重试的异常;而非阻塞重试分类适用于致命异常,因此是拒绝列表 - 您添加您不想执行非阻塞重试的异常,而是直接发送到 DLT。
非阻塞异常分类行为还取决于特定主题的配置。

4.4.6. 主题命名

重试主题和 DLT 的命名方法是在主主题后缀为提供的或默认值,并附加该主题的延迟或索引。spring-doc.cadn.net.cn

“我的主题” → “我的主题重试-0”、“我的主题重试-1”, ..., “我的主题-dlt”spring-doc.cadn.net.cn

“my-other-topic” → “my-topic-myRetrySuffix-1000”、“my-topic-myRetrySuffix-2000”, ..., “my-topic-myDltSuffix”。spring-doc.cadn.net.cn

重试主题和 DLT 后缀

您可以指定重试和 dlt 主题将使用的后缀。spring-doc.cadn.net.cn

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
默认后缀是“-retry”和“-dlt”,分别用于重试主题和 dlt。
附加主题的索引或延迟

可以在后缀后附加主题的索引或延迟值。spring-doc.cadn.net.cn

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
默认行为是以延迟值为后缀,但具有多个主题的固定延迟配置除外,在这种情况下,主题以主题的索引为后缀。
自定义命名策略

更复杂的命名策略可以通过注册实现RetryTopicNamesProviderFactory.默认实现是SuffixingRetryTopicNamesProviderFactory可以通过以下方式注册不同的实现:spring-doc.cadn.net.cn

@Bean
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
    return new CustomRetryTopicNamesProviderFactory();
}

例如,除了标准后缀之外,以下实现还为 retry/dl 主题名称添加了前缀:spring-doc.cadn.net.cn

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

	@Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if(properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}

4.4.7. Dlt策略

该框架提供了一些使用 DLT 的策略。可以提供 DLT 处理方法,使用默认日志记录方法,或者根本没有 DLT。此外,您还可以选择如果 DLT 处理失败会发生什么。spring-doc.cadn.net.cn

DLT处理方法

您可以指定用于处理主题的 Dlt 的方法,以及该处理失败时的行为。spring-doc.cadn.net.cn

为此,您可以使用@DltHandler类的方法中的注释,其中包含@RetryableTopic注释。 请注意,相同的方法将用于所有@RetryableTopic该类中的 Commentted 方法。spring-doc.cadn.net.cn

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}

DLT 处理程序方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 bean 名称和方法名称作为参数传递。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltProcessor("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
       // ... message processing, persistence, etc
    }
}
如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。

从版本 2.8 开始,如果您根本不想在此应用程序中使用 DLT,包括通过默认处理程序(或您希望延迟使用),您可以控制 DLT 容器是否启动,而与容器工厂的autoStartup财产。spring-doc.cadn.net.cn

使用@RetryableTopic注释,请将autoStartDltHandler属性设置为false;使用配置生成器时,请使用.autoStartDltHandler(false).spring-doc.cadn.net.cn

稍后可以通过KafkaListenerEndpointRegistry.spring-doc.cadn.net.cn

DLT 故障行为

如果 DLT 处理失败,则有两种可能的行为可用:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR.spring-doc.cadn.net.cn

在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者在不转发消息的情况下结束执行。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
			DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为是ALWAYS_RETRY_ON_ERROR.
从 2.8.3 版本开始,ALWAYS_RETRY_ON_ERROR如果记录导致引发致命异常,则不会将记录路由回 DLT, 例如DeserializationException因为,通常,此类异常总是会被抛出。

被视为致命的例外情况是:spring-doc.cadn.net.cn

您可以使用DestinationTopicResolver豆。spring-doc.cadn.net.cn

有关详细信息,请参阅异常分类器spring-doc.cadn.net.cn

配置无 DLT

该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,重审用尽后,处理就结束了。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
			DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}

4.4.8. 指定 ListenerContainerFactory

默认情况下,RetryTopic 配置将使用@KafkaListener注释,但您可以指定一个不同的注释来创建重试主题和 DLT 侦听器容器。spring-doc.cadn.net.cn

对于@RetryableTopic注释,您可以提供工厂的 bean 名称,并使用RetryTopicConfigurationbean 中,您可以提供 bean 名称或实例本身。spring-doc.cadn.net.cn

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
        ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {

    return RetryTopicConfigurationBuilder
            .newInstance()
            .listenerFactory(factory)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .listenerFactory("my-retry-topic-factory")
            .create(template);
}
从 2.8.3 开始,您可以将相同的工厂用于可重试和不可重试的主题。

如果需要将出厂配置行为恢复到之前的 2.8.3,可以将标准RetryTopicConfigurer豆子和套装useLegacyFactoryConfigurertrue如:spring-doc.cadn.net.cn

@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
                                                ListenerContainerFactoryResolver containerFactoryResolver,
                                                ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
                                                BeanFactory beanFactory,
                                                RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
    RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
    retryTopicConfigurer.useLegacyFactoryConfigurer(true);
    return retryTopicConfigurer;
}

4.4.9. 更改 KafkaBackOffException 日志记录级别

当重试主题中的消息不应使用时,将KafkaBackOffException被抛出。默认情况下,此类异常记录在DEBUG级别,但您可以通过在ListenerContainerFactoryConfigurer@Configuration类。spring-doc.cadn.net.cn

例如,要将日志记录级别更改为 WARN,您可以添加:spring-doc.cadn.net.cn

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                                                            DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                                                            @Qualifier(RetryTopicInternalBeanNames
                                                                    .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
    return configurer;
}