[akka-stream-experimental-1.0] How to reuse akka.stream.scaladsl.Tcp connections?


Simon Schäfer

Aug 21, 2015, 5:39:33 PM
to Akka User List
I get the following log messages every time I send a message to the remote system:

backend [DEBUG] [08/21/2015 23:26:40.513] [default-akka.actor.default-dispatcher-23] [akka://default/system/IO-TCP/selectors/$a/18] Attempting connection to [/127.0.0.1:6666]
backend [DEBUG] [08/21/2015 23:26:40.514] [default-akka.actor.default-dispatcher-23] [akka://default/system/IO-TCP/selectors/$a/18] Connection established to [/127.0.0.1:6666]

Does this mean a connection is created every time I try to send something, or is the connection reused? I would prefer the latter. How can I achieve that, if it is not already the case?

My code looks basically like this:

// I would like to do some caching here
val tcpFlow = Tcp().outgoingConnection(host, port)

// Here, a connection should only be created when none exists
def send() = {
  implicit val m = ActorMaterializer()
  val byteString: IndexedSeq[akka.util.ByteString] = ...
  val resp = Source(byteString).via(tcpFlow).runFold(ByteString.empty)(_++_)
  resp onComplete { ... }
}

Endre Varga

Aug 25, 2015, 5:34:53 AM
to akka...@googlegroups.com
Hi Simon,

tcpFlow is the description of a TCP connection; it does not connect by itself. Whenever you materialize that flow as part of a larger stream, it connects.

Your send() method calls "runFold" on a stream that contains this tcpFlow, therefore you always open a new connection in send().
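In other words, `tcpFlow` is a reusable blueprint, and every run of a stream containing it produces a fresh materialization. A loose plain-Scala analogy (with a counter standing in for a real connection, purely as an illustration):

```scala
// A Flow is only a blueprint: nothing happens until it is run.
// The closure below is a stand-in for Tcp().outgoingConnection,
// NOT the akka-stream API itself.
var connectionsOpened = 0

// "Describing" the flow does not connect...
val tcpFlowBlueprint: Seq[String] => Seq[String] = { bytes =>
  connectionsOpened += 1           // ...but every materialization does.
  bytes.map(_.reverse)
}

assert(connectionsOpened == 0)     // nothing has connected yet

// Each "run" of the blueprint opens a fresh connection,
// just like each runFold() inside send() does.
tcpFlowBlueprint(Seq("ping"))
tcpFlowBlueprint(Seq("pong"))
assert(connectionsOpened == 2)
```

To reuse one connection across many send() calls, you would have to keep a single materialized stream alive instead of running a new stream per call.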

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Simon Schäfer

Aug 25, 2015, 10:04:36 AM
to akka...@googlegroups.com


On 08/25/2015 11:34 AM, Endre Varga wrote:
Hi Simon,

tcpFlow is the description of a TCP connection; it does not connect by itself. Whenever you materialize that flow as part of a larger stream, it connects.

Your send() method calls "runFold" on a stream that contains this tcpFlow, therefore you always open a new connection in send().
Ok, thanks. That clears up a lot of things. Still, I don't understand how to move from a finite stream to an infinite one. I just want to have one single open connection (maybe more than one, but then they come from a connection pool) over which I can send and receive data.

I thought the whole point of reactive streams is to make working with infinite data easy, but the akka-streams documentation never mentions that. It explains things with a `Source(1 to 100)`, which isn't very useful, since it is finite. The concept of materializing also doesn't make sense to me when I have infinite data: I can't materialize anything, since there is no end. For finite data I can just use the collection library; I don't see why I should use akka-streams here.

Johannes Rudolph

Aug 25, 2015, 11:25:56 AM
to Akka User List
Hi Simon,

I think there are two conceptual difficulties you need to tackle:

The first is the problem you describe with infinite vs. finite streams, which is really about "traditional" (= actor based) push-style asynchronous programming versus the "new" [*] pull-style of reactive/akka streams, which was introduced to deal with backpressure. The issue with backpressure is that it only works if all components take part in it. If one component opts out of backpressure, it will have to fail or drop elements when it becomes overloaded, and that component becomes the weakest link (the "Sollbruchstelle", German for "predetermined breaking point") of your application under load. Akka currently provides `Source.actorRef` (and `Sink.actorRef`, respectively), which does exactly this translation from a push-style actor API to the pull-style streams API. You usually don't want to use them, as they are limited by design and bound to fail under load.

Pull-style means that you need to write your program so that it is completely passive and waits for demand (you could also call that style "reactive": you set up your program to passively wait for a source to provide elements and then react to them). Writing "passive" programs is perfectly suited to services that follow the request/response principle: you set up your handler as a flow and just put it between the Source[Request] and Sink[Response].

But what does it mean for a client program which usually actively tries to achieve something? I think you can also build such a program in a passive style: if it doesn't take any dynamic input it is easy as you can create all the sources and sinks from static data. If it does take dynamic input (like user input), you just need a Source of that user input that only prompts the user for more input if there's demand. It should be possible to structure a program like this but it will be a pervasive change that cannot be recommended in all cases.
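The "prompt only when there is demand" idea maps naturally onto a lazy iterator; a plain-Scala sketch (with a counter standing in for real user input, purely as an illustration):

```scala
// Pull-style input: nothing is produced until downstream asks for it.
// prompt() is a hypothetical stand-in for actually asking the user.
var promptCount = 0
def prompt(): String = { promptCount += 1; s"input-$promptCount" }

// A lazy iterator is the plain-Scala analogue of a backpressured Source:
// Iterator.continually evaluates its argument once per next() call.
val userInput = Iterator.continually(prompt())

// No demand yet, so the user has not been prompted:
assert(promptCount == 0)

// Downstream "demands" two elements; exactly two prompts happen:
val twoInputs = userInput.take(2).toList
assert(promptCount == 2)
assert(twoInputs == List("input-1", "input-2"))
```

The same shape carries over to streams: a Source built over such an iterator only prompts the user when the rest of the pipeline signals demand.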

So, in reality for client applications you will probably use things like the brittle `Source.actorRef` and just statically configure the size of the buffers and queues to be big enough for the common use cases. (You could say that `Source.actorRef` is not more brittle than the JVM itself which you also need to configure with a maximum heap size.) In any case using streams will force you to think about these kind of issues.

The second difficulty is a gap (IMO) in your notion of "reusing a connection" that your use of streams happens to expose. Look at what this line means:

    val resp = Source(byteString).via(tcpFlow).runFold(ByteString.empty)(_++_)

It says: "open a TCP connection, stream the source byteString to the connection, read all data *until the connection is closed by the other side* and return this data". So the end of the response is determined by looking for the end of the TCP stream. To be able to reuse a connection, you need a different end-of-response marker than the TCP connection being closed. You need some framing protocol on top of TCP that can discern where one response ends and the next one starts, plus a streaming parser for it. You would start by implementing a

def requestRenderer: Flow[Request, ByteString]

and a

def responseParser: Flow[ByteString, Response]

Between those you can put the tcp connection:

def pipeline: Flow[Request, Response] = Flow[Request].via(requestRenderer).via(Tcp.outgoingConnection).via(responseParser)
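To make the framing idea concrete: a common choice is a length-prefix protocol. Here is a minimal plain-Scala sketch (an illustration of the concept, not the akka-stream API), with a renderer and an incremental parser:

```scala
import java.nio.ByteBuffer

// Each frame is a 4-byte big-endian length followed by that many payload bytes.
def renderFrame(payload: Array[Byte]): Array[Byte] = {
  val buf = ByteBuffer.allocate(4 + payload.length)
  buf.putInt(payload.length).put(payload)
  buf.array()
}

// Incremental parser: given the bytes buffered so far, split off every
// complete frame and return (frames, leftover bytes still incomplete).
def parseFrames(buffer: Array[Byte]): (List[Array[Byte]], Array[Byte]) =
  if (buffer.length < 4) (Nil, buffer)         // length header not complete yet
  else {
    val len = ByteBuffer.wrap(buffer).getInt
    if (buffer.length < 4 + len) (Nil, buffer) // payload not complete yet
    else {
      val (frame, rest)    = buffer.drop(4).splitAt(len)
      val (more, leftover) = parseFrames(rest)
      (frame :: more, leftover)
    }
  }

// Two responses arriving back-to-back on one connection can be told apart:
val wire = renderFrame("ping".getBytes) ++ renderFrame("pong".getBytes)
val (frames, leftover) = parseFrames(wire)
assert(frames.map(new String(_)) == List("ping", "pong"))
assert(leftover.isEmpty)
```

A `responseParser` flow would keep such a leftover buffer as its state and emit one `Response` per complete frame.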

Now you still have the problem of how to interface with that Flow. (And maybe that is what your question is really about.) If you can structure your program as hinted above, then you could create a

// prompts user for more input
def userInput: Source[UserInput]

and a

def userInputParser: Flow[UserInput, Request]

and a

def output: Sink[Response]

so you could finally create and run your program as

userInput.via(userInputParser).via(pipeline).to(output).run()

(If you are into functional programming, that may be actually very similar to how you would have structured your program in any case).

For the rest of us, it would be nice if we could wrap the `pipeline` above with something that yields either a function `Request => Future[Response]` or an ActorRef to which requests can be sent and which sends back a message once the response is received. Unfortunately, The Right Solution (TM) for that case is still missing: a one-to-one Flow wrapper in akka-stream that does exactly this translation isn't readily available yet. You can build such a component yourself (Mathias actually built a specific solution for akka-http to implement `Http.singleRequest()`, which has exactly the same problem).

So, how can you build something like that? Here is a sketch:

class RequestResponseActor extends Actor {
  val pipelineActor = Source.actorRef[Request].via(pipeline).to(Sink.actorRef(self)).run() // should return the actorRef for the Source.actorRef

  def receive = {
    case req: Request =>
      register(req, sender) // put request and sender ref at the end of a FIFO data structure
      pipelineActor ! req
    case res: Response =>
      val (req, originalSender) = unregister() // gets original request and sender from the head of the FIFO data structure
      originalSender ! res
    // what happens on error? what on premature closing of the connection? etc.
  }
}

All of this is based on the premise that your framing protocol and the semantics of the service you are talking to use a request/response style (like HTTP with pipelining enabled), where requests are answered with responses in FIFO order. Also, the sketch skims over a lot of configuration and subtle semantic details you may have to consider (another reason there's no such shrink-wrapped component in akka-stream).
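The FIFO bookkeeping in the sketch above can be isolated into plain Scala; a minimal sketch (hypothetical names, ignoring errors and thread-safety) of the `register`/`unregister` idea, correlating requests with responses via futures:

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Requests go out in order; each incoming response completes the promise
// at the head of the queue (HTTP-pipelining-style FIFO semantics).
class FifoCorrelator[Req, Res] {
  private val pending = mutable.Queue.empty[(Req, Promise[Res])]

  def register(req: Req): Future[Res] = {
    val p = Promise[Res]()
    pending.enqueue((req, p))     // corresponds to register(req, sender)
    p.future
  }

  def onResponse(res: Res): Unit = {
    val (_, p) = pending.dequeue() // corresponds to unregister(): oldest request
    p.success(res)
  }
}

val correlator = new FifoCorrelator[String, String]
val f1 = correlator.register("req-1")
val f2 = correlator.register("req-2")
correlator.onResponse("res-1")     // completes f1, not f2
correlator.onResponse("res-2")
assert(f1.value.get.get == "res-1")
assert(f2.value.get.get == "res-2")
```

Wrapping this in the actor above gives you the `Request => Future[Response]` shape mentioned earlier.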

Does that answer most of your question?

This became quite a long answer but it also covers a lot of stuff :)

HTH
Johannes

[*] Of course, there's not too much conceptually new here. E.g. UNIX shell pipes and filters are very similar to the whole reactive streams concept (but constrained to byte streams): you have a buffer that can be asynchronously written to from one side and read from on the other side. The reader must poll if no data is currently available, while the writer must poll while the buffer is full. Demand is signalled via the capacity of the shared buffer. Similarly for TCP, where demand is exchanged by notifying the peer of the capacity of the receive buffer. Etc.

Simon Schäfer

Aug 25, 2015, 8:03:16 PM
to akka...@googlegroups.com
Hi Johannes,


On 08/25/2015 05:25 PM, Johannes Rudolph wrote:
Hi Simon,

I think there are two conceptual difficulties you need to tackle:

The first is the problem which you describe with infinite / finite streams which is actually more one of the "traditional" (= actor based) push-style asynchronous programming versus the "new" [*] pull-style of reactive/akka streams which was introduced to deal with backpressure. The issue with backpressure is that it only works if all components take part in it. If you have one component that opts-out of backpressure it will have to fail or drop elements if it becomes overloaded and this component will become the weakest link (or the "Sollbruchstelle") of your application under load. Akka currently supports `Source.actorRef` (and `Sink.actorRef` respectively) which does exactly this translation from a push-style Actor API to the pull-style streams API. You usually don't want to use them as they will be limited by design and bound to fail under load.

Pull-style means that you need to write your program so that it is completely passive and waits for demand (you could also call that style "reactive", you setup your program to passively wait for a source to provide elements and then react to them). Writing "passive" programs is perfectly suited to services that follow the request / response principle. You setup your handler as a flow and just put it between the Source[Request] / Sink[Response].

But what does it mean for a client program which usually actively tries to achieve something? I think you can also build such a program in a passive style: if it doesn't take any dynamic input it is easy as you can create all the sources and sinks from static data. If it does take dynamic input (like user input), you just need a Source of that user input that only prompts the user for more input if there's demand. It should be possible to structure a program like this but it will be a pervasive change that cannot be recommended in all cases.

So, in reality for client applications you will probably use things like the brittle `Source.actorRef` and just statically configure the size of the buffers and queues to be big enough for the common use cases. (You could say that `Source.actorRef` is not more brittle than the JVM itself which you also need to configure with a maximum heap size.) In any case using streams will force you to think about these kind of issues.
While I had read about the concepts, I was not really aware of them. Thanks for the clarification.
Yes, this was tremendously useful. You described exactly my use case and a way to achieve what I want. Big thank you!

For now it solved all of my problems. Actually it was more an answer to my other question: https://groups.google.com/forum/#!topic/akka-user/GviQjB08rS0

Not creating multiple connections also solved my problem that some data was lost, even though I don't understand what the cause was. I guess data was sent to the wrong connection or something like that.

There are still a few open questions, but nothing that blocks me, so I will look into them when they turn into problems. With the knowledge of how to feed the streams, how to get values out of them and how to connect the subcomponents, I should be less helpless.

The only thing I didn't understand was this part:

  Source.actorRef(1, OverflowStrategy.fail)

When I replace the 1 with a 0 (which is allowed according to the documentation), I get this error message: "Dropping element because there is no downstream demand"

Why do I get it? I expected that since there is a client waiting for data, I wouldn't need a buffer. Maybe it is because the client (connected over TCP) is not a reactive stream, i.e. it didn't signal beforehand that it awaits data? Anyway, 1 seems to be enough, even for 100s of requests.

Thanks again for all your help!

Simon


Simon Schäfer

Aug 25, 2015, 9:36:34 PM
to Akka User List
There is another thing that just came up: can I receive data on the client with the above approach when there was no prior request?

I made a small code example:

import scala.concurrent.Future
import scala.util._

import akka.actor._
import akka.stream._, scaladsl._
import akka.util._

object Test extends App {

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val flow = Flow[ByteString].map{bs ⇒
      println("server received: " + bs)
      bs
    }

    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)
      Future {
        Thread.sleep(2000)
        val a = Source.actorRef(1, OverflowStrategy.fail).via(conn.flow).to(Sink.foreach{x ⇒ println("server sink: " + x)}).run()
        println("sending notification")
        a ! ByteString("server test")
      }
      conn handleWith flow
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
    }
  }

  def client(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val tcpFlow = Flow[ByteString].via(Tcp().outgoingConnection(address, port))
    val a = Source.actorRef(1, OverflowStrategy.fail).via(tcpFlow).to(Sink.foreach{x ⇒ println("client sink: " + x)}).run()
    a ! ByteString("client test")
  }

  val ser = ActorSystem("Server")
  server(ser, "127.0.0.1", 6666)

  val client = ActorSystem("Client")
  client(client, "127.0.0.1", 6666)
}

I would expect that when the future completes, the client receives a notification, but that does not happen. Instead I get a message in dead letters. Not sure why. Am I allowed to send to `conn.flow`, or do I have to do it differently?

Johannes Rudolph

Aug 26, 2015, 4:46:49 AM
to akka-user
On Wed, Aug 26, 2015 at 2:02 AM, Simon Schäfer <ma...@antoras.de> wrote:
> The only thing I didn't understand was this part:
>
> Source.actorRef(1, OverflowStrategy.fail)
>
> When I replace the 1 with a 0 (which is allowed according to the
> documentation) I get this error message: Dropping element because there is
> no downstream demand
>
> Why do I get it? I expected that since there is a client which awaits for
> data that I don't need a cache. Maybe it is because the client (connected
> over TCP) is not a reactive stream, i.e. didn't tell beforehand that it
> awaits data?

If you use `Tcp.outgoingConnection` you get a reactive streams
interface for a TCP connection. On the outgoing side it works like
this: whenever there's space in the OS send buffer for the socket,
the sink/subscriber part of the flow signals demand to its publisher.
On a fresh or idle connection this will be the case. So I think you
are right that a value of `0` could work. However, all these
components work asynchronously, so you might be running into a race
condition where demand just hasn't been registered yet.
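A loose analogy for the difference between buffer sizes 0 and 1, using `java.util.concurrent` queues (an illustration of the demand race, not the akka-stream internals):

```scala
import java.util.concurrent.{ArrayBlockingQueue, SynchronousQueue}

// A SynchronousQueue has no capacity: a non-blocking offer() only
// succeeds if a consumer is already waiting. An element sent before
// "demand" is registered is simply dropped...
val zeroBuffer = new SynchronousQueue[String]()
assert(!zeroBuffer.offer("msg"))   // no one is polling yet: dropped

// ...while a buffer of 1 absorbs the race between sending the first
// element and the first demand arriving:
val oneBuffer = new ArrayBlockingQueue[String](1)
assert(oneBuffer.offer("msg"))     // parked until demand arrives
assert(oneBuffer.poll() == "msg")  // consumed once demand shows up
```

That is why `Source.actorRef(1, OverflowStrategy.fail)` works in practice while a buffer size of 0 drops the first element.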

> Anyway, 1 seems to be enough, even for 100s of requests.

Yes, this is possible. Without load on the machine, and especially
when testing against localhost, the kernel send buffers for a socket
are quite big (I forget exactly how big, but usually somewhere
between 100 kB and 5 MB). It is unlikely that you will be able to fill
the socket buffers fast enough. However, there's still the race
condition of demand having to travel fast enough through the
asynchronous stages, so it could also be just luck that it is working
well right now (or your current running conditions happen to be
favorable for this case for unknown reasons).

Johannes

Simon Schäfer

Aug 26, 2015, 7:03:13 AM
to Akka User List


On Wednesday, August 26, 2015 at 3:36:34 AM UTC+2, Simon Schäfer wrote:
There is another thing that just came up: Can I receive data on the client with a above approach when there was no prior request?

I made a small code example:
 
I just solved this problem. The new code for server:


  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val actor = system.actorOf(Props(new Actor{
      var _sender: ActorRef = _
      override def receive = {
        case in: ByteString ⇒
          println("server received: " + in)
          val s = _sender
          Future {
            Thread.sleep(2000)
            println("sending notification")
            s ! ByteString("server test")
          }
          s ! in.reverse

        case ref: ActorRef ⇒
          _sender = ref
      }
    }))

    val in = Flow[ByteString].to(Sink.actorRef(actor, "exit"))
    val out = Source.actorRef[ByteString](1, OverflowStrategy.fail).mapMaterializedValue(actor ! _)
    val flow = Flow.wrap(in, out)(Keep.none)


    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)
      conn handleWith flow
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
    }
  }

The key was to redirect all the data through an actor, which can send as many messages as it wants.
 

Simon Schäfer

Aug 26, 2015, 7:08:01 AM
to akka...@googlegroups.com


On 08/26/2015 10:46 AM, 'Johannes Rudolph' via Akka User List wrote:
> On Wed, Aug 26, 2015 at 2:02 AM, Simon Schäfer <ma...@antoras.de> wrote:
>> The only thing I didn't understand was this part:
>>
>> Source.actorRef(1, OverflowStrategy.fail)
>>
>> When I replace the 1 with a 0 (which is allowed according to the
>> documentation) I get this error message: Dropping element because there is
>> no downstream demand
>>
>> Why do I get it? I expected that since there is a client which awaits for
>> data that I don't need a cache. Maybe it is because the client (connected
>> over TCP) is not a reactive stream, i.e. didn't tell beforehand that it
>> awaits data?
> If you use `Tcp.outgoingConnection` you get a reactive stream
> interface for a TCP connection. On the outgoing stream side it works
> like this: whenever there's space in the OS send buffer for the socket
> the sink/subscriber part of the flow will tell its publisher that it
> needs demand. On a fresh or idle connection this will be the case. So,
> I think you are right that a value of `0` could work. However, all
> these components are working asynchronously, so you might be running
> into a race condition where demand just has not been registered yet.
Ok, good to know. I don't know how to solve the problem for now. I'll
let it bite me in the future.