Hi,
It's written in the ScalaDoc of the MergeHub.source method that:
"If one of the inputs fails the Sink, the Source is failed in turn"
But in the MergeHub source code, the onUpstreamFailure method only throws an exception:
    override def onUpstreamFailure(ex: Throwable): Unit = {
      throw new MergeHub.ProducerFailed("Upstream producer failed with exception, " +
        "removing from MergeHub now", ex)
    }
So maybe I'm missing something, but why would a failing input fail the Source?
I need the MergeHub to fail when an input fails, but it doesn't seem to work.
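To make the question concrete, here is a minimal sketch of how the behaviour could be checked. It assumes Akka 2.6+ (where the implicit ActorSystem supplies the materializer); the object name, the buffer size, the "boom" exception and the 3-second timeout are all made up for illustration and are not taken from any real application code.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical reproduction; names and values are illustrative only.
object MergeHubFailureRepro extends App {
  // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
  implicit val system: ActorSystem = ActorSystem("mergehub-repro")

  // Materialize the hub: `hubSink` is the reusable input side,
  // `done` completes (or fails) when the hub's Source side terminates.
  val (hubSink, done) =
    MergeHub.source[String](perProducerBufferSize = 16)
      .toMat(Sink.foreach[String](println))(Keep.both)
      .run()

  // One healthy producer and one producer that fails immediately.
  Source(List("a", "b", "c")).runWith(hubSink)
  Source.failed[String](new RuntimeException("boom")).runWith(hubSink)

  // If the failure were propagated to the hub's Source, `done` would fail
  // with it. Otherwise the hub keeps running and the await times out.
  val outcome = Try(Await.result(done, 3.seconds))
  println(s"Hub outcome after a failing producer: $outcome")

  system.terminate()
}
```

If the ScalaDoc is accurate, I would expect `outcome` to be a Failure carrying the producer's exception rather than a timeout.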
Thanks,
Victor