PLATFORM
  • Tails

    Create websites with TailwindCSS

  • Blocks

    Design blocks for your website

  • Wave

    Start building the next great SAAS

  • Pines

    Alpine & Tailwind UI Library

  • Auth

    Plug'n Play Authentication for Laravel

  • Designer comingsoon

    Create website designs with AI

  • DevBlog comingsoon

    Blog platform for developers

  • Static

    Build a simple static website

  • SaaS Adventure

    21-day program to build a SAAS

Written By

Spring Cloud Stream - Functional and Reactive with Multiple outputs

Spring Cloud Stream - Functional and Reactive with Multiple outputs

In a nutshell, the last part of this blog shows a fancy way to split an input Spring Cloud stream into different output streams using reactive functions like the following (taken from the official example ):

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Function<Flux<Integer>, 
                        Tuple2<Flux<String>, Flux<String>>> singleInputMultipleOutputs() {
    return flux -> {
        Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
        UnicastProcessor even = UnicastProcessor.create();
        UnicastProcessor odd = UnicastProcessor.create();
        Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0)
                                .doOnNext(number -> even.onNext("EVEN: " + number));
        Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0)
                                .doOnNext(number -> odd.onNext("ODD: " + number));

        return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), 
                            Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
    };
}

Just wanted to give a little warning for those planning to use this feature: I've faced a few issues outlined below as per date of this writing (Jan 2022):

Problems

Use of Tuples

Reactor Tuples only goes from Tuple2 to Tuple8. If you need to split more than 8 output channels, you'll have to work around it -- what I did was to "group the outputs above the 8th output to another topic, and re-feed it back into another function to split it.

Incompatibility of Spring Cloud Sleuth

This is still an ongoing issue. Currently the only way out is to disable it by spring.sleuth.function.enabled=false

Comments (0)

loading comments