Exception in thread "main" java.util.NoSuchElementException: head of empty stream at akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128) at akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2$$anonfun$apply$3.apply(Sink.scala:128) at scala.Option.getOrElse(Option.scala:121) at akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128) at akka.stream.scaladsl.Sink$$anonfun$head$1$$anonfun$apply$2.apply(Sink.scala:128) at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) at scala.util.Try$.apply(Try.scala:192) at scala.util.Success.map(Try.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:76) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:75) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at scala.concurrent.Promise$class.trySuccess(Promise.scala:94) at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:153) at akka.stream.impl.HeadOptionStage$$anon$3.onUpstreamFinish(Sinks.scala:255) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:732) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616) at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471) at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:381) at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538) at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586) at akka.actor.Actor$class.aroundPreStart(Actor.scala:489) at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529) at akka.actor.ActorCell.create(ActorCell.scala:590) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) at akka.dispatch.Mailbox.run(Mailbox.scala:223) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
package com.workday.excalibur.http
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
object TestBidiToConnectionPool extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// connection pool flow
val connectionPoolFlow = Http().superPool[Int]()
// Bidi from functions join connection pool
val flowFromFunctionsBidi = BidiFlow.fromFunctions((request: (HttpRequest, Int)) ⇒ request, (response: (Try[HttpResponse], Int)) ⇒ response).join(connectionPoolFlow)
// Bidi from custom GraphStage join connection pool
val flowFromCustomGraphStageBidi = BidiFlow.fromGraph(new CustomIdentityBidi).join(connectionPoolFlow)
// SUCCESS -> Submit HttpRequest to flow built with functions Bidi
val responseFuture = Source.single(HttpRequest(uri = "http://httpbin.org/ip") -> 42).via(flowFromFunctionsBidi).runWith(Sink.head)
Await.result(responseFuture, 10 seconds) match {
case (tryResponse, context) ⇒ {
tryResponse match {
case Success(httpResponse) ⇒ {
val entity = Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3 second).utf8String
println("Success: " + entity)
}
case Failure(t) ⇒ {
println("Error occurred: " + t)
}
}
}
}
// FAILURE -> Submit HttpRequest to flow built with custom GraphStage
// Exception in thread "main" java.util.NoSuchElementException: head of empty stream
val responseFuture2 = Source.single(HttpRequest(uri = "http://httpbin.org/ip") -> 42).via(flowFromCustomGraphStageBidi).runWith(Sink.head)
Await.result(responseFuture2, 10 seconds) match {
case (tryResponse, context) ⇒ {
tryResponse match {
case Success(httpResponse) ⇒ {
val entity = Await.result(httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _), 3 second).utf8String
println("Success: " + entity)
}
case Failure(t) ⇒ {
println("Error occurred: " + t)
}
}
}
}
}
class CustomIdentityBidi extends GraphStage[BidiShape[(HttpRequest, Int), (HttpRequest, Int), (Try[HttpResponse], Int), (Try[HttpResponse], Int)]] {
val requestIn = Inlet[(HttpRequest, Int)]("requestIn")
val requestOut = Outlet[(HttpRequest, Int)]("requestOut")
val responseIn = Inlet[(Try[HttpResponse], Int)]("responseIn")
val responseOut = Outlet[(Try[HttpResponse], Int)]("responseOut")
override val shape = BidiShape.of(requestIn, requestOut, responseIn, responseOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(requestIn, new InHandler {
override def onPush(): Unit = {
push(requestOut, grab(requestIn))
}
})
setHandler(requestOut, new OutHandler {
override def onPull(): Unit = {
pull(requestIn)
}
})
setHandler(responseIn, new InHandler {
override def onPush(): Unit = {
push(responseOut, grab(responseIn))
}
})
setHandler(responseOut, new OutHandler {
override def onPull(): Unit = {
pull(responseIn)
}
})
}
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(requestIn, new InHandler {
override def onPush(): Unit = {
push(requestOut, grab(requestIn))
}
override def onUpstreamFinish() = complete(requestOut)
})
setHandler(requestOut, new OutHandler {
override def onPull(): Unit = {
pull(requestIn)
}
override def onDownstreamFinish() = cancel(requestIn)
})
setHandler(responseIn, new InHandler {
override def onPush(): Unit = {
val msg = grab(responseIn)
push(responseOut, msg)
}
override def onUpstreamFinish() = completeStage()
})
setHandler(responseOut, new OutHandler {
override def onPull(): Unit = {
pull(responseIn)
}
override def onDownstreamFinish() = cancel(responseIn)
})
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.