RestartSource/Flow/Sink practical examples

Jeff

Oct 5, 2017, 3:29:07 PM
to Akka User List
I am trying to create a polling flow which feeds the result of the last update into the next update (much like the SSE example here https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential backoff on failed requests. The new RestartSource/Flow/Sink seems like the correct fit, but I can't seem to get it to work in practice. The strawman example I have is:

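import akka.stream.scaladsl.{Flow, RestartFlow, Sink, Source}
import scala.concurrent.Promise
import scala.concurrent.duration._

// Requires an implicit materializer in scope; `pool`, HAConnectionPool and WatchResult are defined elsewhere.
val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex =>
  // Restart the inner flow with exponential backoff (1s to 10s, 20% jitter).
  val request = RestartFlow.withBackoff(1.second, 10.seconds, 0.2)(() => {
    Flow[Long].mapAsyncUnordered(1) { index =>
      val p = Promise[WatchResult]()
      pool ! HAConnectionPool.Watch(index, p)
      p.future
    }
  })

  // Take only the first successful result for this index.
  Source.repeat(lastIndex).via(request).runWith(Sink.head)
}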

The problem with this, however, is that mapAsyncUnordered() always pulls upstream when the future completes, even if there is no downstream demand, so two requests are made instead of one. I thought about using RestartSource, but it appears to keep restarting the Source.single on completeStage(), which is undesirable.
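
For reference, here is a rough sketch of what the RestartSource variant might look like. It is not a confirmed solution, only an illustration under some assumptions: WatchResult is reduced to a hypothetical case class carrying the next index, the request is passed in as a plain function `watch: Long => Future[WatchResult]` instead of going through the pool actor, and the names BackoffPolling, watchOnce and poll are made up. It also assumes an Akka version that still offers the positional RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) overload. The idea is that Sink.head cancels the restarting source after the first element, so the restart that would follow normal completion should not fire, and Source.unfoldAsync only asks for the next request when downstream demands it.

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object BackoffPolling {
  // Hypothetical stand-in for the WatchResult type used in the post.
  final case class WatchResult(index: Long)

  // One logical request: failed attempts are retried with backoff
  // until a single result has been produced.
  def watchOnce(lastIndex: Long, watch: Long => Future[WatchResult])(
      implicit mat: Materializer): Future[WatchResult] =
    RestartSource
      .withBackoff(1.second, 10.seconds, 0.2) { () =>
        // The factory runs on every (re)start, so each attempt calls `watch` again.
        Source.single(lastIndex).mapAsync(1)(watch)
      }
      // Sink.head cancels upstream once the first element arrives.
      .runWith(Sink.head)

  // Polling loop: the index of each result becomes the input of the next request.
  def poll(start: Long, watch: Long => Future[WatchResult])(
      implicit mat: Materializer): Source[WatchResult, NotUsed] = {
    implicit val ec: ExecutionContext = mat.executionContext
    Source.unfoldAsync(start) { lastIndex =>
      watchOnce(lastIndex, watch).map(r => Some((r.index, r)))
    }
  }
}
```

With something like this, BackoffPolling.poll(0L, watch) could be run with any sink. The minimum backoff of one second would only apply after a failed attempt, not between successful polls.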

Thoughts?

Jeff

Oct 9, 2017, 12:16:41 PM
to Akka User List
bump

Jeff

Oct 13, 2017, 5:42:12 PM
to Akka User List
Still trying to figure out how I can use the new RestartSink/Flow/Source to retry an HTTP request in a stream. Any thoughts?

