Hi all,--I play with new akka-http and streams. I found some example application and try add to it my own logicimport akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object RouteHelpers {
implicit def future2Route[T](future: Future[T])(implicit executionContext: ExecutionContext): Route = {
onComplete(future) {
case Success(success) =>
success match {
case value: String =>
complete(value)
case httpRespose: HttpResponse =>
complete(httpRespose)
case any =>
complete(any.toString)
}
case Failure(ex) =>
complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
}
}
}
object Main extends App {
import RouteHelpers._
val host = "127.0.0.1"
val port = 8094
implicit val system = ActorSystem("my-testing-system")
implicit val fm = ActorFlowMaterializer()
implicit val executionContext = system.dispatcher
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http(system).bind(interface = host, port = port)
type Username = String
type IsRegistered = Boolean
val registerUserFlow: Flow[Username, Future[(Username, IsRegistered)], Unit] = {
Flow[Username].map {
case name =>
Future {
(name, true)
}
}
}
val resultToStringFlow: Flow[Future[(Username, IsRegistered)], Future[String], Unit] = {
Flow[Future[(Username, IsRegistered)]].map {
case future => future.flatMap {
case (username, status) =>
// call some business logic that result as Future[String]
Future {
s"User with username [$username] registration status is [$status]"
}
}
}
}
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
Future {
s"Test is [$t]"
}
}
} ~
path("register") {
parameter('username) { case username: Username =>
val t = Source.single(username).via(registerUserFlow).via(resultToStringFlow).runWith(Sink.head)
t
}
}
}
serverSource.to(Sink.foreach {
connection =>
connection handleWith Route.handlerFlow(route)
}).run()
}
When I run and curl in I see not what I need:scala.concurrent.impl.Promise$DefaultPromise@33bc080f
I think it is because val t have type Future[Future[String]]My question is: How I can do some business logic with Streams, if it require call something async inside, but I want to some number of Flow to process requestPS: I want to use Streams for back pressure between some component of my systemWith best regards, Vladimir.
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.