object PublisherDispatcher extends App {
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods, HttpRequest}
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.{OverflowStrategy, ActorFlowMaterializer, FlowMaterializer}
import akka.util.Timeout
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
class MyPublisher extends ActorPublisher[(HttpRequest, Unit)] {
//left to the reader, but it just responds to HttpRequest messages, tuples it up with the sender() reference
// and either emits it (if there's demand) or buffers it
override def receive: Receive = ???
}
def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer, timeout: Timeout = 20.seconds): HttpRequest => Future[HttpResponse] = {
val flow = if (ssl) {
Http(system).cachedHostConnectionPool[ActorRef](host, port)
} else {
Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
}
val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new MyPublisher))
val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
case (Success(response), replyTo) =>
replyTo ! response
case (Failure(t), replyTo) =>
replyTo ! akka.actor.Status.Failure(t)
}
val sourceActor = source.via(flow).to(sink).run()
import akka.pattern._
request => (sourceActor ? request).mapTo[HttpResponse]
}
implicit val system = ActorSystem("test")
try {
implicit val fm: FlowMaterializer = ActorFlowMaterializer()
import system.dispatcher
val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))
val res = Await.result(resF, 30 seconds)
println(res)
} finally {
system.shutdown()
}
}
From examination of the APIs and docs, I don't see why the above doesn't work - the Await times out. If you add logging in there, you'll see that the ActorPublisher side receives (and emits) the message, but then the pipeline falls silent.
def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {