Is this the correct flow to retrieve bytes from an HttpResponse?

283 views
Skip to first unread message

john....@gmail.com

unread,
Jul 30, 2015, 2:18:18 AM7/30/15
to Akka User List
// this is part of a BidiFlow

FlowShape<Tuple2<Try<HttpResponse>, RequestResult>,
     
Tuple2<ByteString, Object>>
      bottom
=
      b
.graph(Flow.<Tuple2<Try<HttpResponse>, Object>>empty().
            mapAsync
(4, pair ->
                       
getEntityBytes(pair._1().get(), pair._2(), materializer)
           
).map((pair) -> new Tuple2<>(pair._1(), pair._2())));



static
Future<Tuple2<ByteString, RequestResult>>
   getEntityBytes
( final HttpResponse response,
                   
final Object requestResult,
                   
final ActorMaterializer materializer) {

   
return response.entity().getDataBytes().runFold(
         
new Tuple2(ByteString.empty(),requestResult),
         
(aggr, next) -> new Tuple2(aggr._1().concat(next),aggr._2()), materializer);
}




What looks a little funny to me is that I need to pass a materializer to the inner flow?

I am a little unsure because the docs Modularity, Composition and Hierarchy state:
"It is rarely useful to embed a closed graph shape in a larger graph"

Viktor Klang

unread,
Jul 30, 2015, 2:54:22 AM7/30/15
to Akka User List

What's the use-case?

--
Cheers,

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

john....@gmail.com

unread,
Jul 30, 2015, 4:31:55 AM7/30/15
to Akka User List, viktor...@gmail.com
the usecase is getting a "complete"  ByteString from a from HttpResponse.

I joinded a BidiFlow ( above code shows this BidiFlows bottom channel)
with Http.get().cachedHostConnectionPool() Flow

Viktor Klang

unread,
Jul 30, 2015, 6:04:21 AM7/30/15
to john....@gmail.com, Akka User List

so if the user sends 4gb you want to load it all into memory and crash the jvm? :-)

What's the use-case where you can't stream it?

--
Cheers,

john....@gmail.com

unread,
Jul 30, 2015, 6:25:27 AM7/30/15
to Akka User List, viktor...@gmail.com
I think you may be showing me an  error in my design.
the HttpResponse contains json which I then decode back to java Objects.
These json payloads will never be 4 gb more 500kb - 10MB but I could be receiveing a lot concurrently which could add up
I need to find a way to avoid folding with
response.entity().getDataBytes().runFold
....

Viktor Klang

unread,
Jul 30, 2015, 7:41:22 AM7/30/15
to john....@gmail.com, Akka User List

Happy hAkking!

--
Cheers,

john....@gmail.com

unread,
Jul 30, 2015, 7:50:36 AM7/30/15
to Akka User List, viktor...@gmail.com

yeah the streams api has really taken me (-: it reminds me of flow programming or go blocks or clojure async

john....@gmail.com

unread,
Aug 4, 2015, 11:45:07 AM8/4/15
to Akka User List

Richard Grossman

unread,
Jan 27, 2016, 9:13:32 AM1/27/16
to Akka User List
Hi

I need to exactly the same I must get the response on http call into my flow.
I see that you think this tickets can solve your problem is it true ?

Thanks

john....@gmail.com

unread,
Jan 30, 2016, 11:15:36 AM1/30/16
to Akka User List
I ended up creating a flow with flatMapConcat:
With this flow the bytes of the response get accumulated to a single ByteString  
for example:

final Flow<Pair, ByteString, BoxedUnit> bytestringFlow = Flow.of(Pair.class).flatMapConcat((Pair pair) -> {
Try<HttpResponse> responseTry = (Try<HttpResponse>) pair.first();
Source<ByteString, Object> dataBytes = responseTry.get().entity().getDataBytes();
return dataBytes;
});

Richard Grossman

unread,
Feb 2, 2016, 4:41:07 AM2/2/16
to Akka User List
Hi

If it can help someone in scala you can do this like this

val future : Future[NetworkResponse] =
 
Source.single(req)
 
.log("Start Http")
 
.map(req => (HttpRequest(HttpMethods.GET, Uri(req.url)), req))
 
.log("Map to httpRequest")
 
.map(httpReq => httpReq._1 -> (httpReq._2.id, httpReq._2.start))
 
.log("Map to request")
 
.via(connectionPool)
 
.log("after connection pool")
 
.map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2))
 
.log("after reading response")
 
.mapAsync(10)(r => r._1.map(bytes => NetworkResponse(r._2._1, r._2._2, bytes.decodeString("UTF-8"))))
 
.log("after map async")
 
.runWith(Sink.head)
future pipeTo sender

It start the flow for each source as single 
run the flow via the connectionPool
Then map to response that read the data from the http connection
Finally mapAsync send back a future on any structure that you want with the data inside

john....@gmail.com

unread,
Feb 2, 2016, 7:19:27 AM2/2/16
to Akka User List
Hi Richard, unfortunately my scala is very bad. (Actually I  learning scala by looking at the akka source and this list)
But If I am right, the line
.map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2)
uses an implicit materializer. So your are aggregating the whole ByteBuffer in memory.

Hacker  √ complained about this in the 4'th Mail in this thread.

Pleas forgive if I am wrong!
Many Greetings John

Richard Grossman

unread,
Feb 2, 2016, 11:34:12 AM2/2/16
to Akka User List
Indeed it can be a problem 
I can't see why your code make something different not an expert of the akka-stream 
why adding a flow like make something different that map in scala

john....@gmail.com

unread,
Feb 2, 2016, 11:48:32 AM2/2/16
to Akka User List
Hi Richard,
The key is flatmapConat
The akka team coded this build-in stage so that it flattens the nested ByteBuffer without consuming unnecessary memory
Reply all
Reply to author
Forward
0 new messages