| This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4! | 
| This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4! | 
Starting with version 5.5, the ConsumerEndpointSpec provides a reactive() configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>.
This option configures the target endpoint as a ReactiveStreamsConsumer instance, independently of the input channel type, which is converted to a Flux via IntegrationReactiveUtils.messageChannelToFlux().
The provided function is used from the Flux.transform() operator to customize (publishOn(), log(), doOnNext() etc.) a reactive stream source from the input channel.
The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}See Reactive Streams Support for more information.