[ANN] async-sockets - work with sockets using core.async channels


Brian Guthrie

Oct 5, 2014, 10:06:56 PM
to clojure
Hi all,

I'm releasing a little library for working with sockets. Feedback and pull requests gratefully appreciated.

The skinny
---------------

This library allows you to create socket servers and socket clients and interact with them asynchronously using channels. Servers return a record with a :connections field, a channel that yields one socket per incoming connection. Clients return the same socket record. Each socket record has an :in and an :out channel, which let you receive and send data, respectively, on a line-by-line basis. The raw java.net.Socket is also available (as :socket).

Servers and clients are defined using the Component framework and must be explicitly started using (component/start <server-or-client>), though sockets will clean up after themselves if they are terminated for some reason.
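A minimal usage sketch based only on the description above — the constructor name (socket-server) and namespace layout are assumptions, so check the README for the actual API:

```clojure
(ns example.echo
  (:require [clojure.core.async :refer [go-loop <! >!]]
            [com.stuartsierra.component :as component]))

;; Echo every line received on each incoming connection back to the
;; client. Only :connections, :in, and :out are taken from the
;; description above; socket-server is a hypothetical constructor name.
(defn echo-server [port]
  (let [server (component/start (socket-server port))]
    (go-loop []
      (when-let [sock (<! (:connections server))]
        (go-loop []
          (when-let [line (<! (:in sock))]
            (>! (:out sock) line)  ; send the line straight back
            (recur)))
        (recur)))
    server))
```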

Further information is available on GitHub: https://github.com/bguthrie/async-sockets

Releases
--------------

This is the first release, which I've tagged for now as 0.0.1-SNAPSHOT. Leiningen dependency: [com.gearswithingears/async-sockets "0.0.1-SNAPSHOT"].

If this is useful to you, please let me know, but any and all feedback is great.

Happy hacking,

Brian
@bguthrie

Zach Tellman

Oct 5, 2014, 11:57:18 PM
to clo...@googlegroups.com
If I'm reading this correctly, you're using non-blocking thread pools for blocking operations on the sockets.  Given more than N connections (last time I looked the thread pool's size was 42), you risk deadlock or at the very least poor average throughput.

adrian...@mail.yu.edu

Oct 6, 2014, 12:10:24 AM
to clo...@googlegroups.com
Zach makes an excellent point; I've used AsynchronousSocketChannel and its ilk (http://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousServerSocketChannel.html) with core.async in the past. Perhaps replacing your direct java.net.Sockets with nio classes that can be given CompletionHandlers (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/CompletionHandler.html) would be a better fit.

Zach Tellman

Oct 6, 2014, 12:13:53 AM
to clo...@googlegroups.com
Please note that if you use core.async with java.nio, you need to make sure backpressure is properly propagated (this happens automatically with java.io, assuming you have a thread per connection).

Joachim De Beule

Oct 6, 2014, 9:33:34 PM
to clo...@googlegroups.com
 Could you please expand on this, Zach?
 
Please note that if you use core.async with java.nio, you need to make sure backpressure is properly propagated (this happens automatically with java.io, assuming you have a thread per connection)

Thanks!

Brian Guthrie

Oct 7, 2014, 2:48:23 PM
to clojure

On Sun, Oct 5, 2014 at 11:57 PM, Zach Tellman <ztel...@gmail.com> wrote:
If I'm reading this correctly, you're using non-blocking thread pools for blocking operations on the sockets.  Given more than N connections (last time I looked the thread pool's size was 42), you risk deadlock or at the very least poor average throughput.

I'd thought the thread pool's size was a function of your core count plus some constant, which I believe is 42. But yes, that is a constraint, and deadlock is a risk with too many concurrent connections. I haven't tried to measure throughput yet but that seems like a reasonable next step. Thanks for the code review!

Brian Guthrie

Oct 7, 2014, 2:49:30 PM
to clojure

On Mon, Oct 6, 2014 at 12:10 AM, <adrian...@mail.yu.edu> wrote:
Zach makes an excellent point; I've used AsynchronousSocketChannel and its ilk (http://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousServerSocketChannel.html) with core.async in the past. Perhaps replacing your direct java.net.Sockets with nio classes that can be given CompletionHandlers (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/CompletionHandler.html) would be a better fit.

Once I do some performance instrumentation I'll give that a shot. I admit that I'm not familiar with all the implications of using the nio classes; were I to switch, is it safe to continue using go blocks, or is it worth explicitly allocating a single thread per socket?

Brian

adrian...@mail.yu.edu

Oct 7, 2014, 4:36:16 PM
to clo...@googlegroups.com
It's not about 'safety' (depending on what that means in this context). As Zach pointed out, if you aren't careful about backpressure you can run into performance bottlenecks with unrestrained async IO operations: although they let you code as if you could handle an unlimited number of connections, that obviously isn't true. There is only a finite amount of data that can be buffered into and out of any network, as dictated by its hardware. When you don't regulate that, your system ends up spending an inordinate amount of time compensating. You don't need to worry about this with "regular IO" because the "thread per connection" abstraction effectively bounds your activity within the physical constraints of the server.

Sun Ning

Oct 8, 2014, 1:17:11 AM
to clo...@googlegroups.com, adrian...@mail.yu.edu
BTW, is there any network-based core.async channel available now?

Zach Tellman

Oct 8, 2014, 2:00:02 AM
to clo...@googlegroups.com
The thread-per-connection approach is nice because it correctly propagates backpressure.  If we're copying data from a source to a sink (let's say reading it in from the network and writing to a file), it's possible that the production of data may outstrip the consumption.  If this happens, we need to make sure the producer slows down, or we risk running out of memory.  In Java, the producer is typically connected to the consumer via a blocking queue, and if the queue fills up the producer can't send anything more to the consumer.  A Java socket is one such queue, and if it fills up it will exert backpressure via TCP.  This will work no matter how many queues or other mechanisms separate the producer and consumer.
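The bounded-queue behavior described here can be seen with a plain java.util.concurrent queue (a minimal illustration, not tied to any of the libraries discussed in this thread):

```clojure
(import '(java.util.concurrent ArrayBlockingQueue TimeUnit))

;; A bounded queue between a producer and a consumer. Once it's full,
;; puts block (or time out), and that stall is the backpressure signal;
;; it propagates through however many such queues separate the two ends.
(def q (ArrayBlockingQueue. 2))

(.put q :a)
(.put q :b)
(.offer q :c 10 TimeUnit/MILLISECONDS)  ;=> false — queue full, producer must wait
(.take q)                               ;=> :a — consumer frees a slot
(.offer q :c 10 TimeUnit/MILLISECONDS)  ;=> true
```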

However, every attempt I've seen to marry core.async to an async network stack has been fundamentally broken, in that it doesn't do this.  Often, they'll just use 'put!', which works fine until the channel's queue fills up, and 1024 pending puts are accumulated, and finally the channel throws an exception.  Alternately, they'll use a blocking put on the channel, which means that any backpressure will also extend to whatever other connections are sharing that thread or the thread pool.  Note that the software that uses core.async in this way may work flawlessly in a wide variety of cases, but there's still an intractable failure mode lying in wait.
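The put! failure mode is easy to demonstrate (a sketch; 1024 is core.async's limit on pending puts per channel):

```clojure
(require '[clojure.core.async :as async])

;; An unbuffered channel with no consumer: each put! is queued as a
;; pending put instead of exerting backpressure on the caller.
(def c (async/chan))

(dotimes [_ 1024]
  (async/put! c :msg))  ; all silently queued

;; One more pending put blows up instead of slowing the producer down:
;; (async/put! c :msg)
;; => AssertionError: No more than 1024 pending puts are allowed ...
```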

In some cases, such as http-kit's websocket mechanism, there's no way to even exert backpressure (you register a callback, and have no way to indicate in your callback that you can't handle more messages).  This means that any attempt to use http-kit in conjunction with core.async will be subtly but fundamentally broken.  Arguably, even without core.async in the equation it's broken.  This is not a good state of affairs.  I'll admit that it took me a few failures in production to realize how important correct handling of backpressure is, but this isn't something that our ecosystem can afford to ignore, especially as Clojure is used for larger-scale projects.

I will note that I am working on a solution to this, in the form of the upcoming Aleph release [1].  This will model every network connection via streams that can trivially be converted into core.async channels [2], and which exert backpressure over TCP wherever necessary without requiring a thread per connection.  A formal beta should be available in the near future (it's already handling billions of requests a day in production without issue).

Zach

Max Penet

Oct 8, 2014, 3:41:46 AM
to clo...@googlegroups.com
This is something that should be configurable & extendable in core.async really... but given how long it took to have this possibility with agents I am not holding my breath (not to mention it was never even considered for c.c/future).

Jozef Wagner

Oct 8, 2014, 5:00:24 AM
to clo...@googlegroups.com
One way to handle this elegantly in Java is to add support for java.nio.channels.SelectableChannel in core.async buffers. That way you could select on any combination of core.async channels and IO resources, waiting for channels to have free space and for resources to have data ready.

Jozef

adrian...@mail.yu.edu

Oct 8, 2014, 10:12:59 AM
to clo...@googlegroups.com, adrian...@mail.yu.edu
Check out https://github.com/halgari/com.tbaldridge.hermod for an interesting take on this. 

Zach Tellman

Oct 8, 2014, 12:15:47 PM
to clo...@googlegroups.com
I wasn't aware of hermod, that's interesting.  I would still characterize its approach to backpressure as "broken", though, since when the queues get full it silently drops messages on the ground.  In fairness, this is very clearly documented, so it's less pernicious than some of the other cases out there.

Both core.async buffers and SelectableChannels have very particular semantics, and I would be very surprised if they could be combined in that way.  It's perfectly possible to feed one into the other and handle backpressure properly (again, I'm doing just that with Aleph 0.4.0, using Netty), but it's a nuanced integration and easy to get wrong.


Jozef Wagner

Oct 8, 2014, 1:10:10 PM
to clo...@googlegroups.com
If you want to handle multiple TCP connections and async channels in one thread, you need a way to block on both connections (waiting for new input to arrive) and channels (waiting for free space in a buffer). Blocking only on connections will get you a busy loop when channels are full. If you could point me to the part of the Aleph sources that handles this issue, I would be very grateful. I'm not familiar with the Netty API or with Manifold's concepts, so I'm having trouble navigating the Aleph sources.

Thanks,
Jozef

Zach Tellman

Oct 8, 2014, 1:16:44 PM
to clo...@googlegroups.com
The documentation for Manifold can explain the API better than I can here.  The point where that interacts with Netty w.r.t. backpressure is here: https://github.com/ztellman/aleph/blob/0.4.0/src/aleph/netty.clj#L109.  Here the stream represents data coming off the wire, and if the put onto the stream is not immediately successful, backpressure is enabled until the put completes.  No blocking required anywhere.
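That interaction can be sketched as follows (an illustration of the pattern, not Aleph's actual code; s/put! and d/on-realized are Manifold, .setAutoRead is Netty's ChannelConfig):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Called with each message read off a Netty channel `ch`. If the stream
;; can't accept the message immediately, stop reading from the socket so
;; TCP flow control pushes back on the remote peer, and resume once the
;; put completes.
(defn handle-read [ch stream msg]
  (let [result (s/put! stream msg)]
    (when-not (realized? result)
      (-> ch .config (.setAutoRead false))
      (d/on-realized result
        (fn [_] (-> ch .config (.setAutoRead true)))
        (fn [_] (.close ch))))))
```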

Zach Tellman

Oct 8, 2014, 1:21:50 PM
to clo...@googlegroups.com
Sorry, I didn't cover the converse case.  That's handled by the ChannelSink directly underneath.  Note that each write returns a Netty ChannelFuture representing completion of the write, which is transformed into a Manifold deferred.  Any time a Manifold put returns an unrealized deferred, that creates upstream backpressure.

Jozef Wagner

Oct 8, 2014, 3:22:39 PM
to clo...@googlegroups.com
Thank you! Using put! callback to control backpressure is a very elegant solution. BTW there always has to be blocking somewhere. Netty uses selector to block at [1] and .setAutoRead causes respective channel to deregister itself from a selector [2], until put! completes.

Regarding the other way around, it seems to me from a quick look (sorry if I misinterpreted) that using futures to represent write completion will cause the eventual backpressure to block the thread, a drawback analogous to a blocking put! in the "reading from multiple connections" case.


Jozef

Zach Tellman

Oct 8, 2014, 3:31:50 PM
to clo...@googlegroups.com
Yes, I didn't mean to imply that there are no blocking operations anywhere, only that nothing in the worker threads (where the put! occurs) will block, and that backpressure isn't exerted by causing an active thread to hang.

As to the second point, I'm not sure why you'd think that.  These aren't futures that are being accessed via blocking dereference, so there's no reason any thread would block as a result of an unrealized future/deferred.


Jozef Wagner

Oct 8, 2014, 5:06:20 PM
to clo...@googlegroups.com
It was just my ignorance of manifold's concepts :). I was thinking in terms of "I have this thread which should connect these channels to those sockets, when to do what". Manifold's async approach with its connectable streams and deferreds with callbacks seems to abstract away such bookkeeping.

Jozef 

Julian

Oct 11, 2014, 9:01:29 PM
to clo...@googlegroups.com
Hi Zach, 

Thanks for the clarity of thought that went into this post. 

Perhaps it is obvious to everyone but me, but I saw this post by Christophe Grand yesterday that appears to address these concerns:
"Back-pressurized interop for core.async" https://twitter.com/cgrand/status/520566182194450432

I'm interested to hear if this solves your problem or is about something else. 

Cheers
Julian

Ryan Waters

Oct 12, 2014, 12:42:32 PM
to Clojure
I was just starting to use Sente [1] (which relies on http-kit [2]) and this conversation is a real eye-opener.  Unless you use a library that supports backpressure, as mentioned earlier, the transport concern that core.async made opaque must become an application-level concern.  The far side of the connection would have to respond with an application-level acknowledgement for each send, and the local side would need to stop sending data until acks arrive for previously sent data.

E.g. this could be implemented with core.async by using a pair of channels (instead of a single channel) for all sends, where one channel carries data and the other waits for acknowledgement of put data (a 'control' channel).  This has the unfortunate side effect of hurting throughput.  A better system would allow a certain number of unacknowledged sends before backing off.  Of course, at that point you're reimplementing what TCP already provides, at the level of your application.
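A minimal sketch of that pair-of-channels scheme (hypothetical, with a window of one unacknowledged send, which is exactly what hurts throughput):

```clojure
(require '[clojure.core.async :refer [go-loop <!]])

;; `data` carries outgoing messages; `acks` carries acknowledgements from
;; the far side; `send-fn` hands a message to the transport. The sender
;; parks on `acks` after each send, so at most one message is in flight.
;; A real implementation would allow a window of N unacked sends.
(defn send-with-acks [data acks send-fn]
  (go-loop []
    (when-let [msg (<! data)]
      (send-fn msg)
      (<! acks)  ; wait for the application-level ack before sending more
      (recur))))
```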

Christophe's approach means you at least wouldn't have to do the above, replacing it instead with a per-backend implementation.  I hope somebody else is able to explain it better.

Looking forward to an Aleph rewrite!!



Zach Tellman

Oct 12, 2014, 1:40:16 PM
to clo...@googlegroups.com
A slightly more straightforward implementation can be found at https://gist.github.com/ztellman/fb64e81d1d7f0b261ccd.  I'm fairly sure it's equivalent to Christophe's, but I may be missing some nuance.  At any rate, I've found using the async/put! and callback mechanism to be a much simpler way to do interop with non-core.async code.
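That put!-with-callback pattern might look roughly like this (a sketch; on-message, pause!, and resume! stand in for whatever pause/resume hooks the non-core.async source provides):

```clojure
(require '[clojure.core.async :as async])

;; Bridge a callback-based source into a channel without accumulating
;; pending puts: pause the source before each put, and resume it only
;; from put!'s completion callback, i.e. once the channel has actually
;; accepted the message.
(defn source->chan [source ch]
  (on-message source
    (fn [msg]
      (pause! source)
      (async/put! ch msg
        (fn [_] (resume! source))))))
```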

And yes, reimplementing a TCP-like ack mechanism on top of WebSockets is not something you want to do.  The existing stack will do it better and faster than you can.  Just to be clear, this is a large part of why I wrote Manifold [1], which can easily be turned into a core.async channel, but provides an API which is designed for interop with other stream mechanisms (including synchronous ones like Java's BlockingQueues).  core.async is a framework, meaning it brings not only a stream representation, but an entire execution model; using core.async should be an application-level decision, not one made for you by your libraries.

Zach
