The way Aleph models streams of data is using channels [1]. When a
connection opens, you're handed a channel that allows you to
communicate with the client. When the connection is closed, the
channel is closed. To create a websocket server, you simply create an
HTTP server with websockets enabled:
(start-http-server handler {:port 8080, :websocket true})
The handler takes two parameters: a channel and a request hash. In
the case of standard HTTP request the channel will accept a single
message, which is the response. In the case of a websocket handshake
(:websocket in the request hash is set to true), the channel
represents a full duplex socket. The handler will be called every
time a new websocket connection is opened.
More concretely, here is a websocket echo server:
(defn handler [ch handshake]
(siphon ch ch))
This takes every message that comes from the client, and sends it back
through the channel. If we wanted to send a greeting before echoing
everything, that's easy too:
(defn handler [ch handshake]
(enqueue ch "Hello")
(siphon ch ch))
If assume that the first message from the client is their name, we can
use that too:
(defn handler [ch handshake]
(receive ch
(fn [name]
(siphon
(map* #(str name " said " %) ch)
ch))))
These are all trivial examples, but hopefully they're somewhat
illustrative. If you have more specific questions, I'd be happy to
answer them.
Zach
[1] https://github.com/ztellman/lamina/wiki/Channels
(defn event-loop [ch request]
(enqueue ch (encode ...)))
Zach
On Sun, Dec 26, 2010 at 12:40 PM, Sean Allen
(receive-all ch
(fn [msg]
(do-something msg)))
Your original message said that the client gets new information from
the database, filtered per some initial criteria that they send.
Let's assume that all new information from the database comes in
through 'database-channel', and that the filter predicate is generated
from the first message using 'generate-predicate'. Then your handler
would look something like:
(defn handler [ch request]
(receive ch
(fn [criteria]
(siphon
(filter* (generate-predicate criteria) database-channel)
ch))))
This doesn't allow for the criteria to change, but that would be a
fairly easy modification. I'd be happy to break down the various
steps in the above example, if that would help.
Zach
On Sun, Dec 26, 2010 at 12:59 PM, Sean Allen
Your original message said that the client gets new information from
the database, filtered per some initial criteria that they send.
Let's assume that all new information from the database comes in
through 'database-channel', and that the filter predicate is generated
from the first message using 'generate-predicate'. Then your handler
would look something like:
(defn handler [ch request]
(receive ch
(fn [criteria]
(siphon
(filter* (generate-predicate criteria) database-channel)
ch))))
This doesn't allow for the criteria to change, but that would be a
fairly easy modification. I'd be happy to break down the various
steps in the above example, if that would help.
If we assume that every message from the database is enqueued into
'database-channel', then all we need to do is create a custom version
of that stream for each client, based on its individual criteria.
(filter* predicate channel) does exactly this: it takes the original
channel, and returns a channel that only emits the messages that
satisfy the predicate. This "drains" any messages coming out of
database-channel, so you can compose map* and filter* and so on
without messages accumulating in some intermediate channel. And since
channels multicast to their subscribers, you can have multiple
distinct filtered views of the same stream.
Finally, we take the resulting channel and siphon it back into the
client's communication channel. (siphon ...) simply takes all
messages from the first channel, and forwards them into the second.
Let me know if I missed anything, or can expand on any of the above.
Zach
A callback registered using (receive ...) will consume a single
message and go away.
A callback registered using (receive-all ...) will consume all
messages until the channel closes or the callback is canceled.
If no callbacks are registered when a message comes in, it will queue
up until a callback is registered via receive or receive-all.
Zach
Okay, so using receive-all you can mimic the on-message event loop
model that node.js and others use. However, if all you're doing is
taking a stream of messages, modifying them, and passing them along
somewhere else, this is sort of a low-level approach. Instead of
operating on individual messages, you can operate on the entire
stream.
If we assume that every message from the database is enqueued into
'database-channel', then all we need to do is create a custom version
of that stream for each client, based on its individual criteria.
(filter* predicate channel) does exactly this: it takes the original
channel, and returns a channel that only emits the messages that
satisfy the predicate. This "drains" any messages coming out of
database-channel, so you can compose map* and filter* and so on
without messages accumulating in some intermediate channel. And since
channels multicast to their subscribers, you can have multiple
distinct filtered views of the same stream.
Finally, we take the resulting channel and siphon it back into the
client's communication channel. (siphon ...) simply takes all
messages from the first channel, and forwards them into the second.
And because of the siphon, once the data is in the channel created by
(filter* ...) it's immediately forwarded to the client, so there's
nothing to expire.
Hopefully this answers your question, I'm a little unsure by what you
mean by the "siphoned channel".
Zach
So, as a post-mortem on this whole exchange: how can I improve the
documentation to nip this confusion in the bud? The descriptions of
map*, filter*, and siphon* are pretty bare bones right now, and have a
diagram of how multiple subscribers work would be useful. Is there
anything else you'd point to as lacking in the wiki?
Zach