Understanding Stream.retry()

66 views
Skip to first unread message

Joshua Chaitin-Pollak

unread,
Jan 30, 2015, 3:30:59 PM1/30/15
to reactor-...@googlegroups.com
Hello,

I'm trying to get some error handling working. The Javadoc for Stream.retry() says:

"Create a new Stream whose will re-subscribe its oldest parent-child stream pair. The action will start propagating errors after {@param numRetries}. This is generally useful for retry strategies and fault-tolerant streams."

First off, I'm not sure what "oldest parent-child stream pair" means. parent and child streams seem like a pretty basic concept, but I'm not sure where they are described or what a parent-child stream pair is.

Anyway, I wrote this test (designed to be dropped into the project's StreamTest.java), and I expected the numbers to sum up to 15:

@Test
public void testComposedErrorHandlingWithRetryErrors() throws InterruptedException {
Stream<String> stream = Streams.just("1", "2", "3", "4", "5");

// Fail twice
CountDownLatch failures = new CountDownLatch(2);
final AtomicBoolean exception = new AtomicBoolean(false);
Stream<Integer> s =
stream.map(i -> Integer.parseInt(i))
.observe(i -> {
if (i == 3 && failures.getCount() > 0) {
failures.countDown();
throw new IllegalArgumentException();
}
})
.retry(3)
.reduce(0, (acc, next) -> acc + next)
.when(IllegalArgumentException.class, e -> exception.set(true));

await(4, s, is(15));
assertThat("exception triggered", exception.get(), is(false));
}

However, they actually sum up to 21, which doesn't make much sense to me.

My guess is that retry(3) is actually causing the processing of the stream to restart from the beginning, rather than just retrying the current object on the stream "3". So I am in effect getting:

1 + 2 + (reset) + 1 + 2 + (reset) + 1 + 2 + 3 + 4 + 5

If this is the case, Is there a way to repeat the processing on just "3", without starting over?

If this isn't the case, please explain what is happening here.

Thanks!

Stephane Maldini

unread,
Feb 1, 2015, 3:27:20 PM2/1/15
to Joshua Chaitin-Pollak, reactor-framework
Your observation is totally right, a cold stream (Streams.from...) is fully restarted, we aligned on Rx behavior for this.
A hot stream (Broadcaster.create...)  will just resubscribe and listen for the new values.

A good mix between the full restart behavior and the "ignore error" behavior is to use a BehaviorBroadcaster (like the broadcaster but retains the last signal, just in case :)) or flatMap + a new unique Stream.

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Stephane Maldini | Solutions Architect, CSO EMEA | London | Pivotal

Stephane Maldini

unread,
Feb 1, 2015, 3:28:49 PM2/1/15
to Joshua Chaitin-Pollak, reactor-framework
BTW your test is cool, if you have a github account I encourage you to PR it just to keep track of your contributions. I can copy and paste but its not as explicit and managed as a PR, and I feel like you have totally awesome stuff to contribute !

On Fri, Jan 30, 2015 at 8:30 PM, Joshua Chaitin-Pollak <jo...@assuredlabor.com> wrote:

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Joshua Chaitin-Pollak

unread,
Feb 2, 2015, 3:59:00 PM2/2/15
to Stephane Maldini, reactor-framework
On Sun, Feb 1, 2015 at 3:26 PM, Stephane Maldini <smal...@pivotal.io> wrote:
Your observation is totally right, a cold stream (Streams.from...) is fully restarted, we aligned on Rx behavior for this.
A hot stream (Broadcaster.create...)  will just resubscribe and listen for the new values.

Is this part of the Rx specification? It seems completely counter-intuitive to have one method have different behaviors based on the type of stream being operated on. Also if you are passing streams around for composition, its entirely possible a downstream method might have unintended consequences by unintentionally re-executing a cold stream.

A good mix between the full restart behavior and the "ignore error" behavior is to use a BehaviorBroadcaster (like the broadcaster but retains the last signal, just in case :)) or flatMap + a new unique Stream.

Can you give an example of the BehaviorBroadcaster, or a link to one? I don't think I've seen that before.

About the flatMap + new unique Stream, you are suggesting using a flatMap to effectively encapsulate an operation that might fail around a single item? So if the stream inside the flatMap fails, the remainder of the stream continues to process?

Thats a clever workaround, but it strikes me as a workaround, and pretty verbose, for something that I would expect would be a pretty common operation.

-Josh



--
Joshua Chaitin-Pollak
Chief Technology Officer
Assured Labor, Inc.
Reply all
Reply to author
Forward
0 new messages