websockets

Sean Allen

Dec 24, 2010, 11:56:37 PM
to alep...@googlegroups.com
This started with the following email to the Clojure list:

==

At work we built a prototype application using websockets, with node.js as the server.
A websocket client connects and sends some basic info; that info is used to repeatedly fetch
new data from a database, which is pushed down to the client for display as it arrives in the db.
There will be more than one client, each with its own constraints that determine which data
gets sent to it.

If it goes into production we need to run on the jvm, so I've been rewriting in clojure. I spent a couple
of hours yesterday trying to figure out the best websockets option to use w/ the clojure based server
before I gave up. I realized that w/o any background I'm just running blind.

Given the basic idea of the application, what is the best websockets abstraction to use w/ clojure?
Aleph? The jetty websocket support? Something else? 

Pointers from anyone with more experience doing a websocket server in clojure greatly appreciated.

==

After looking over aleph and the jetty websockets examples, the jetty stuff appears much closer to
the node.js code that I would be porting, as it has onMessage, onConnect, onDisconnect abstractions
baked right in that I don't see w/ aleph. Has anyone added basic onMessage, onConnect, onDisconnect
abstractions over the top of aleph?

I know I don't feel very confident in my understanding of aleph and seeing a working abstraction would
really help me dig in and understand how to use aleph as the basis for this port.

Thanks,
Sean

Zach Tellman

Dec 25, 2010, 3:42:03 AM
to alep...@googlegroups.com
Hi Sean,

The way Aleph models streams of data is using channels [1]. When a
connection opens, you're handed a channel that allows you to
communicate with the client. When the connection is closed, the
channel is closed. To create a websocket server, you simply create an
HTTP server with websockets enabled:

(start-http-server handler {:port 8080, :websocket true})

The handler takes two parameters: a channel and a request hash. In
the case of a standard HTTP request the channel will accept a single
message, which is the response. In the case of a websocket handshake
(:websocket in the request hash is set to true), the channel
represents a full duplex socket. The handler will be called every
time a new websocket connection is opened.

More concretely, here is a websocket echo server:

(defn handler [ch handshake]
  (siphon ch ch))

This takes every message that comes from the client, and sends it back
through the channel. If we wanted to send a greeting before echoing
everything, that's easy too:

(defn handler [ch handshake]
  (enqueue ch "Hello")
  (siphon ch ch))

If we assume that the first message from the client is their name, we can
use that too:

(defn handler [ch handshake]
  (receive ch
    (fn [name]
      (siphon
        (map* #(str name " said " %) ch)
        ch))))

These are all trivial examples, but hopefully they're somewhat
illustrative. If you have more specific questions, I'd be happy to
answer them.

Zach

[1] https://github.com/ztellman/lamina/wiki/Channels

Sean Allen

Dec 26, 2010, 3:40:06 PM
to alep...@googlegroups.com
Hi Zach,

Thanks. I understand the concept but I think I'm missing a basic point:

client connects to server, let's say we just want to push some json back down at it...
what is the proper method to do that? this doesn't work, so i'm sure i'm missing that basic something:

(defn event-loop [ch request]
  (enqueue ch
    { :status 200
      :headers { "content-type" "application/json" }
      :body (encode { :latitude -10 :longitude -10 :num_hits 1000 :timestamp 1293141739})}))

(defn start-websocket-server [port] 
  (start-http-server event-loop {:port port :websocket true}))

So client connects, this creates a channel, right? That's ch in event-loop, so shouldn't enqueue'ing on it get the message
sent back to the client?

Zach Tellman

Dec 26, 2010, 3:50:28 PM
to alep...@googlegroups.com
WebSocket messages are just strings, there's no status or headers to
worry about. Try:

(defn event-loop [ch request]
  (enqueue ch (encode ...)))

Zach

Sean Allen

Dec 26, 2010, 3:59:13 PM
to alep...@googlegroups.com
Thanks Zach,

That is one problem down. And that was a real big missing something basic..
So if I have that as an event loop and...

client connects, server sends back the encoded data, client puts something else in the queue by sending something back...
then the server does nothing else, right?

so the handler, it gets run for each connection... so if you want to keep sending and receiving from the client, you need
to be receiving off the channel and enqueueing new items in the event loop...

something i haven't picked up from the documentation.. what is the proper way to keep that loop going?

-Sean-

Zach Tellman

Dec 26, 2010, 4:23:04 PM
to alep...@googlegroups.com
It really depends on how you handle the messages from the client. If
you want to do the same thing every time they send a message, it's easy:

(receive-all ch
  (fn [msg]
    (do-something msg)))

Your original message said that the client gets new information from
the database, filtered per some initial criteria that they send.
Let's assume that all new information from the database comes in
through 'database-channel', and that the filter predicate is generated
from the first message using 'generate-predicate'. Then your handler
would look something like:

(defn handler [ch request]
  (receive ch
    (fn [criteria]
      (siphon
        (filter* (generate-predicate criteria) database-channel)
        ch))))

This doesn't allow for the criteria to change, but that would be a
fairly easy modification. I'd be happy to break down the various
steps in the above example, if that would help.

Zach

Sean Allen

Dec 26, 2010, 4:51:41 PM
to alep...@googlegroups.com

I'd appreciate that. I could ask a lot of questions but many of them might be resolved 
by a quick explanation of the above at which point I can know how far off my understanding
is right now.

Thanks,
Sean 

Zach Tellman

Dec 26, 2010, 5:27:14 PM
to alep...@googlegroups.com
Okay, so using receive-all you can mimic the on-message event loop
model that node.js and others use. However, if all you're doing is
taking a stream of messages, modifying them, and passing them along
somewhere else, this is sort of a low-level approach. Instead of
operating on individual messages, you can operate on the entire
stream.

If we assume that every message from the database is enqueued into
'database-channel', then all we need to do is create a custom version
of that stream for each client, based on its individual criteria.
(filter* predicate channel) does exactly this: it takes the original
channel, and returns a channel that only emits the messages that
satisfy the predicate. This "drains" any messages coming out of
database-channel, so you can compose map* and filter* and so on
without messages accumulating in some intermediate channel. And since
channels multicast to their subscribers, you can have multiple
distinct filtered views of the same stream.

Finally, we take the resulting channel and siphon it back into the
client's communication channel. (siphon ...) simply takes all
messages from the first channel, and forwards them into the second.

Let me know if I missed anything, or can expand on any of the above.

Zach
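
The contrast between handling messages one at a time and describing the whole stream can be sketched with ordinary Clojure sequences. This is only an analogy: it uses no Lamina channels, and `rows`, `on-message`, `sent`, and `view` are hypothetical names invented for the illustration.

```clojure
;; Analogy with plain Clojure sequences, NOT Lamina channels.
;; All names below are hypothetical.
(def rows [{:id 1 :region "eu"}
           {:id 2 :region "us"}
           {:id 3 :region "eu"}])

;; Per-message style: a callback inspects each message and decides what to do.
(def sent (atom []))

(defn on-message [row]
  (when (= "eu" (:region row))
    (swap! sent conj (:id row))))

(run! on-message rows)

;; Stream style: describe the filtered, transformed stream once.
(def view (->> rows
               (filter #(= "eu" (:region %)))
               (map :id)))

;; Both approaches select the same ids.
(println @sent (vec view))
```

In the stream style the predicate and the transformation are separate, composable steps, which is the role filter* and map* play in the handler discussed above.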

Sean Allen

Dec 26, 2010, 5:33:42 PM
to alep...@googlegroups.com
Quick question while I spend time making sure I absorbed all that.

How long does a handler run for? If a handler ends before the connection ends,
does the connection get shut down?  receive-all keeps it running forever-ish,
so what happens with just a receive?

Zach Tellman

Dec 26, 2010, 5:42:28 PM
to alep...@googlegroups.com
I assume by handler you mean callback. If not, please disregard the
following and correct my misunderstanding:

A callback registered using (receive ...) will consume a single
message and go away.

A callback registered using (receive-all ...) will consume all
messages until the channel closes or the callback is canceled.

If no callbacks are registered when a message comes in, it will queue
up until a callback is registered via receive or receive-all.

Zach
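
The three rules above can be mimicked with a toy, single-threaded model in plain Clojure. This is a sketch for intuition only, not Lamina's implementation: `toy-channel`, `toy-enqueue`, `toy-receive`, and `toy-receive-all` are hypothetical names, and closing or canceling is not modeled.

```clojure
;; Toy model of the rules above. NOT Lamina's implementation;
;; every name here is hypothetical.
(defn toy-channel []
  (atom {:queue []    ; messages nobody has consumed yet
         :once  []    ; one-shot callbacks waiting for a message
         :all   nil})) ; permanent callback, if any

(defn toy-enqueue [ch msg]
  (let [{:keys [once all]} @ch]
    (cond
      all        (all msg)                           ; permanent callback sees everything
      (seq once) (do (swap! ch update :once subvec 1) ; one-shot callback fires, then goes away
                     ((first once) msg))
      :else      (swap! ch update :queue conj msg))  ; no callbacks: message queues up
    nil))

(defn toy-receive [ch f]
  (let [queue (:queue @ch)]
    (if (seq queue)
      (do (swap! ch update :queue subvec 1)          ; consume exactly one queued message
          (f (first queue)))
      (swap! ch update :once conj f))
    nil))

(defn toy-receive-all [ch f]
  (let [queue (:queue @ch)]
    (swap! ch assoc :queue [] :all f)                ; drain the backlog, stay registered
    (doseq [msg queue] (f msg))
    nil))

(def ch (toy-channel))
(def seen (atom []))

(toy-enqueue ch "a")                                 ; queued, nobody is listening
(toy-enqueue ch "b")
(toy-receive ch #(swap! seen conj [:once %]))        ; consumes "a" only
(toy-receive-all ch #(swap! seen conj [:all %]))     ; drains "b", then keeps listening
(toy-enqueue ch "c")
(println @seen)
```

The one-shot callback sees only the first queued message; everything after that goes to the permanent callback.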

Sean Allen

Dec 26, 2010, 5:48:12 PM
to alep...@googlegroups.com
That helps. Let's take this to a lower level...

client connects and this runs the registered handler right?
what keeps the websocket open and communicating?

if the handler just shoves a single string down, does it just sit there open
until the client closes?

(defn handler [ch req]
  (enqueue ch "hi"))

-Sean-

Zach Tellman

Dec 26, 2010, 5:53:19 PM
to alep...@googlegroups.com
Yes, and it will accumulate any messages sent, since no callbacks are consuming them. Once the connection drops, I'd expect the entire thing to be garbage collected. 

If you're worried about this, you can register a dummy callback via receive-all. 

Zach


Sean Allen

Dec 26, 2010, 5:54:25 PM
to alep...@googlegroups.com
If two connections have the 'same view' of the database, would the
filter and siphon take the message so that only one of them got it?
Or another view on the same question..

if client A and B are both interested in Message C from Channel D
and we siphon C from D to A then it can't also be siphoned to B, correct?

Sean Allen

Dec 26, 2010, 5:55:10 PM
to alep...@googlegroups.com
Not worried about it, just want to make sure I understand how things work.

Zach Tellman

Dec 26, 2010, 6:03:29 PM
to alep...@googlegroups.com
Messages are always multicast, never distributed. 

If we set up the siphons to both clients in advance of the message arriving, both clients will receive the message. 


Sean Allen

Dec 26, 2010, 6:09:55 PM
to alep...@googlegroups.com
Interesting. So let me follow through on that thought to make sure I have this correct...

client A will need data from timeframe 2-3
client B will need data from timeframe 2-4
client C will need data from timeframe 1

so if we have this data in the database channel and we filter on it so that A gets timeframe 2-3, B will also get timeframe 2-4 and C will get timeframe 1. if client C then needs timeframe 2 and that is setup in a filter, will timeframe 2 have been removed from the channel already?

is the general idea of what i am getting at clear?
basically, once you introduce the filter into the siphon, how does data expire from the siphoned channel?

Zach Tellman

Dec 26, 2010, 6:26:19 PM
to alep...@googlegroups.com
If you call (filter* ...) on a channel, your predicate will see all
the messages sent into that channel, regardless of what else is
subscribed to that channel. There's no way for one subscriber to see
and consume a message before another, the message is simply multicast
to all subscribers at the same time.

And because of the siphon, once the data is in the channel created by
(filter* ...) it's immediately forwarded to the client, so there's
nothing to expire.

Hopefully this answers your question; I'm a little unsure what you
mean by the "siphoned channel".

Zach
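
The multicast behaviour described above can be imitated in plain Clojure, using the three timeframe ranges from the question. This is a toy sketch and not Lamina code: `toy-source`, `subscribe!`, `publish!`, and `filtered-view` are hypothetical names, and `filtered-view` folds the filter* and siphon steps into a single function for brevity.

```clojure
;; Toy multicast model. NOT Lamina; every name here is hypothetical.
(defn toy-source []
  (atom []))                                   ; vector of subscriber functions

(defn subscribe! [source f]
  (swap! source conj f)
  nil)

(defn publish! [source msg]
  (doseq [f @source]                           ; every subscriber is handed every message
    (f msg)))

(defn filtered-view
  "Forwards to `sink` only the messages from `source` that satisfy `pred`."
  [pred source sink]
  (subscribe! source (fn [msg] (when (pred msg) (sink msg)))))

(def database (toy-source))
(def client-a (atom []))
(def client-b (atom []))
(def client-c (atom []))

;; Each client gets its own predicate over the same source.
(filtered-view #(<= 2 % 3) database #(swap! client-a conj %))
(filtered-view #(<= 2 % 4) database #(swap! client-b conj %))
(filtered-view #(= 1 %)    database #(swap! client-c conj %))

(doseq [timeframe [1 2 3 4]]
  (publish! database timeframe))

(println @client-a @client-b @client-c)
```

Timeframes 2 and 3 reach both A and B: one view receiving a message takes nothing away from the other.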

Sean Allen

Dec 26, 2010, 6:30:32 PM
to alep...@googlegroups.com
Ok so under the hood... if I have channel 'z'.

when a message is enqueued to channel 'z' there is a queue for each person doing a receive type
action on that queue that it gets pushed onto?  and once it goes onto all those 'subscriber' queues,
it is gone from channel 'z' unless no one is subscribed to it in which case it sits on the queue until
someone subscribes.

correct?

Zach Tellman

Dec 26, 2010, 6:43:18 PM
to alep...@googlegroups.com
Yes. I'd only add that the "subscriber queues" are channels themselves. 
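
The behaviour just confirmed (messages wait while nobody is subscribed, and are gone once handed to the current subscribers) can be sketched as a toy model in plain Clojure. It is an illustration under those stated assumptions rather than Lamina's implementation, and `toy-channel`, `toy-enqueue`, and `toy-subscribe` are hypothetical names.

```clojure
;; Toy model of "queue until subscribed, then multicast and forget".
;; NOT Lamina's implementation; every name here is hypothetical.
(defn toy-channel []
  (atom {:queue [] :subscribers []}))

(defn toy-enqueue [ch msg]
  (let [fs (:subscribers @ch)]
    (if (seq fs)
      (doseq [f fs] (f msg))                 ; multicast to everyone subscribed right now
      (swap! ch update :queue conj msg))     ; nobody subscribed: hold the message
    nil))

(defn toy-subscribe [ch f]
  (let [pending (:queue @ch)]
    (swap! ch #(-> %
                   (assoc :queue [])
                   (update :subscribers conj f)))
    (doseq [msg pending] (f msg))            ; first subscriber drains the backlog
    nil))

(def z (toy-channel))
(def a (atom []))
(def b (atom []))
(def c (atom []))

(toy-enqueue z 1)                            ; queued, no subscribers yet
(toy-subscribe z #(swap! a conj %))          ; A receives the queued 1
(toy-subscribe z #(swap! b conj %))          ; B joined after 1 was delivered
(toy-enqueue z 2)                            ; multicast to A and B
(toy-subscribe z #(swap! c conj %))          ; C joined late and has seen nothing yet

(println @a @b @c)
```

In this model a late subscriber only sees messages enqueued after it subscribed, so data needed by a client that connects later would have to be fetched some other way.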

Sean Allen

Dec 26, 2010, 6:49:25 PM
to alep...@googlegroups.com
Ok so... to make sure I have that...

you have a channel 'Z' if client A and B subscribe to it, you have 3 channels.
Z, Z-A and Z-B ( not really called Z-A but you know what I mean ).

correct?

Zach Tellman

Dec 26, 2010, 11:10:03 PM
to alep...@googlegroups.com
Yes, though channels are only generated by map*, filter*, and company.
receive and receive-all don't create additional channels or have
queueing semantics.

So, as a post-mortem on this whole exchange: how can I improve the
documentation to nip this confusion in the bud? The descriptions of
map*, filter*, and siphon are pretty bare bones right now, and a
diagram of how multiple subscribers work would be useful. Is there
anything else you'd point to as lacking in the wiki?

Zach

Sean Allen

Dec 26, 2010, 11:12:42 PM
to alep...@googlegroups.com
Give me a bit of time to think about that. 