生产者拦截器在Spring中管理

从 3.0.0 版本开始,对于生产者拦截器,你可以让 Spring 直接管理它作为一个 bean,而不是将拦截器类名提供给 Apache Kafka 生产者配置。 如果你选择这种方法,那么你需要将此生产者拦截器设置为 KafkaTemplate。 以下示例使用上面的 MyProducerInterceptor,但已更改为不使用内部的配置属性。spring-doc.cadn.net.cn

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在记录发送之前,生产者拦截器的onSend方法会被调用。 一旦服务器在发布数据后发送确认,那么onAcknowledgement方法会被调用。 onAcknowledgement在生产者调用任何用户回调之前被调用。spring-doc.cadn.net.cn

如果通过 Spring 管理的多个此类生产者拦截器需要在 KafkaTemplate 上应用,则需要使用 CompositeProducerInterceptor 代替。 CompositeProducerInterceptor 允许按添加顺序为各个生产者拦截器进行添加。 底层 ProducerInterceptor 实现的方法会按照它们被添加到 CompositeProducerInterceptor 的顺序进行调用。spring-doc.cadn.net.cn