Strange behavior with akka-http RC3

120 views
Skip to first unread message

dpratt

unread,
May 27, 2015, 4:18:55 PM5/27/15
to akka...@googlegroups.com
I've been trying to come up with a high-level HTTP pipeline abstraction for dispatch of requests. I know that there is the singleRequest method on the HTTP extension, but I wanted to avoid it's (arguably minimal) overhead for each call by piping requests using the following: 

object PublisherDispatcher extends App {
  import akka.actor.{ActorRef, ActorSystem}
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods, HttpRequest}
  import akka.stream.scaladsl.{Source, Sink}
  import akka.stream.{OverflowStrategy, ActorFlowMaterializer, FlowMaterializer}
  import akka.util.Timeout

  import scala.concurrent.{Future, Await}
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.util.{Failure, Success, Try}

  class MyPublisher extends ActorPublisher[(HttpRequest, Unit)] {
    //left to the reader, but it just responds to HttpRequest messages, tuples it up with the sender() reference
    // and either emits it (if there's demand) or buffers it
    override def receive: Receive = ???
  }
  
  def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer, timeout: Timeout = 20.seconds): HttpRequest => Future[HttpResponse] = {
    val flow = if (ssl) {
      Http(system).cachedHostConnectionPool[ActorRef](host, port)
    } else {
      Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
    }
    val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new MyPublisher))
    val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
      case (Success(response), replyTo) =>
        replyTo ! response
      case (Failure(t), replyTo) =>
        replyTo ! akka.actor.Status.Failure(t)
    }

    val sourceActor = source.via(flow).to(sink).run()

    import akka.pattern._

    request => (sourceActor ? request).mapTo[HttpResponse]
  }

  implicit val system = ActorSystem("test")
  try {
    implicit val fm: FlowMaterializer = ActorFlowMaterializer()
    import system.dispatcher

    val pipe = pipeline("www.google.com", 80, ssl = false)
    val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))

    val res = Await.result(resF, 30 seconds)

    println(res)
  } finally {
    system.shutdown()
  }
}

From examination of the APIs and docs, I don't see why the above doesn't work - the Await times out. If you add logging in there, you'll see that the ActorPublisher side receives (and emits) the message, but then the pipeline falls silent.

I decided to play around a bit, and even using the following fails

  def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {
    import system.dispatcher

    val flow =
      if (ssl) {
        Http(system).cachedHostConnectionPool[Unit](host, port)
      } else {
        Http(system).cachedHostConnectionPoolTls[Unit](host, port)
      }
    val sink = flow.toMat(Sink.head[(Try[HttpResponse], Unit)])(Keep.right)

    request => {
      val res = Promise[HttpResponse]()
      val source = Source.single[(HttpRequest, Unit)]((request, ()))
      source.runWith(sink).onComplete {
        case Success((response, _)) =>
          res.complete(response)
        case Failure(t) =>
          res.failure(t)
      }
      res.future
    }
  }
 
What did I miss here?


Akka Team

unread,
Jun 1, 2015, 9:21:35 AM6/1/15
to Akka User List
Hi,

It was not easy to find out, but you reverted the "if" check for the ssl parameter and therefore used the TLS version of the pool, but you gave port 80 where no TLS is used. It seems like TLS got stuck there, so I opened a ticket: https://github.com/akka/akka/issues/17628

This works:

def pipeline(host: String, port: Int)(implicit system: ActorSystem, fm: FlowMaterializer): HttpRequest ⇒ Future[HttpResponse] = {
  import system.dispatcher
  val flow = Http(system).cachedHostConnectionPoolTls[Unit](host, port)
  val sink = flow.toMat(Sink.head)(Keep.right)

  request ⇒ {
    val source = Source.single((request, ()))
    source.runWith(sink).map(_._1.get)

  }
}

implicit val system = ActorSystem("test")

try {
  implicit val fm: FlowMaterializer = ActorFlowMaterializer()

  val pipe = pipeline("www.google.com", 443)

  val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))

  val res = Await.result(resF, 3 seconds)
  println(res)
} finally {
  system.shutdown()
}

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

David Pratt

unread,
Jun 1, 2015, 10:06:20 AM6/1/15
to akka...@googlegroups.com
Well, don't I feel dumb. Thanks for the help!
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/GK99I2rSqgs/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Akka Team

unread,
Jun 1, 2015, 10:07:59 AM6/1/15
to Akka User List
On Mon, Jun 1, 2015 at 4:06 PM, David Pratt <david...@gmail.com> wrote:
Well, don't I feel dumb. Thanks for the help!

No reason to feel dumb, the request should have failed with an error showing that there is a TLS problem - hence the ticket.

-Endre
Reply all
Reply to author
Forward
0 new messages