Materialize future after content negotiation

101 views
Skip to first unread message

Muntis Grube

unread,
Sep 20, 2017, 6:10:24 AM9/20/17
to Akka User List
Hello

Is there way to trick marshallers to accept my Future[HttpResponse] in withFixedContentType ?

To provide large responses from cursor I'm trying to materialize class that extends Iterator. To do that we created  SourceShape[ByteString] that wraps the iterator and corrseponding GraphStageWithMaterializedValue[SinkShape[ByteString], Future[MessageEntity]]
that pools first values from source and decides if HttpEntity.Strict or HttpEntity.Chunked should be creaded. 


  def httpResponse(iterator: Iterator[Data])(implicit ec: ExecutionContext) = {
   
Await.result(Source.fromGraph(sourceGrap(iterator)).runWith(entitySink).map(entity => HttpResponse(entity = entity)), 60 seconds)
 
}

  val toResponse
IteratorJsonMarshaller: ToResponseMarshaller[Iterator[Data]] =
   
Marshaller.withFixedContentType(`application/json`) {
      result
=> httpResponse(result)
   
}

 
implicit val toResponseIteratorMarshaller: ToResponseMarshaller[Iterator[Data]] =
   
Marshaller.oneOf(
      toResponse
IteratorJsonMarshaller,
      toResponseIteratorOdsMarshaller,
      toResponseIteratorExcelMarshaller
    )


As you can probably guess the biggest concern for me is unnecessary Await.result. One of the promising solutions was to drop await and to rewrite marshaller as: 

 def httpResponse(iterator: Iterator[Data])(implicit ec: ExecutionContext) = {
   
Source.fromGraph(sourceGrap(iterator)).runWith(entitySink).map(entity => HttpResponse(entity = entity))
 
}

 val toResponseIteratorJsonMarshaller
: ToResponseMarshaller[Iterator[Data]] = Marshaller{ implicit ec => result =>
   httpResponse
(result).map(response => Marshalling.WithFixedContentType(`application/json`, () => response ) :: Nil)
 
}

Unfortunately project structure is built that way that query is executed and cursor is opened when iterator hasNext or next methods are called. And in this solution it is done before content negotiation is done and causes multiple queries called for each response type at best or multiple queries on one connection at worst. 


Thanks.
Muntis

johannes...@lightbend.com

unread,
Sep 26, 2017, 9:05:03 AM9/26/17
to Akka User List
Hi Mantis,

you are right, `Marshaller.withFixedContentType` is a bit restricted in that regard. Fortunately, it is only a shortcut for simpler cases and the full asynchronous functionality is available for marshallers. Try something like

Marshaller[Iterator[Data], HttpResponse] { implicit ec => it =>
  doSomethingWhichReturnsFutureOfHttpResponse.map { response =>
    Marshalling.WithFixedContentType(contentType, () ⇒ response) :: Nil
  }
}

Cheers,
Johannes

johannes...@lightbend.com

unread,
Sep 26, 2017, 9:13:15 AM9/26/17
to Akka User List
Oops, one should read the whole question before answering... Just saw that you already tried that. Unfortunately, it seems that this is indeed a shortcoming of the current model.

I guess with a bit of fiddling you could try making all of those marshallers marshal to `Future[HttpResponse]` instead of `HttpResponse` and then use something like



val toResponseIteratorJsonMarshaller: Marshaller[Iterator[Data], Future[HttpResponse]] =

    
Marshaller.withFixedContentType(`application/json`) {
      result 
=> httpResponse(result)
    
}


  
implicit val toResponseIteratorMarshaller: Marshaller[Iterator[Data], Future[HttpResponse]] =

    
Marshaller.oneOf(
      toResponse
IteratorJsonMarshaller,
      toResponseIteratorOdsMarshaller,
      toResponseIteratorExcelMarshaller
    )

and then in your route:

val responseFuture =
Marshal(data).toResponseFor(request)(toResponseIteratorMarshaller): // Future[Future[HttpResponse]]

.flatMap(identity)
complete(responseFuture)

Would be interesting to know if that works.

Johannes

Muntis Grube

unread,
Sep 27, 2017, 5:34:11 AM9/27/17
to Akka User List
Thanks. It seams that it worked: 

  type
FutureResponse = Future[HttpResponse]
  type
FutureResponseMarshaller[T] = Marshaller[T, FutureResponse]


 
def httpResponse(iterator: Iterator[Data])(implicit ec: ExecutionContext) = {
   
Source.fromGraph(sourceGrap(iterator)).runWith(entitySink).map(entity => HttpResponse(entity = entity))
 
}


  val toResponseIteratorJsonMarshaller
: FutureResponseMarshaller[Iterator[Data]] =

   
Marshaller.withFixedContentType(`application/json`) {
      result
=> httpResponse(result)
   
}


 
implicit val toResponseIteratorMarshaller: FutureResponseMarshaller[Iterator[Data]] =

   
Marshaller.oneOf(
      toResponseIteratorJsonMarshaller
,
      toResponseIteratorOdsMarshaller
,
      toResponseIteratorExcelMarshaller
   
)


   complete
{
     val result
= ???
     
Marshal(result).to[FutureResponse].flatMap(identity)
   
}

Muntis Grube

unread,
Oct 10, 2017, 11:19:45 AM10/10/17
to Akka User List
Today if found out that content negotiation is not done in Marshal(result).to[__] case. (I should had tests for that)
Only solution I can find is to copy Marshal.toResponseFor method as it is not expandable at all. and add flatMap(identity) there:

 implicit def toFutureResponseMarshallable[A](_value: A)(implicit _marshaller: FutureResponseMarshaller[A]): ToResponseMarshallable = new ToResponseMarshallable{
    type T
= A
   
def value: T = _value
   
implicit def marshaller: ToResponseMarshaller[A] = null

   
override def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
     
import akka.http.scaladsl.util.FastFuture._
     
import akka.http.scaladsl.marshalling.Marshal._
      val ctn
= ContentNegotiator(request.headers)
      _marshaller
(value).fast.map { marshallings
        val supportedAlternatives
: List[ContentNegotiator.Alternative] =
          marshallings
.collect {
           
case Marshalling.WithFixedContentType(ct, _) ContentNegotiator.Alternative(ct)
           
case Marshalling.WithOpenCharset(mt, _)       ContentNegotiator.Alternative(mt)
         
}(collection.breakOut)
        val bestMarshal
= {
         
if (supportedAlternatives.nonEmpty) {
            ctn
.pickContentType(supportedAlternatives).flatMap {
             
case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset | _: ContentType.WithMissingCharset)
                marshallings collectFirst
{ case Marshalling.WithFixedContentType(`best`, marshal) marshal }
             
case best @ ContentType.WithCharset(bestMT, bestCS)
                marshallings collectFirst
{
                 
case Marshalling.WithFixedContentType(`best`, marshal) marshal
                 
case Marshalling.WithOpenCharset(`bestMT`, marshal)     () marshal(bestCS)
               
}
           
}
         
} else None
       
} orElse {
          marshallings collectFirst
{ case Marshalling.Opaque(marshal) marshal }
       
} getOrElse {
         
throw UnacceptableResponseContentTypeException(supportedAlternatives.toSet)
       
}
        bestMarshal
()
     
}.flatMap(identity)
   
}
 
}

Reply all
Reply to author
Forward
0 new messages