I am trying to create a polling flow which feeds the results of the last update into the next update (much like the SSE example here
https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential backoff on failed requests. The new RestartSource/Flow/Sink seems like the correct fit, however I can't seem to get it to work in practice. The strawman example I have is
val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex =>
val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => {
Flow[Long].mapAsyncUnordered(1) { lastIndex =>
val p = Promise[WatchResult]
pool ! HAConnectionPool.Watch(lastIndex, p)
p.future
}
})
Source.repeat(lastIndex).via(request).runWith(Sink.head)
}
The problem with this however is that mapAsyncUnordered() will always pull the upstream when the future completes, even if there is no downstream demand, causing 2 requests to be made instead of one. I thought about using RestartSource, but it appears that it will keep resetting the Source.single on completeStage(), which is undesirable.
Thoughts?