Stream to lagom service via an API gateway

Dan Markwat

Oct 12, 2017, 9:52:35 AM
to Lagom Framework Users
Hello!

I'm using the online auction Scala project as a reference for a project I'm working on, but I've come across a situation I can't find clear documentation or examples for: handling a stream to a Lagom service via the web gateway. It looks like it should be simple enough, but since Play handles WebSockets using only a Flow, I'm stuck on how to connect it to a Lagom service client, which in this case accepts a Source on the method call. I can't find any good references on using Flows as Sources or anything of that nature.

Is there something I should be doing differently? I've thought of bypassing the API gateway (probably not ideal), using a WS client to talk to the service and connecting the flow that way, or even writing a custom Flow or Source to handle it, but this seems like something that might already be solved and just not widely known (or I missed it).

Service method:

def observeStream: ServiceCall[Source[String, NotUsed], NotUsed]

API gateway endpoint:

def observe: WebSocket = WebSocket.accept { request =>
  Flow[String].via(sessionService.observeStream) // ???
  Flow[String].via(sessionService.observeStream.invoke(???)) // ???
}

Thoughts or ideas? Is there a different method I should be looking for, or a different approach than streaming via the API gateway?

Dan

Marc-Antoine Nüssli

Feb 22, 2018, 4:07:15 PM
to Lagom Framework Users
Hi Dan,

I don't know if you found a solution for this, but I faced a similar issue when trying to connect a Lagom stream service with an akka-http WebSocket (which also works with a Flow).
I ended up with a solution that connects a SinkQueue to a SourceQueue in order to construct a Flow out of a function that transforms a Source into a Source. It is not very nice, but it works.

  import akka.NotUsed
  import akka.stream.{OverflowStrategy, QueueOfferResult}
  import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
  import scala.concurrent.{ExecutionContext, Future, Promise}
  import scala.util.{Failure, Success}

  // Turns a Source => Future[Source] function into a Flow by pumping elements
  // from a SinkQueue (the flow's input) into a SourceQueue (the input of f).
  // Note: the Promise is shared, so the returned Flow can be materialized only once.
  def sourceTransformToFlow[V1, V2](f: Source[V1, NotUsed] => Future[Source[V2, NotUsed]])(implicit ec: ExecutionContext): Flow[V1, V2, NotUsed] = {
    val queue = Promise[SourceQueueWithComplete[V1]]()
    val outSource =
      Source.queue[V1](0, OverflowStrategy.backpressure).mapMaterializedValue { q => queue.success(q); NotUsed }
    val inSink =
      Sink.queue[V1]().mapMaterializedValue { sinkQ =>
        // Pull the next element from the sink queue, offer it to the source queue, then recurse.
        def pullAndOffer(): Unit =
          sinkQ.pull().onComplete {
            case Success(Some(elem)) =>
              queue.future.flatMap(_.offer(elem)).onComplete {
                case Success(QueueOfferResult.Enqueued) => pullAndOffer()
                case _                                  => sinkQ.cancel() // not enqueued, or the offer failed
              }
            case Success(None) => queue.future.foreach(_.complete()) // upstream completed
            case Failure(err)  => queue.future.foreach(_.fail(err))  // upstream failed
          }
        pullAndOffer()
      }
    Flow.fromSinkAndSource(inSink, Source.fromFutureSource(f(outSource)))
  }

Best,
ma
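
A possible alternative to the queue-based helper above, offered only as a sketch: the same Source-to-Flow bridge can be built on the Reactive Streams handles that Akka Streams exposes (Sink.asPublisher and Source.asSubscriber), which avoids the manual pull/offer loop and lets the Flow be materialized more than once. This is a swapped-in technique, not the approach described in the post. It assumes an Akka 2.5-era API (ActorMaterializer and Source.fromFutureSource are deprecated in later releases). The names SourceTransformBridge, shout, and runDemo are hypothetical, and shout is only a stand-in for a streamed service call.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceTransformBridge {

  // Builds a Flow from a Source => Future[Source] function by exposing the
  // flow's input as a Publisher and its output as a Subscriber.
  def sourceTransformToFlow[A, B](
      f: Source[A, NotUsed] => Future[Source[B, NotUsed]]
  )(implicit mat: Materializer): Flow[A, B, NotUsed] =
    Flow
      .fromSinkAndSourceMat(
        Sink.asPublisher[A](fanout = false), // inbound side, materializes a Publisher[A]
        Source.asSubscriber[B]               // outbound side, materializes a Subscriber[B]
      )(Keep.both)
      .mapMaterializedValue { case (inbound, outbound) =>
        // Executed once per materialization: apply f to the inbound elements
        // and run the resulting stream into the outbound subscriber.
        // A failed Future fails the outbound side of the Flow.
        Source
          .fromFutureSource(f(Source.fromPublisher(inbound)))
          .runWith(Sink.fromSubscriber(outbound))
      }

  // Hypothetical stand-in for a streamed service call (assumption, not Lagom API).
  def shout(in: Source[String, NotUsed]): Future[Source[String, NotUsed]] =
    Future.successful(in.map(_.toUpperCase))

  // Runs two elements through the bridged Flow and returns what came out.
  def runDemo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("bridge-demo")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val flow = sourceTransformToFlow[String, String](shout)
      Await.result(Source(List("a", "b")).via(flow).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(runDemo())
}
```

In a gateway, f would presumably wrap the service client call (something along the lines of in => sessionService.observeStream.invoke(in), mapped to whatever outbound Source the WebSocket should emit), but that wiring is not tested here.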