A little problem with new HTTP 1.0-M1

tigerfoot

Dec 21, 2014, 10:35:29 PM
to akka...@googlegroups.com
Hello,

I'm very excited about the new Akka Streams + HTTP features.  While migrating some sample code from Spray to Akka-Http, I ran into a small problem in a utility class I use to fire HTTP requests.  Code here:

import akka.http.Http
import akka.http.model._

import akka.io.IO
import akka.pattern.ask
import akka.actor.ActorSystem
import akka.util.Timeout

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global

object Util {
  def httpGet( uri:String )(implicit s:ActorSystem, timeout:Timeout = 30 seconds) = {
    val resp = _http( HttpRequest(HttpMethods.GET, Uri(uri)) )
    (resp.entity.toString, resp.status)
  }
  private def _http( hr:HttpRequest )(implicit s:ActorSystem, timeout:Timeout = 30 seconds) = {
    val response: Future[HttpResponse] = (IO(Http) ? hr).mapTo[HttpResponse]
    Await.result( response, Duration.Inf )
  }
}

I get the following error on the key line IO(...):

[error] /Users/Greg/git/docker-exp/src/main/scala/com.gwz.dockerexp/Util.scala:66: inferred type arguments [akka.http.HttpExt] do not conform to method apply's type parameter bounds [T <: akka.io.IO.Extension]
[error] val response: Future[HttpResponse] = (IO(Http) ? hr).mapTo[HttpResponse]

This worked great for the Spray libs.  How can I modify this to work with Akka-Http, as the Http object is clearly different?

Thanks!
Greg
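
For readers puzzling over the compiler message above, here is a minimal sketch of the kind of type-bound mismatch it reports. Every type below (`Extension`, `IoExtension`, `ExtensionId`, `IO`, `SprayLikeHttp`, `Http`, `HttpExt`) is a hypothetical stand-in written for this illustration, not the real Akka or Spray definition; only the shape of the bound `T <: IO.Extension` is taken from the error text.

```scala
// Hypothetical stand-ins for the types named in the error message;
// none of these are the real library definitions.
object BoundsSketch {
  trait Extension
  trait IoExtension extends Extension { def manager: String }

  trait ExtensionId[T <: Extension] { def get: T }

  // Mirrors the shape of the bound in the error: T must be an IoExtension.
  object IO {
    def apply[T <: IoExtension](key: ExtensionId[T]): String = key.get.manager
  }

  // A key whose extension satisfies the bound, so IO(SprayLikeHttp) compiles.
  final class SprayLikeExt extends IoExtension { val manager = "io-manager" }
  object SprayLikeHttp extends ExtensionId[SprayLikeExt] { def get = new SprayLikeExt }

  // A key whose extension is a plain Extension without a manager.
  final class HttpExt extends Extension
  object Http extends ExtensionId[HttpExt] { def get = new HttpExt }

  val ok: String = IO(SprayLikeHttp)
  // IO(Http)  // would not compile: HttpExt does not conform to T <: IoExtension
}
```

Under these assumptions, the commented-out call fails for the same reason as the line in the error: the extension behind the key is not a subtype of the one `IO.apply` demands, so no request can be sent to it with `?`.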

Akka Team

Dec 22, 2014, 4:39:41 AM
to Akka User List
Hi Greg,

The API doesn't look like that. Here is an example of how it is supposed to work:

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.http.model.{ HttpResponse, HttpRequest }
import akka.http.Http
import scala.concurrent.Await
import scala.concurrent.duration._

// ActorSystem to execute things with
implicit val system = ActorSystem("clientsys")

// Flow materializer that executes the streams
implicit val materializer = FlowMaterializer()

// Flow representing the http client
val httpClient = Http(system).outgoingConnection("www.example.com").flow

// Consumer that will just print headers
val consumer = Sink.foreach[HttpResponse] { resp ⇒ println(resp.headers.mkString("\n")) }

// Future produced by the ForeachConsumer
val finishFuture = Source.single(HttpRequest()).via(httpClient).runWith(consumer)

Await.result(finishFuture, 3.seconds)
system.shutdown()
system.awaitTermination()


I recommend waiting for 1.0-M2 (coming very soon), since it contains some fixes for the HTTP client.


--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

tigerfoot

Dec 23, 2014, 4:57:32 PM
to akka...@googlegroups.com
Thanks for the clarification... the new API is very different.  I need to let this whole flows thing sink in a bit, but I kinda see where it's going.  Granted, this is just one use case, but I am a little concerned about all the machinery needed to perform something pretty simple, a GET request in this case.  Not sure if I'm doing this right, but I end up awaiting twice to get the content I need out of the response, like the code below.  It does work...  It just appears quite a bit more complex than the Spray version.
 

  def httpGet( uri:String )(implicit s:ActorSystem) = {  // returns (status_code, entity_as_string)

    implicit val materializer = FlowMaterializer()

    var r:HttpResponse = null

    val req = HttpRequest(HttpMethods.GET, Uri(uri))

    val host:String = req.uri.authority.host.toString

    val port:Int = req.uri.effectivePort

    val httpClient = Http().outgoingConnection(host,port).flow

    val consumer = Sink.foreach[HttpResponse] { resp ⇒ r = resp }

    val finishFuture = Source.single(req).via(httpClient).runWith(consumer)

    Await.result(finishFuture, Duration("3 seconds"))

    // unpack result

    (r.status.intValue,

      Await.result(r.entity.toStrict(FiniteDuration(3,"seconds")), Duration("3 seconds") ).data.utf8String)

  } 
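
The function above reads the host and effective port from the request's `Uri`. As a rough illustration of what an "effective port" means, here is a JDK-only sketch; `UriParts` and `hostAndPort` are hypothetical names, and the fallback to 443 for `https` and 80 otherwise is an assumption of this sketch rather than a statement about how Akka computes `effectivePort`.

```scala
import java.net.URI

object UriParts {
  // Returns (host, port), falling back to a default port for the scheme
  // when the URI does not name one explicitly (getPort returns -1 then).
  def hostAndPort(uri: String): (String, Int) = {
    val u = new URI(uri)
    val port =
      if (u.getPort != -1) u.getPort
      else if (u.getScheme == "https") 443
      else 80
    (u.getHost, port)
  }
}
```

Note that `getHost` is null for URIs without an authority part, so a real utility would want to validate its input first.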
 

Endre Varga

Dec 24, 2014, 3:34:15 AM
to akka...@googlegroups.com
Hi Greg,


On Tue, Dec 23, 2014 at 10:57 PM, tigerfoot <gzo...@gmail.com> wrote:
Thanks for the clarification... the new API is very different.  I need to let this whole flows thing sink in a bit, but I kinda see where it's going.

 
 Granted this is just one use case but I am a little concerned about all the machinery needed to perform something pretty simple, a GET request in this case.

What do you mean all the machinery? It is basically a one-liner if you rewrite it:

val future = Source.single(req).via(Http().outgoingConnection(host, port).flow).runWith(Sink.head)

Which translates to "Take the single request req through an Http outgoing connection, then take the head of the output, which is the first (and in this case only) response"

Once you get the basics and start looking at more complex integration patterns, not just the simplest req-resp cycle, you will see that the new API is usually simpler due to composability, and that it handles backpressure automatically.
 
 Not sure if I'm doing this right but I end up awaiting twice to get the content I need out of the response--like the code below.  It does work...  Just appears quite a bit more complex than the Spray version. 

You need to await the second time because everything is streaming. The ".data" stream you accessed can be gigabytes in size! The trick is that the stream never loads the whole data at once, just the current part, while propagating backpressure.

If you want to avoid calling Await twice, rewrite your code like this:

val dataFuture = Source.single(req).via(Http().outgoingConnection(host, port).flow).mapAsync(_.entity.toStrict(3.seconds)).map(_.data.utf8String).runWith(Sink.head)

The above uses mapAsync to flatten the Future[HttpEntity.Strict] returned by toStrict into a stream of HttpEntity.Strict elements. As in your code, toStrict takes a timeout; 3.seconds needs scala.concurrent.duration._ in scope.

-Endre
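
The "await once instead of twice" idea can also be sketched with plain Scala futures, leaving Akka out entirely. This is only a loose analogy: `flatMap` on `Future` does here what `mapAsync` does for stream elements in the reply above. `FakeResponse`, `fetch`, `statusAndBody`, and `run` are hypothetical names made up for the sketch.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeSketch {
  // Hypothetical stand-in: a response whose body only arrives asynchronously.
  final case class FakeResponse(status: Int, body: Future[String])

  def fetch(): Future[FakeResponse] =
    Future(FakeResponse(200, Future("hello")))

  // Chain the two asynchronous steps; nothing blocks here.
  def statusAndBody(): Future[(Int, String)] =
    fetch().flatMap(resp => resp.body.map(text => (resp.status, text)))

  // A single Await at the edge of the program instead of one per step.
  def run(): (Int, String) = Await.result(statusAndBody(), 3.seconds)
}
```

The design point is the same as in the stream version: keep the intermediate result inside the asynchronous pipeline, and block (if at all) only once, at the outermost caller.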
 
 


Roland Kuhn

Dec 24, 2014, 4:18:57 AM
to akka...@googlegroups.com



On 24 Dec 2014, at 09:34, Endre Varga <endre...@typesafe.com> wrote:

Hi Greg,


On Tue, Dec 23, 2014 at 10:57 PM, tigerfoot <gzo...@gmail.com> wrote:
Thanks for the clarification... the new API is very different.  I need to let this whole flows thing sink in a bit, but I kinda see where it's going.

 
 Granted this is just one use case but I am a little concerned about all the machinery needed to perform something pretty simple, a GET request in this case.

What do you mean all the machinery? It is basically a one-liner if you rewrite it:

val future = Source.single(req).via(Http().outgoingConnection(host, port).flow).runWith(Sink.head)

We will definitely have a convenience API for this rather common use case, though. Without the boilerplate code it would shrink down to 

val f = Http().request(req, host, port)

which looks much nicer ;-)

Regards,

Roland 