Hello,
I'm trying to get some error handling working. The Javadoc for Stream.retry() says:
"Create a new Stream whose will re-subscribe its oldest parent-child stream pair. The action will start propagating errors after {@param numRetries}. This is generally useful for retry strategies and fault-tolerant streams."
First off, I'm not sure what "oldest parent-child stream pair" means. Parent and child streams seem like a pretty basic concept, but I can't find where they are described, or what a parent-child stream pair is.
Anyway, I wrote this test (designed to be dropped into the project's StreamTest.java), and I expected the numbers to sum up to 15:
    @Test
    public void testComposedErrorHandlingWithRetryErrors() throws InterruptedException {
        Stream<String> stream = Streams.just("1", "2", "3", "4", "5");

        // Fail twice
        CountDownLatch failures = new CountDownLatch(2);
        final AtomicBoolean exception = new AtomicBoolean(false);

        Stream<Integer> s =
            stream.map(i -> Integer.parseInt(i))
                  .observe(i -> {
                      if (i == 3 && failures.getCount() > 0) {
                          failures.countDown();
                          throw new IllegalArgumentException();
                      }
                  })
                  .retry(3)
                  .reduce(0, (acc, next) -> acc + next)
                  .when(IllegalArgumentException.class, e -> exception.set(true));

        await(4, s, is(15));
        assertThat("exception triggered", exception.get(), is(false));
    }
However, they actually sum up to 21, which doesn't make much sense to me.
My guess is that retry(3) restarts processing of the whole stream from the beginning, rather than retrying only the element that failed ("3"). If so, and if the reduce keeps its running total across the restarts, I am in effect getting:
1 + 2 + (reset) + 1 + 2 + (reset) + 1 + 2 + 3 + 4 + 5 = 21
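To check that arithmetic without involving Reactor at all, here is a minimal plain-Java sketch of what I think is happening. It is only a model of my guess, not the library's implementation: the class RetrySimulation and the method sumWithResubscribe are names I made up, and it assumes that each retry replays the source from the start while the downstream accumulator is never reset.

```java
import java.util.Arrays;
import java.util.List;

class RetrySimulation {

    // Hypothetical model: after each failure the whole source is replayed
    // from the first element, but the running total (acc) is kept.
    static int sumWithResubscribe(List<String> source, int failingValue,
                                  int timesToFail, int maxRetries) {
        int acc = 0;
        int failuresLeft = timesToFail;
        int retriesLeft = maxRetries;
        while (true) {
            boolean failed = false;
            for (String s : source) {
                int i = Integer.parseInt(s);
                if (i == failingValue && failuresLeft > 0) {
                    failuresLeft--;   // simulate the exception thrown in observe()
                    failed = true;
                    break;            // abandon this pass, values so far stay in acc
                }
                acc += i;
            }
            if (!failed) {
                return acc;
            }
            if (retriesLeft-- == 0) {
                throw new IllegalStateException("retries exhausted");
            }
        }
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("1", "2", "3", "4", "5");
        // Same parameters as the test: fail twice on 3, allow 3 retries.
        System.out.println(sumWithResubscribe(source, 3, 2, 3)); // prints 21
    }
}
```

With the same inputs as my test, this model gives 21, which matches what I observe.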
If this is the case, is there a way to retry the processing of just "3", without starting over?
If this isn't the case, please explain what is happening here.
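To make concrete what I mean by retrying just "3" without starting over, here is a rough plain-Java sketch of the behavior I was hoping for. None of this is Reactor API: PerElementRetry, callWithRetry and sumWithPerElementRetry are hypothetical names of my own, and the retry loop simply wraps the per-element function.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class PerElementRetry {

    // Hypothetical helper: re-invoke fn on the same input up to maxRetries
    // extra times, rethrowing the last exception if every attempt fails.
    static <T, R> R callWithRetry(Function<T, R> fn, T input, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    static int sumWithPerElementRetry(List<String> source, int failingValue,
                                      int timesToFail, int maxRetries) {
        AtomicInteger failuresLeft = new AtomicInteger(timesToFail);
        Function<String, Integer> parse = s -> {
            int i = Integer.parseInt(s);
            if (i == failingValue && failuresLeft.get() > 0) {
                failuresLeft.decrementAndGet();
                throw new IllegalArgumentException();
            }
            return i;
        };
        int acc = 0;
        for (String s : source) {
            // Only the failing element is retried; earlier elements are not replayed.
            acc += callWithRetry(parse, s, maxRetries);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("1", "2", "3", "4", "5");
        System.out.println(sumWithPerElementRetry(source, 3, 2, 3)); // prints 15
    }
}
```

This gives the 15 I originally expected, so what I am really asking is whether the Stream API has an equivalent of this per-element retry.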
Thanks!