In a nutshell, the last part of this blog shows a fancy way to split an input Spring Cloud stream into different output streams using reactive functions like the following (taken from the official example ):
@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Function<Flux<Integer>,
Tuple2<Flux<String>, Flux<String>>> singleInputMultipleOutputs() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0)
.doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0)
.doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()),
Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
Just wanted to give a little warning for those planning to use this feature: I've faced a few issues outlined below as per date of this writing (Jan 2022):
Problems
Use of Tuples
Reactor Tuples only goes from Tuple2 to Tuple8. If you need to split more than 8 output channels, you'll have to work around it -- what I did was to "group the outputs above the 8th output to another topic, and re-feed it back into another function to split it.
Incompatibility of Spring Cloud Sleuth
This is still an ongoing issue. Currently the only way out is to disable it by
spring.sleuth.function.enabled=false
Comments (0)