Spring Cloud Stream - Functional and Reactive with Multiple outputs

Spring Cloud Stream - Functional and Reactive with Multiple outputs

Written by Ketone Maniac on Jan 30th, 2022 Views Report Post

In a nutshell, the last part of this blog shows a fancy way to split an input Spring Cloud stream into different output streams using reactive functions like the following (taken from the official example ):

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Function<Flux<Integer>, 
                        Tuple2<Flux<String>, Flux<String>>> singleInputMultipleOutputs() {
    return flux -> {
        Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
        UnicastProcessor even = UnicastProcessor.create();
        UnicastProcessor odd = UnicastProcessor.create();
        Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0)
                                .doOnNext(number -> even.onNext("EVEN: " + number));
        Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0)
                                .doOnNext(number -> odd.onNext("ODD: " + number));

        return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), 
                            Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
    };
}

Just wanted to give a little warning for those planning to use this feature: I've faced a few issues outlined below as per date of this writing (Jan 2022):

Problems

Use of Tuples

Reactor Tuples only goes from Tuple2 to Tuple8. If you need to split more than 8 output channels, you'll have to work around it -- what I did was to "group the outputs above the 8th output to another topic, and re-feed it back into another function to split it.

Incompatibility of Spring Cloud Sleuth

This is still an ongoing issue. Currently the only way out is to disable it by spring.sleuth.function.enabled=false

Comments (0)