/**
 * Adapts an asynchronous `Source => Future[Source]` transformation into a `Flow`.
 *
 * Elements arriving at the flow's inlet are pulled from a `Sink.queue` and offered,
 * one at a time, to a `Source.queue`; that source queue is the `Source` handed to `f`.
 * The flow's outlet emits whatever the `Source` produced by `f` emits.
 *
 * Note: `f` is invoked once, when this method is called, and the promise that carries
 * the materialized source queue is created once per call. `Promise.success` throws if
 * the promise is already completed, so the returned flow is only meant to be
 * materialized a single time.
 *
 * @param f  transformation applied to the source that replays the flow's input
 * @param ec execution context used for the pull/offer callbacks
 * @tparam V1 element type accepted by the flow
 * @tparam V2 element type emitted by the flow
 * @return a flow whose inlet feeds `f`'s input source and whose outlet is `f`'s result
 */
def sourceTransformToFlow[V1,V2](f: Source[V1,akka.NotUsed] => Future[Source[V2,akka.NotUsed]])(implicit ec:ExecutionContext):Flow[V1,V2,akka.NotUsed]={
  // Completed with the source queue once the source given to `f` is materialized.
  val queue = Promise[akka.stream.scaladsl.SourceQueueWithComplete[V1]]()

  // Buffer size 0 with backpressure: an offer completes only when the element is accepted.
  val outSource =
    Source.queue[V1](0, akka.stream.OverflowStrategy.backpressure).mapMaterializedValue { sourceQ =>
      queue.success(sourceQ)
      akka.NotUsed
    }

  val inSink =
    Sink.queue[V1]().mapMaterializedValue { sinkQ =>
      // Pull the next value from the sink queue, offer it to the source queue, then repeat.
      def pullAndOffer(): Unit =
        sinkQ.pull().onComplete {
          case scala.util.Success(Some(elem)) =>
            queue.future.flatMap(_.offer(elem)).onComplete {
              case scala.util.Success(akka.stream.QueueOfferResult.Enqueued) =>
                pullAndOffer()
              case scala.util.Success(_) =>
                // Any other offer result means the element was not enqueued: stop consuming.
                sinkQ.cancel()
              case scala.util.Failure(_) =>
                // The offer failed, or the source queue never became available.
                sinkQ.cancel()
            }
          case scala.util.Success(None) =>
            // Upstream completed: complete the source queue as well.
            queue.future.foreach(_.complete())
          case scala.util.Failure(err) =>
            // Upstream failed: forward the failure to the source queue.
            queue.future.foreach(_.fail(err))
        }
      pullAndOffer()
    }

  val transformed = f(outSource)
  // If `f` fails, `outSource` is never materialized by the flow; fail the promise so the
  // pull loop above cancels the sink queue instead of waiting forever on `queue.future`.
  transformed.failed.foreach(err => queue.tryFailure(err))

  akka.stream.scaladsl.Flow.fromSinkAndSource(inSink, Source.fromFutureSource(transformed))
}