Let me start with a question: Will this Flux break?
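import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Flux.interval(Duration.ofMillis(100))            // one tick every 100 ms
            .concatMap(i -> Mono.just(i)
                .delayElement(Duration.ofSeconds(1))     // each item takes 1 s to process
                .then(Mono.error(new RuntimeException("DIE!!! " + i)))
            )
            .onErrorContinue((e, o) -> {
                System.out.println("No worries, I have you covered!" + e.getMessage());
            })
            .blockLast();
    }
}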
I've got onErrorContinue() at the very end, so I should be covered, right? Let's see the output now:
No worries, I have you covered!DIE!!! 0
No worries, I have you covered!DIE!!! 1
No worries, I have you covered!DIE!!! 2
Exception in thread "main" reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
...
onErrorContinue() and onErrorResume() do not solve all problems
Though onErrorContinue() and onErrorResume() can handle the RuntimeException above (by the way, if you want to know the difference between onErrorContinue() and onErrorResume(), check out this other article of mine), they cannot deal with the scenario above: Flux.interval emits on a timer regardless of downstream demand, and here it produces items 10x faster than the downstream can consume. Once the downstream has no outstanding requests left, the next tick has nowhere to go and the source itself fails with an OverflowException. In this case the program exits, but if your Flux is a listener or runs on some background thread, it will just die silently and may leave you scratching your head as to why the Flux isn't working with all the "proper error handling" in place.
This StackOverflow Question gives a very clear answer on how to deal with such problems. So, depending on your case, the best way in my opinion is to use onBackpressureBuffer() if the incoming items cannot be dropped, and onBackpressureDrop() if you just want an ongoing flow of items whose content isn't important (like a heartbeat).
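To make the trade-off concrete, here is a toy model in plain Java. It is not Reactor code and does not use Reactor's operators; the class BackpressureModel and its simulate method are names made up for this illustration. It only assumes a producer that emits one tick per step and a consumer that is ready once every `ratio` steps, and compares what "drop" and "buffer" deliver.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class BackpressureModel {

    /** Outcome of one simulated run (hypothetical helper type). */
    static final class Result {
        final List<Integer> delivered = new ArrayList<>();
        int dropped;        // ticks discarded because the consumer was busy
        int stillBuffered;  // ticks left waiting in the queue at the end
    }

    /**
     * The producer emits ticks 0..ticks-1, one per step. The consumer is
     * ready for one item on every step where step % ratio == 0.
     * buffer == true queues ticks that arrive while the consumer is busy;
     * buffer == false discards them.
     */
    static Result simulate(int ticks, int ratio, boolean buffer) {
        Result result = new Result();
        ArrayDeque<Integer> queue = new ArrayDeque<>();
        for (int tick = 0; tick < ticks; tick++) {
            boolean consumerReady = tick % ratio == 0;
            if (buffer) {
                queue.add(tick);                          // never lose an item
                if (consumerReady) {
                    result.delivered.add(queue.poll());   // oldest item first
                }
            } else if (consumerReady) {
                result.delivered.add(tick);               // freshest item goes through
            } else {
                result.dropped++;                         // no demand: discard
            }
        }
        result.stillBuffered = queue.size();
        return result;
    }

    public static void main(String[] args) {
        Result drop = simulate(30, 10, false);
        Result buf = simulate(30, 10, true);
        System.out.println("drop:   delivered=" + drop.delivered + " dropped=" + drop.dropped);
        System.out.println("buffer: delivered=" + buf.delivered + " backlog=" + buf.stillBuffered);
    }
}
```

With a 10x faster producer, the drop variant keeps up with recent ticks but loses most of them, while the buffer variant loses nothing but its backlog keeps growing. That is why buffering only makes sense when the burst is temporary or the buffer is bounded.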
Solution
I put an extra log() here to better show what is happening:
public static void main(String[] args) {
    Flux.interval(Duration.ofMillis(100))
        .onBackpressureDrop()                        // discard ticks that arrive without downstream demand
        .log()                                       // logs request(n) and onNext(x) seen at this point
        .concatMap(i -> Mono.just(i)
            .delayElement(Duration.ofSeconds(1))
            .then(Mono.error(new RuntimeException("DIE!!! " + i)))
        )
        .onErrorContinue((e, o) -> {
            System.out.println("No worries, I have you covered!" + e.getMessage());
        })
        .blockLast();
}
This will give the following output:
12:33:32.677 [main] INFO reactor.Flux.OnBackpressureDrop.1 - onSubscribe(FluxOnBackpressureDrop.DropSubscriber)
12:33:32.680 [main] INFO reactor.Flux.OnBackpressureDrop.1 - request(32)
12:33:32.785 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(0)
12:33:32.885 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(1)
12:33:32.984 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(2)
12:33:33.085 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(3)
12:33:33.187 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(4)
12:33:33.286 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(5)
12:33:33.383 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(6)
12:33:33.483 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(7)
12:33:33.585 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(8)
12:33:33.684 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(9)
12:33:33.784 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(10)
No worries, I have you covered!DIE!!! 0
12:33:33.888 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(11)
12:33:33.987 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(12)
12:33:34.083 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(13)
12:33:34.186 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(14)
12:33:34.283 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(15)
12:33:34.385 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(16)
12:33:34.486 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(17)
12:33:34.585 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(18)
12:33:34.687 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(19)
12:33:34.784 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(20)
No worries, I have you covered!DIE!!! 1
12:33:34.883 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(21)
12:33:34.984 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(22)
12:33:35.086 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(23)
12:33:35.184 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(24)
12:33:35.288 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(25)
12:33:35.387 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(26)
12:33:35.486 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(27)
12:33:35.587 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(28)
12:33:35.684 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(29)
12:33:35.788 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(30)
No worries, I have you covered!DIE!!! 2
12:33:35.886 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(31)
No worries, I have you covered!DIE!!! 3
No worries, I have you covered!DIE!!! 4
No worries, I have you covered!DIE!!! 5
No worries, I have you covered!DIE!!! 6
No worries, I have you covered!DIE!!! 7
No worries, I have you covered!DIE!!! 8
No worries, I have you covered!DIE!!! 9
No worries, I have you covered!DIE!!! 10
No worries, I have you covered!DIE!!! 11
No worries, I have you covered!DIE!!! 12
No worries, I have you covered!DIE!!! 13
No worries, I have you covered!DIE!!! 14
No worries, I have you covered!DIE!!! 15
No worries, I have you covered!DIE!!! 16
No worries, I have you covered!DIE!!! 17
No worries, I have you covered!DIE!!! 18
No worries, I have you covered!DIE!!! 19
No worries, I have you covered!DIE!!! 20
No worries, I have you covered!DIE!!! 21
No worries, I have you covered!DIE!!! 22
12:33:55.938 [parallel-4] INFO reactor.Flux.OnBackpressureDrop.1 - request(24)
12:33:55.988 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(232)
12:33:56.086 [parallel-1] INFO reactor.Flux.OnBackpressureDrop.1 - onNext(233)
...
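What I wanted to show is that:
- concatMap() requests 32 items up front (the request(32) in the log). That is its default prefetch, not a buffer size shared by every Flux step; other operators use different defaults
- the Flux now drops whatever arrives without outstanding demand instead of failing
- yet the Flux doesn't request more items as soon as there is free space in the buffer. Instead it waits and requests 24 at once (by which time the source has reached item 232). The 24 appears to be 75% of the prefetch of 32, which matches Reactor's replenishing strategy of re-requesting only once three quarters of the prefetched items have been consumed. It is not time-based.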
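The numbers in the log can be reproduced with a little arithmetic. The sketch below assumes that the replenish amount is "prefetch minus a quarter of the prefetch", which is how I understand Reactor computes it for a bounded prefetch. The class ReplenishMath and its method are hypothetical names for this illustration, not Reactor API.

```java
public class ReplenishMath {

    // Assumption: mirrors the 75% replenish rule for a bounded prefetch.
    static int replenishLimit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    public static void main(String[] args) {
        int prefetch = 32;          // concatMap's initial request in the log
        long tickMillis = 100;      // Flux.interval period
        long workMillis = 1000;     // delayElement per item

        int limit = replenishLimit(prefetch);

        // The 24th item is taken from the queue once the first 23 have each
        // spent one second in processing, so that is when the re-request fires.
        long replenishAtMillis = (limit - 1) * workMillis;

        // Rough index of the tick the source has reached by then.
        long approxTick = replenishAtMillis / tickMillis;

        System.out.println("limit=" + limit
                + " replenishAtMillis=" + replenishAtMillis
                + " approxTick=" + approxTick);
        // prints: limit=24 replenishAtMillis=23000 approxTick=230
    }
}
```

The idealized result of roughly tick 230 is close to the 232 seen in the log. The small gap is consistent with scheduling overhead adding a few milliseconds to every one-second step.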
Hope you guys learnt something. Happy programming!