Re: [akka-user] How to use Future with in akka-streams

822 views
Skip to first unread message

Martynas Mickevičius

unread,
Jun 4, 2015, 10:48:16 AM6/4/15
to akka...@googlegroups.com
Hi Vladimir,

inside of registerUserFlow and resultToStringFlow where you map incoming stream elements use mapAsync instead of map on Flows, so you are not sending futures in your stream pipeline.

On Tue, Jun 2, 2015 at 6:13 PM, Владимир Морозов <green...@gmail.com> wrote:
Hi all,

I play with new akka-http and streams. I found some example application and try add to it my own logic

import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object RouteHelpers {
 
implicit def future2Route[T](future: Future[T])(implicit executionContext: ExecutionContext): Route = {
    onComplete
(future) {
     
case Success(success) =>
        success
match {
         
case value: String =>
            complete
(value)
         
case httpRespose: HttpResponse =>
            complete
(httpRespose)
         
case any =>
            complete
(any.toString)
       
}
     
case Failure(ex) =>
        complete
(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
   
}
 
}
}

object Main extends App {

 
import RouteHelpers._

 
val host = "127.0.0.1"
 
val port = 8094

 
implicit val system = ActorSystem("my-testing-system")
 
implicit val fm = ActorFlowMaterializer()
 
implicit val executionContext = system.dispatcher

 
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
   
Http(system).bind(interface = host, port = port)

 
type Username = String
 
type IsRegistered = Boolean

 
val registerUserFlow: Flow[Username, Future[(Username, IsRegistered)], Unit] = {
   
Flow[Username].map {
     
case name =>
       
Future {
         
(name, true)
       
}
   
}
 
}

 
val resultToStringFlow: Flow[Future[(Username, IsRegistered)], Future[String], Unit] = {
   
Flow[Future[(Username, IsRegistered)]].map {
     
case future => future.flatMap {
       
case (username, status) =>
         
// call some business logic that result as Future[String]
         
Future {
            s
"User with username [$username] registration status is [$status]"
         
}
     
}
   
}
 
}

 
val route: Route =
   
get {
      path
("test") {
        parameter
('test) { case t: String =>
         
Future {
            s
"Test is [$t]"
          }
        }
      } ~
        path(
"register") {
          parameter('
username) { case username: Username =>
           
val t = Source.single(username).via(registerUserFlow).via(resultToStringFlow).runWith(Sink.head)

            t
         
}
       
}
   
}

  serverSource
.to(Sink.foreach {
    connection
=>
      connection handleWith
Route.handlerFlow(route)
 
}).run()

}

When I run and curl in I see not what I need:

scala.concurrent.impl.Promise$DefaultPromise@33bc080f

I think it is because val t have type Future[Future[String]]

My question is: How I can do some business logic with Streams, if it require call something async inside, but I want to some number of Flow to process request
PS: I want to use Streams for back pressure between some component of my system

With best regards, Vladimir.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Martynas Mickevičius
TypesafeReactive Apps on the JVM
Reply all
Reply to author
Forward
0 new messages