Creating a reconnecting TCP client

Adam

Sep 6, 2015, 6:25:16 AM
to Akka User List
Hi,

I've been looking for a way to create a client that upon disconnection will try to reconnect.
I've done this in the past with RxJava observables, but I'm not sure how to do this using Akka Streams.

I saw some code examples where PushStage is being used to implement this, but the code samples are old (the API has changed) and I'm not sure exactly how to migrate them correctly.
In the code examples, it seems like past versions of the API allowed outgoing TCP connections to be handled using a function (by calling handleWith), which means a lazy approach could be used.
I *think* this is no longer possible.

So I figured I can get the same effect with an approach similar to what I've used in the past with Rx observables - create a Source of TCP outgoing connection flows.
Essentially, what I need is something like a hypothetical Source.repeat[T](gen: ()=>T).
There's no such method on Source, so I implemented this using ActorPublisher.
I plan to also have some delay between re-connection attempts with an exponential backoff and also support multiple target hosts, but for now I only want to immediately reconnect to the same address.

If I didn't care about how many connections my client actually opens at the same time, I could say this approach works.
In actuality I do, of course, care and I only want a single connection to be open at any given time.
Unfortunately, the Request objects my ActorPublisher receives contain numbers larger than 1.

Is there a way to force the downstream flow to only ask for a single item at a time?
Alternatively, is there actually a better way to achieve my original goal (a TCP client that reconnects)?
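
The backoff and multiple-host plan mentioned above can be kept separate from the streaming code. The following is only a minimal sketch in plain Scala, not something from the original post: the names ReconnectPolicy, backoff and target, the default delays, and the round-robin host choice are all assumptions made for illustration.

```scala
import scala.concurrent.duration._

// Hypothetical helper: decides how long to wait and which host to try next.
object ReconnectPolicy {

  /** Delay before reconnect attempt number `attempt` (0-based):
    * base * 2^attempt, capped at `max`.
    */
  def backoff(attempt: Int,
              base: FiniteDuration = 500.millis,
              max: FiniteDuration = 30.seconds): FiniteDuration = {
    require(attempt >= 0, "attempt must be non-negative")
    // Clamp the exponent so the multiplication cannot overflow for small bases.
    val exponent = math.min(attempt, 20)
    (base * (1L << exponent)).min(max)
  }

  /** Round-robin over the configured targets (assumed policy). */
  def target[A](hosts: Vector[A], attempt: Int): A = {
    require(hosts.nonEmpty, "at least one host is required")
    hosts(attempt % hosts.size)
  }
}
```

With the defaults, attempts 0, 1 and 2 wait 500 ms, 1 s and 2 s, and the delay stops growing once it reaches 30 s. Immediate reconnection to a single address is the special case of a zero base delay and a one-element host list.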

Akka Team

Sep 8, 2015, 7:05:41 AM
to Akka User List
Hi Adam,

This is not a trivial task, to be honest, and I cannot really give you a nice way to do it. The difference from Rx Observables is that Reactive Streams (RS) implements explicit backpressure (i.e. instead of blocking it uses explicit asynchronous signals), which makes every processing stage inherently stateful (even if the stage itself implements only a simple stateless transformation like map or filter).

This means that a layout like this:

tcpWriter -> tcpConnection -> tcpReader

has to deal with
 - stopping the tcpConnection without the writer and reader realizing it
 - capturing the backpressure state of the tcpConnection (pending demand, buffered elements)
 - creating a new connection and initializing it with the backpressure state of the previous connection
 
This is not simple to do, but I hope we can provide such functionality soon. You do have to be prepared for lost data, though (this is independent of Rx or RS), since there might have been bytes in the TCP send buffer in the kernel at the point when the connection was lost.

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

אדם חונן

Sep 8, 2015, 9:17:14 AM
to akka...@googlegroups.com
I'm OK with lost data - I assume messages can be lost anyway, even without disconnection, so the API I want to provide will not guarantee message delivery and users will be aware of that, and that they are actually sending asynchronous messages.

I was also thinking that a possible alternative might be to create the Akka Streams graph within a stateful object (probably an Actor) which will be responsible for recreating the graph when it breaks (preferably depending on the reason, of course).
Is there a way to do that?
Basically I need something similar to Actor's watch so I know when my flow has terminated.

I'm not sure which mechanism can be used for this: a call to recover at the end of the flow, a materialized future value, or something else.
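
For the "materialized future value" option, here is a hedged sketch of what it might look like. It assumes a later Akka release (2.6-style API, where an implicit ActorSystem also supplies the materializer) and the watchTermination operator, neither of which is confirmed by this thread; ConnectionWatch and connectAndWatch are made-up names, and Source.maybe merely stands in for the real outgoing data.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source, Tcp}
import akka.util.ByteString
import scala.concurrent.Future

// Hypothetical helper, for illustration only.
object ConnectionWatch {

  /** Opens one outgoing connection and returns a Future that completes
    * when the inbound side of the connection completes, and fails if the
    * connection fails (including a refused connection attempt).
    */
  def connectAndWatch(host: String, port: Int)(implicit system: ActorSystem): Future[Done] =
    Source.maybe[ByteString]                  // writes nothing, keeps the write side open
      .via(Tcp(system).outgoingConnection(host, port))
      .watchTermination()(Keep.right)         // keep the termination Future as the materialized value
      .to(Sink.ignore)                        // discard incoming bytes in this sketch
      .run()
}
```

A caller could then react to the Future, for example with connectAndWatch("localhost", 9000).onComplete { ... }, which plays roughly the role that watch plays for actors.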


Konrad Malawski

Sep 17, 2015, 8:58:27 AM
to Akka User List

Hi there,
I actually implemented something like a “reconnect” a number of times (most of them ending up in overcomplication though…).

The style we ended up with is indeed an Actor which encapsulates the connection handling and gets notified when the stream completes (i.e. .toMat(Sink.onComplete { case _ => self ! Disconnected })(Keep.left)), and then simply creates the flow anew. The upside of this is that it can easily handle “wait a bit before reconnecting”, since you can use the Actor’s access to the system scheduler.

You may need to fan-out explicitly into the completion-signal-emitting-part and the data-signalling-part (just use Broadcast :-)).
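
A rough sketch of how the pieces described above might fit together, offered as an illustration rather than the code we actually used. It assumes a later Akka release (2.6-style classic actors and streams API); ReconnectingClient, the Connect message, and the host, port, retryDelay and onBytes parameters are invented for the example. Here alsoTo does the Broadcast-style fan-out, and Source.maybe stands in for the real outgoing data.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source, Tcp}
import akka.util.ByteString
import scala.concurrent.duration._

object ReconnectingClient {
  case object Connect       // hypothetical message: (re)create the stream
  case object Disconnected  // sent by the stream when it completes or fails

  def props(host: String, port: Int, retryDelay: FiniteDuration)(onBytes: ByteString => Unit): Props =
    Props(new ReconnectingClient(host, port, retryDelay, onBytes))
}

class ReconnectingClient(host: String, port: Int, retryDelay: FiniteDuration,
                         onBytes: ByteString => Unit) extends Actor with ActorLogging {
  import ReconnectingClient._
  import context.dispatcher // execution context required by the scheduler

  // Materializer bound to this actor: stopping the actor stops its streams.
  private implicit val mat: Materializer = Materializer(context)

  override def preStart(): Unit = self ! Connect

  def receive: Receive = {
    case Connect =>
      log.info("Connecting to {}:{}", host, port)
      Source.maybe[ByteString]                         // placeholder for outgoing data
        .via(Tcp(context.system).outgoingConnection(host, port))
        .alsoTo(Sink.foreach(onBytes))                 // data-signalling part
        .to(Sink.onComplete(_ => self ! Disconnected)) // completion-signal-emitting part
        .run()

    case Disconnected =>
      log.info("Connection lost, retrying in {}", retryDelay)
      // "Wait a bit before reconnecting" via the system scheduler.
      context.system.scheduler.scheduleOnce(retryDelay, self, Connect)
  }
}
```

Because each new stream is only materialized after Disconnected arrives, at most one connection is open at a time. A failed connection attempt also ends in Disconnected, so it is retried the same way.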

Cheers,
Konrad 'ktoso' Malawski