该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

StreamsBuilderFactoryBean 配置器

通常需要对StreamsBuilderFactoryBean这就产生了卡夫卡流对象。 基于Spring Kafka提供的底层支持,该活页夹允许你自定义StreamsBuilderFactoryBean. 你可以使用StreamsBuilderFactoryBeanConfigurer以定制StreamsBuilderFactoryBean本身。 然后,一旦你获得了访问权限StreamsBuilderFactoryBean通过这个配置器,你可以自定义相应的配置卡夫卡流KafkaStreamsCustomzier. 这两个自定义器都是Spring for Apache Kafka项目的一部分。spring-doc.cadn.net.cn

这里有一个使用该条件的例子StreamsBuilderFactoryBeanConfigurer.spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面展示了你可以做的自定义StreamsBuilderFactoryBean. 你基本上可以调用任何可用的突变作StreamsBuilderFactoryBean来定制它。 这个定制器会在开始制作工厂豆子前被活页夹调用。spring-doc.cadn.net.cn

一旦你获得了访问权限StreamsBuilderFactoryBean你也可以自定义底层卡夫卡流对象。 这里有一个实现这一目标的蓝图。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer将被StreamsBuilderFactoryBeabn就在标的卡夫卡流开始吧。spring-doc.cadn.net.cn

只能有一个StreamsBuilderFactoryBeanConfigurer在整个申请中。 那么,当每个处理器分别备份时,我们如何考虑多个 Kafka Streams 处理器的情况StreamsBuilderFactoryBean对象? 在这种情况下,如果这些处理器需要自定义不同,应用程序需要基于应用ID应用某种过滤。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储

如上所述,该活页夹并不提供一流的全局状态存储注册功能。 为此,你需要通过以下方式使用自定义工具StreamsBuilderFactoryBeanConfigurer. 以下是实现方式的方法。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

有没有什么自定义功能StreamsBuilder必须通过KafkaStreamsInfrastructureCustomizer如上所示。 如果StreamsBuilderFactoryBean#getObject()被召集以获取访问权限StreamsBuilder对象,可能在初始化时无法作为豆子工作,从而产生循环依赖问题。spring-doc.cadn.net.cn

如果你有多个处理器,你要把全局状态存储附加在右侧StreamsBuilder通过过滤掉对方StreamsBuilderFactoryBean如上所述,使用应用程序 ID 的对象。spring-doc.cadn.net.cn

使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理程序

在错误处理部分,我们指出该活页夹并未提供处理生产异常的第一流方法。 不过情况是这样,你仍然可以使用StreamsBuilderFactoryBean用于注册生产异常处理程序的定制器。见下文。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果你有多个处理器,可能需要根据正确的设置StreamsBuilderFactoryBean. 你也可以使用配置属性添加此类生产异常处理程序(详见下文),但如果你选择程序化方法,这也是一个选项。spring-doc.cadn.net.cn