This seems to be one of the hot searches for Reactor: at least when I type onErrorContinue into Google, onErrorResume pops up beside it. Below is my test code, along with my interpretation of the results.
Base Function
This is a simple function that multiplies each of 5 consecutive numbers by 2 and then sums them up, with an exception thrown for i == 2:
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .map(i -> i == 2 ? i / 0 : i)
        .map(i -> i * 2)
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
Obviously, the output is:
input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero
Just onErrorResume()
First, the code:
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .map(i -> i == 2 ? i / 0 : i)
        .map(i -> i * 2)
        .onErrorResume(err -> {
            log.info("onErrorResume");
            return Flux.empty();
        })
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
Then, the output:
input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2
No surprises. As stated in the docs, onErrorResume() replaces the Flux with whatever the fallback function returns, so nothing from 2 onward is processed; the sum is 2 because only the first element (1, doubled) got through before the error. The only thing worth mentioning is that onErrorResume() doesn't have to come immediately after the failing operator to catch the error. This had me thinking, because in the docs only onErrorContinue() has the word upstream highlighted, but apparently that has a different meaning (as you will see below).
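To make the "replaces the Flux with what is returned" part more visible, here is a minimal sketch that returns a fallback value instead of an empty Flux. The class name, the method name, the FALLBACK constant and the extra filter step are all made up for illustration; the filter is only there to show that the handler does not need to sit right next to the failing map.

```java
import reactor.core.publisher.Flux;

public class ResumeFallbackDemo {

    // Hypothetical fallback value, chosen only so it stands out in the sum.
    static final int FALLBACK = 100;

    static Integer sumWithFallback() {
        return Flux.range(1, 5)
                .map(i -> i == 2 ? i / 0 : i)   // throws ArithmeticException for 2
                .map(i -> i * 2)
                .filter(i -> i > 0)             // unrelated operator between the error and the handler
                .onErrorResume(err -> Flux.just(FALLBACK)) // the original sequence is replaced here
                .reduce(Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        // 2 (input 1, doubled) + 100 (fallback); inputs 3 to 5 are never requested
        System.out.println("sum=" + sumWithFallback());
    }
}
```

The result is 102: the doubled first element plus the fallback, with 3, 4 and 5 never processed.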
Just onErrorContinue()
Code:
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .map(i -> i == 2 ? i / 0 : i)
        .map(i -> i * 2)
        .onErrorContinue((err, i) -> log.info("onErrorContinue={}", i))
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
And the output:
input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
Again, no surprises. onErrorContinue() drops the failing element 2 and then continues with 3 to 5, so the sum is 2 + 6 + 8 + 10 = 26.
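If you want to inspect what was skipped rather than just log it, the second argument of the onErrorContinue() callback is the element that caused the error. A small sketch, assuming a made-up class ContinueDroppedDemo with a static list named dropped:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ContinueDroppedDemo {

    // Elements skipped by onErrorContinue, recorded for inspection (hypothetical helper state).
    static final List<Object> dropped = new ArrayList<>();

    static Integer sumSkippingFailures() {
        dropped.clear();
        return Flux.range(1, 5)
                .map(i -> i == 2 ? i / 0 : i)   // throws ArithmeticException for 2
                .map(i -> i * 2)
                .onErrorContinue((err, value) -> dropped.add(value)) // value is the failing input
                .reduce(Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        System.out.println("sum=" + sumSkippingFailures());
        System.out.println("dropped=" + dropped);
    }
}
```

The list ends up holding just the element 2, while the sum stays at 26.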
onErrorResume() then onErrorContinue()
Now this is where it becomes interesting. Have a guess: which one takes the error, onErrorResume() or onErrorContinue()?
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .map(i -> i == 2 ? i / 0 : i)
        .map(i -> i * 2)
        .onErrorResume(err -> {
            log.info("onErrorResume");
            return Flux.empty();
        })
        .onErrorContinue((err, i) -> log.info("onErrorContinue={}", i))
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
Now, the output:
input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
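Unexpected or not? onErrorContinue() actually takes the error before onErrorResume() can ever get its hands on it. That is because onErrorContinue() does not wait for an error signal to reach it: it tells the supporting operators upstream (here, map) to drop the failing element on the spot, so no error ever travels down to onErrorResume(). This may look obvious when both error handlers are in the same function, but when you only have onErrorResume() in your function and some caller adds onErrorContinue(), it may not be obvious why your onErrorResume() was never called in the first place.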
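The caller scenario can be sketched by splitting the pipeline into two methods. Everything named here (CallerOverrideDemo, doubled(), callerSum() and the two counters) is hypothetical and only meant to show which handler actually runs:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class CallerOverrideDemo {

    static final AtomicInteger resumeCalls = new AtomicInteger();
    static final AtomicInteger continueCalls = new AtomicInteger();

    // Hypothetical "library" method: its author expects onErrorResume to handle failures.
    static Flux<Integer> doubled() {
        return Flux.range(1, 5)
                .map(i -> i == 2 ? i / 0 : i)
                .map(i -> i * 2)
                .onErrorResume(err -> {
                    resumeCalls.incrementAndGet();
                    return Flux.empty();
                });
    }

    // Hypothetical caller: appends onErrorContinue downstream of the library's handler.
    static Integer callerSum() {
        resumeCalls.set(0);
        continueCalls.set(0);
        return doubled()
                .onErrorContinue((err, value) -> continueCalls.incrementAndGet())
                .reduce(Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        System.out.println("sum=" + callerSum()
                + " resumeCalls=" + resumeCalls.get()
                + " continueCalls=" + continueCalls.get());
    }
}
```

Nothing in doubled() hints that its onErrorResume() can be bypassed, yet after callerSum() the resume counter is still 0 and the continue counter is 1.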
Mimic onErrorContinue() using onErrorResume()
Some posts suggest that we get rid of onErrorContinue() altogether and just use onErrorResume() in all scenarios. But the above already shows that they yield different results. So how is this done?
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .flatMap(i -> Mono.just(i)
            .map(j -> j == 2 ? j / 0 : j)
            .map(j -> j * 2)
            .onErrorResume(err -> {
                System.out.println("onErrorResume");
                return Mono.empty();
            })
        )
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
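So the essence is to wrap the operations that might throw an error in a flatMap or concatMap, and to use onErrorResume() on the inner publisher. Each element then gets its own small sequence, so a failure only ends that inner sequence and the outer one keeps going. This way, it yields the same result: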
input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26
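For completeness, here is the same idea with concatMap, which subscribes to the inner publishers one at a time and keeps the source order. This is only a sketch: the class ConcatMapResumeDemo and the helper doubleOrSkip() are names I made up.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapResumeDemo {

    // Hypothetical helper: the risky work for one element, with its own error handling.
    static Mono<Integer> doubleOrSkip(int i) {
        return Mono.just(i)
                .map(j -> j == 2 ? j / 0 : j)        // throws ArithmeticException for 2
                .map(j -> j * 2)
                .onErrorResume(err -> Mono.empty()); // a failing element becomes an empty Mono
    }

    static Integer sum() {
        return Flux.range(1, 5)
                .concatMap(ConcatMapResumeDemo::doubleOrSkip)
                .reduce(Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        System.out.println("sum=" + sum());
    }
}
```

Pulling the inner pipeline into its own method also keeps the error handling next to the code that can fail.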
Mimic onErrorContinue() using onErrorResume() with downstream onErrorContinue()
Sometimes onErrorContinue() is added by the caller and you have no control over it, but you still want your onErrorResume() to run. What should you do?
public static void main(String... args) {
    Flux.range(1, 5)
        .doOnNext(i -> System.out.println("input=" + i))
        .flatMap(i -> Mono.just(i)
            .map(j -> j == 2 ? j / 0 : j)
            .map(j -> j * 2)
            .onErrorResume(err -> {
                System.out.println("onErrorResume");
                return Mono.empty();
            })
            .onErrorStop()
        )
        .onErrorContinue((err, i) -> log.info("onErrorContinue={}", i))
        .reduce((i, j) -> i + j)
        .doOnNext(i -> System.out.println("sum=" + i))
        .block();
}
The secret is to add onErrorStop() right after onErrorResume(), inside the flatMap. This blocks the downstream onErrorContinue() so that it cannot take the error before onErrorResume() does. Try removing onErrorStop() and you will see onErrorContinue() pop up as before.
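To compare the two variants side by side, here is a sketch with a flag that toggles onErrorStop() and two counters that record which handler ran. The class ErrorStopDemo, the sum(boolean) method and the counters are hypothetical names for this illustration only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorStopDemo {

    static final AtomicInteger resumeCalls = new AtomicInteger();
    static final AtomicInteger continueCalls = new AtomicInteger();

    static Integer sum(boolean withErrorStop) {
        resumeCalls.set(0);
        continueCalls.set(0);
        return Flux.range(1, 5)
                .flatMap(i -> {
                    Mono<Integer> inner = Mono.just(i)
                            .map(j -> j == 2 ? j / 0 : j)   // throws ArithmeticException for 2
                            .map(j -> j * 2)
                            .onErrorResume(err -> {
                                resumeCalls.incrementAndGet();
                                return Mono.empty();
                            });
                    // onErrorStop() shields the inner operators from the downstream onErrorContinue
                    return withErrorStop ? inner.onErrorStop() : inner;
                })
                .onErrorContinue((err, value) -> continueCalls.incrementAndGet())
                .reduce(Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        for (boolean flag : new boolean[] {true, false}) {
            System.out.println("withErrorStop=" + flag
                    + " sum=" + sum(flag)
                    + " resumeCalls=" + resumeCalls.get()
                    + " continueCalls=" + continueCalls.get());
        }
    }
}
```

The sum is 26 in both cases; what changes is which handler sees the error: onErrorResume() with the flag on, onErrorContinue() with it off.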
Hope this clears things up. Happy coding!
Extended Reading
If you want to know more, here's my new article about the limitations of onErrorContinue() and onErrorResume().