Asynchronous segment in channel topology


Marko Topolnik

Dec 27, 2012, 4:57:26 PM
to alep...@googlegroups.com
I've come across a case where my map* operator must sometimes do its transformation asynchronously. It came as a natural solution to introduce a transformation step that works just like pipeline: if it gets a realized result, it pushes it right on; otherwise it sets up a callback that will enqueue the message when it's ready. I called it async-stage:

(defn async-stage [src]
  (let [dest (m/channel)]
    (m/receive-all src (m/pipeline #(m/enqueue dest %)))
    (m/on-closed src #(m/close dest))
    (m/on-closed dest #(m/close src))
    dest))

This approach isn't a champion of elegance because you need an async-stage after each step that may produce an asynchronous result.

Now, since this idea (not necessarily the approach) seems to fit squarely within the whole framework of lamina, I would like to know if you (Zach) already have your own story on it.

Zach Tellman

Dec 27, 2012, 5:06:08 PM
to alep...@googlegroups.com
If you want to hold onto results until they're realized, you'll have to do something like that.  However, all the wiring with on-closed, etc. isn't necessary.  Check out bridge-join and bridge-siphon in lamina.core.channel.  There are numerous examples in the codebase for how they can be used.  You almost always want to use bridge-join.

This doesn't save you from having to identify where you'll need the operator:

;; Wraps f so that the channel it returns is passed through async-stage.
(defn and-realize [f]
  (fn [& args]
    (async-stage (apply f args))))

which allows it to be somewhat inline:

((and-realize map*) ...)

Zach

Zach Tellman

Dec 27, 2012, 5:11:21 PM
to alep...@googlegroups.com
Sorry, left out a clause there.  Should have been "This doesn't save you from having to identify where you'll need the operator, but something like this can help:"

Marko Topolnik

Dec 27, 2012, 5:25:54 PM
to alep...@googlegroups.com
Thanks, I got it anyway :)

I've looked at the implementation of siphon vs. join several times before, but I still don't get what exactly their difference amounts to. siphon is the one I understand and use, but I have never used join. Can you give me a hint?

I'm looking into bridge-* right now.

Zach Tellman

Dec 27, 2012, 5:32:15 PM
to alep...@googlegroups.com
Sorry, the wiki documentation stops just shy of explaining it.

'siphon' assumes the destination is more permanent than the source.  If you feed A into B, and then close A, B will remain open.

'join' assumes both channels are equally permanent.  If you close A or B, the other will also be closed.  The default relationship between nodes is 'join'.  Edges in the graph will only be labelled when they're not join, if you're using graphviz.
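A minimal sketch of the difference described above, assuming `lamina.core` is aliased as `m` (as in the snippets in this thread) and that `siphon`, `join`, `close` and `closed?` take their arguments as shown. The channel names are illustrative only, and the printed values are left for the reader to check against their Lamina version.

```clojure
(require '[lamina.core :as m])

;; siphon: the destination is treated as more permanent than the source.
(def a (m/channel))
(def b (m/channel))
(m/siphon a b)
(m/close a)
;; Per the description above, b should still be open at this point.
(println "b closed after closing a?" (m/closed? b))

;; join: both channels are treated as equally permanent.
(def c (m/channel))
(def d (m/channel))
(m/join c d)
(m/close c)
;; Per the description above, closing c should also close d.
(println "d closed after closing c?" (m/closed? d))
```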

Marko Topolnik

Dec 27, 2012, 6:02:32 PM
to alep...@googlegroups.com
Interesting; it turns out I thought siphon worked the way join in fact does.

In several places I need the source to be the more permanent one, so I wrote leech (lamina is really straining the English thesaurus :)

(defn leech [src dest]
  (let [enq-dest #(m/enqueue dest %)]
    (m/receive-all src enq-dest)
    (m/on-closed src #(m/close dest))
    (m/on-closed dest #(m/cancel-callback src enq-dest))
    dest))

I wonder if I can rewrite this one with more grace.

Zach Tellman

Dec 27, 2012, 6:11:10 PM
to alep...@googlegroups.com
Using 'join' with a permanent channel as the source should accomplish the same thing as your 'leech' function.

Marko Topolnik

Dec 28, 2012, 2:08:45 AM
to alep...@googlegroups.com
True; but I do need to close my channels eventually. I have close-permanent for that, but I was looking to get rid of it because a) it involves the low-level API and b) I don't like to have the correctness of my code depend on which close I call against any given channel. I could solve b) by always calling close-permanent, though, and I could name it just close then (in my namespace).

Marko Topolnik

Dec 28, 2012, 4:40:47 AM
to alep...@googlegroups.com
To give a fuller description, I'm writing a public library which endorses Lamina. The key function returns a Lamina channel for which the typical usage pattern is connect sink 1 --> close sink 1 --> connect sink 2 --> close sink 2 --> ... --> close main channel. Ideally, the user would be able to achieve this just by using Lamina functions, but he can't: he'll need either leech or close-permanent to be provided by my library. 

Zach Tellman

Dec 29, 2012, 6:42:35 PM
to alep...@googlegroups.com
I've checked in 'force-close' and 'force-error' functions to lamina.core, which should remove the need for a special close-permanent hack.

Zach

Marko Topolnik

Dec 30, 2012, 3:22:59 AM
to alep...@googlegroups.com
After spending time with my usage pattern, and after finding out from you about the difference between siphon and join, I felt that there is much to this idea that auto-close semantics are contained in the edge and not the node: no matter what channel you siphon into, you don't want to close it when you close the source. In my case, the client knows he doesn't want to close the source when he closes the sink, and there are legitimate cases when he knows he wants to close it.  This is why I found the addition of leech to produce the nicest code. Maybe a generalized function that connects two channels and accepts two boolean options (something like autoclose-src and autoclose-dest) would be appropriate. Then siphon and join would be just two of the four special cases.
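A rough sketch of what such a generalized function could look like, built only from the primitives already used in leech above. The name `connect-with` and the option keys `:autoclose-src` and `:autoclose-dest` are hypothetical, and `m` is assumed to be an alias for `lamina.core`; this is not the `connect` function that was later added to Lamina.

```clojure
(require '[lamina.core :as m])

;; Hypothetical helper, not part of Lamina: forwards messages from src to
;; dest and lets the caller choose the auto-close behavior of the edge.
(defn connect-with
  [src dest {:keys [autoclose-src autoclose-dest]}]
  (let [enq-dest #(m/enqueue dest %)]
    ;; Forward every message from src to dest.
    (m/receive-all src enq-dest)
    ;; Close dest when src closes, but only if requested.
    (when autoclose-dest
      (m/on-closed src #(m/close dest)))
    ;; When dest closes, either close src too or just stop forwarding.
    (m/on-closed dest
      (if autoclose-src
        #(m/close src)
        #(m/cancel-callback src enq-dest)))
    dest))
```

With these assumed options, `(connect-with src dest {:autoclose-dest true})` behaves like leech, and passing both options as true gives the close-either-closes-both behavior described for join.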

Zach Tellman

Jan 4, 2013, 8:16:27 PM
to alep...@googlegroups.com
I think this is a reasonable thing to ask for (sorry for being so late to reply).  I've checked in two new methods to lamina.api, 'connect' and 'bridge'.  Each is a generalized version of siphon/join and bridge-siphon/bridge-join, respectively. 

Zach

Marko Topolnik

Jan 5, 2013, 10:30:18 AM
to alep...@googlegroups.com
I've pulled from github, but I don't see this commit. The last one I got is 

55da39...

Zach Tellman

Jan 5, 2013, 12:07:27 PM
to alep...@googlegroups.com
Sorry, apparently I forgot to push the changes.  They should be up now.

Zach

Marko Topolnik

Jan 6, 2013, 7:59:46 AM
to alep...@googlegroups.com
That's... quite a commit :) I went to test it right away, but unfortunately I can't test end-to-end until Aleph is updated to match the new Lamina. Do you already have a schedule for the next Aleph/Lamina beta?