Microsoft Rx -style operations with core.async

Brandon Bloom

Jun 30, 2013, 5:27:18 PM
to clo...@googlegroups.com
Hi all,

Today, primarily for my own edification, I've been implementing as many Microsoft Reactive Extensions operators as I can using core.async. The results have been *spectacular*. core.async is an absolute pleasure to work with. I'm so happy with how they have turned out that I really want to share.

You can find my work-in-progress here:

The primary file of interest is:

At the time of this message, I've implemented the bulk of the relevant operators described here:

All of my implementations utilize parking operations, so this library should be fully portable to ClojureScript.

My notes describe any difference from Rx, notable omissions, and known issues:
I'll probably work more on this tonight and will update my notes as I go.

Rich & team: I understand that you guys will probably take a run at implementing this sort of library in the not too distant future. Please let me know if and how I can contribute to that effort.

Cheers,
Brandon

Ben Wolfson

Jun 30, 2013, 6:54:43 PM
to clo...@googlegroups.com
I don't know the semantics of the MS functions so maybe this mirrors them, but the implementations of take-while and drop-while remove an "extra" element from the argument channel, right?

user> (def c (chan))
#'user/c
user> (go (doseq [i (range 10)] (>! c i)))
#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@13f452d>
user> (def d (take-while even? c))
#'user/d
user> (<!! (go (<! d)))
0
user> (<!! (go (<! c)))
2

take-while pulled down the 1 and discovered it didn't pass the predicate, and there's no way to put it back.
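
To make the effect concrete, here is a minimal sketch of how a channel-based take-while might be written. The name take-while-chan is hypothetical and this is not the code from the library under discussion; it only illustrates why the element that fails the predicate cannot be handed back to the source channel.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

;; Hypothetical sketch, not the library's implementation.
(defn take-while-chan
  "Copies values from `in` to a new channel while (pred v) holds, then closes it."
  [pred in]
  (let [out (chan)]
    (go (loop []
          (let [v (<! in)]                 ; this take cannot be undone
            (if (and (some? v) (pred v))
              (do (>! out v) (recur))
              (close! out)))))             ; a failing v is simply dropped here
    out))

(def c (chan))
(go (doseq [i (range 10)] (>! c i)))
(def d (take-while-chan even? c))

;; Take 0 from d, wait for d to close, then look at what is left on c.
(def results [(<!! d) (<!! d) (<!! c)])
;; results is [0 nil 2]: the 1 was consumed and discarded.
```

The second take on d is there only to make the outcome deterministic: it returns once the loop has read the 1 and closed the output.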



--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
Ben Wolfson
"Human kind has used its intelligence to vary the flavour of drinks, which may be sweet, aromatic, fermented or spirit-based. ... Family and social life also offer numerous other occasions to consume drinks for pleasure." [Larousse, "Drink" entry]

Brandon Bloom

Jun 30, 2013, 7:16:27 PM
to clo...@googlegroups.com
> I don't know the semantics of the MS functions so maybe this mirrors them

This code is not an attempt to replicate the semantics of Rx, just to provide a comparable set of operators.

> the implementations of take-while and drop-while remove an "extra" element from the argument channel, right?

Yes. All of these operations inherently consume values from channels. Your c variable is no longer usable once you've given it out to a consumer that expects exclusive access.

Channels and push sequences differ in many ways. In particular, callback-based push sequences can have multiple subscribers. That's why there are no fan-out operations listed on that Msft reference page: every subscription can implicitly fan out.

I've only thought briefly about this, but there are potentially many different strategies for fan-out (e.g. buffering/blocking/dropping/sliding). It's not yet obvious to me what the various higher-level operators should look like (or be called). I'm open to suggestions. I also haven't studied the design space from the Golang perspective yet. Maybe that community has some good ideas...
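
As a rough illustration of those strategies, the sketch below copies one input channel to several outputs whose buffers differ. The fan-out helper is a hypothetical name introduced here; chan, sliding-buffer and dropping-buffer are the core.async buffer constructors. It is one possible shape, not a proposal for what the operators should be.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go <! >! <!! close! sliding-buffer dropping-buffer]])

;; Hypothetical helper: each consumer picks its own policy via its buffer.
(defn fan-out
  "Copies every value from `in` to each channel in `outs`; closes them when `in` closes."
  [in outs]
  (go (loop []
        (let [v (<! in)]
          (if (nil? v)
            (doseq [out outs] (close! out))
            (do (doseq [out outs] (>! out v))
                (recur))))))
  outs)

(def in (chan))
(def outs (fan-out in [(chan 10)                    ; buffering: keeps everything (up to 10)
                       (chan (sliding-buffer 1))    ; sliding: keeps only the newest value
                       (chan (dropping-buffer 1))])) ; dropping: keeps only the oldest value

(go (doseq [i (range 5)] (>! in i))
    (close! in))

;; Drain each output after it has been closed.
(def results (mapv #(<!! (async/into [] %)) outs))
;; results is [[0 1 2 3 4] [4] [0]]
```

Nobody reads the outputs while values are being copied, so the sliding and dropping buffers show their policies clearly. An unbuffered output would instead make the copy loop park until that consumer catches up, which is the blocking strategy.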


Brandon Bloom

Jun 30, 2013, 7:46:24 PM
to clo...@googlegroups.com
Two bits of core.async feedback:

1) The (let [c (chan)] (go ...) c) pattern is *extremely common*. Might be nice to have something like (go-as c ...) that expands to that pattern.

2) It's somewhat annoying to have to consider boolean false all the time. Since nil signifies a closed channel, if, when, if-let, and when-let are extremely convenient. Unfortunately, they are subtly buggy here: a false value read from a channel takes the same branch as nil, so it looks like a closed channel. You need nil? checks everywhere, cluttering up relatively nice code.
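
A small sketch of the pitfall in point 2, with hypothetical helper names (collect-truthy, collect-all, flags) introduced only for this example:

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

;; Convenient but wrong: if-let treats a received false like a closed channel.
(defn collect-truthy [in]
  (go (loop [acc []]
        (if-let [v (<! in)]
          (recur (conj acc v))
          acc))))

;; Correct but noisier: only nil means "closed".
(defn collect-all [in]
  (go (loop [acc []]
        (let [v (<! in)]
          (if (nil? v)
            acc
            (recur (conj acc v)))))))

;; A channel that yields true, false, true and then closes.
(defn flags []
  (let [c (chan 3)]
    (go (>! c true) (>! c false) (>! c true) (close! c))
    c))

(def buggy   (<!! (collect-truthy (flags))))  ; [true]
(def correct (<!! (collect-all (flags))))     ; [true false true]
```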



David Pollak

Jun 30, 2013, 7:54:19 PM
to clo...@googlegroups.com
On Mon, Jul 1, 2013 at 7:46 AM, Brandon Bloom <brandon...@gmail.com> wrote:
> Two bits of core.async feedback:
>
> 1) The (let [c chan] (go ...) c) pattern is *extremely-common*. Might be nice to have something like (go-as c ...) that expands to that pattern.
>
> 2) It's somewhat annoying to always have to consider boolean false all the time. Since nil signifies a closed channel, if, when, if-let, and when-let are extremely convenient. Unfortunately, they are subtly bugged! You need nil? checks everywhere, cluttering up relatively nice code.

It'd be really nice to have a "closed" token that one could construct a channel with so the token could be at the app level... although I guess this wouldn't deal with the if-let/when-let issue.
 

Ben Wolfson

Jun 30, 2013, 8:04:08 PM
to clo...@googlegroups.com
On Sun, Jun 30, 2013 at 4:46 PM, Brandon Bloom <brandon...@gmail.com> wrote:
> 2) It's somewhat annoying to always have to consider boolean false all the time. Since nil signifies a closed channel, if, when, if-let, and when-let are extremely convenient. Unfortunately, they are subtly bugged! You need nil? checks everywhere, cluttering up relatively nice code.

Since nil-as-closed also has the consequence that you can't send nil over channels, one way around this would be to uniformly give everything sent a wrapper indicating that there's something inside; on the receiving end, you could have a macro or function to test whether the received item was nil or wrapped and take one branch in the former case and transparently remove the wrapper and proceed with the inner value in the other branch in the latter. Er, and that relates to the beginning of the sentence because then you could also send nil over the channels, if you wanted to.
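
One way the wrapper idea could look, as a sketch only: Box and if-unboxed are hypothetical names made up for this example, and the sender has to wrap every value by hand.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

;; Hypothetical wrapper: every value sent over the channel is boxed.
(defrecord Box [value])

(defmacro if-unboxed
  "If `boxed` evaluates to non-nil, binds sym to the wrapped value and runs
  `then`; otherwise (channel closed) runs `else`."
  [[sym boxed] then else]
  `(let [b# ~boxed]
     (if (nil? b#)
       ~else
       (let [~sym (:value b#)] ~then))))

(def c (chan 3))
(go (>! c (->Box nil))      ; nil can now travel over the channel
    (>! c (->Box false))    ; and so can false
    (close! c))

(def results
  (<!! (go (loop [acc []]
             (if-unboxed [x (<! c)]
               (recur (conj acc x))
               acc)))))
;; results is [nil false]
```

The cost is an allocation per message and a convention that every producer must follow.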
 

David Nolen

Jun 30, 2013, 8:12:05 PM
to clojure
On Sun, Jun 30, 2013 at 7:46 PM, Brandon Bloom <brandon...@gmail.com> wrote:
> Two bits of core.async feedback:
>
> 1) The (let [c chan] (go ...) c) pattern is *extremely-common*. Might be nice to have something like (go-as c ...) that expands to that pattern.

My understanding, from talking with a member of the core.async team, is that most channel-based API fns should *take* a channel and only construct one as a default.

Awesome stuff by the way doing Rx over core.async

David

Brandon Bloom

Jun 30, 2013, 8:13:44 PM
to clo...@googlegroups.com
> My understanding with some member of the core.async team is that most channel based APIs fns should *take* a channel and only construct one as a default.

Could you elaborate on and motivate that?

David Nolen

Jun 30, 2013, 8:15:45 PM
to clojure
Because reads and writes block on unbuffered channels, users might need more flexibility.


On Sun, Jun 30, 2013 at 8:13 PM, Brandon Bloom <brandon...@gmail.com> wrote:
> > My understanding with some member of the core.async team is that most channel based APIs fns should *take* a channel and only construct one as a default.
>
> Could you elaborate on and motivate that?


Brandon Bloom

Jun 30, 2013, 11:53:26 PM
to clo...@googlegroups.com
Then maybe we need (go-as [c arg] ...)

Brandon Bloom

Jul 2, 2013, 6:12:07 PM
to clo...@googlegroups.com
When I first wrote this code, I intentionally avoided any custom syntactical sugar and worked as closely with the primitives as I could.

After a few days away, I've used fresh eyes to abstract and revamp. The new code is *dramatically* cleaner and I only needed to introduce a few simple and predictable macros.

With regards to my prior feedback:
 
> 1) The (let [c chan] (go ...) c) pattern is *extremely-common*. Might be nice to have something like (go-as c ...) that expands to that pattern.

I've implemented go-as without buffer parameterization. This was a huge cleanup.

> 2) It's somewhat annoying to always have to consider boolean false all the time.

I've introduced two macros: if-recv and when-recv. (if-recv [x p] ...) is equivalent to (let [x (<! p)] (if-not (nil? x) ...)) and when-recv does what you'd expect.

Currently, these macros only operate on a single port to read from. I need to give this a bit more thought before I attempt a multiplexing version of this macro.
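
For readers who want to try the idea, here is a minimal sketch written from the equivalence stated above. It is not the code from the repository, it assumes the binding is a plain symbol (no destructuring), and it must be used inside a go block.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

;; Sketch based on the description above; assumes `sym` is a simple symbol.
(defmacro if-recv
  "Reads from port into sym; runs `then` unless the port was closed, else `else`."
  [[sym port] then & [else]]
  `(let [~sym (<! ~port)]
     (if-not (nil? ~sym) ~then ~else)))

(defmacro when-recv
  "Like if-recv, with an implicit do and no else branch."
  [binding & body]
  `(if-recv ~binding (do ~@body)))

(def c (chan 2))
(go (>! c false) (close! c))

(def results
  (<!! (go (let [a (if-recv [x c] [:got x] :closed)   ; false is a real value
                 b (if-recv [x c] [:got x] :closed)]  ; nil means closed
             [a b]))))
;; results is [[:got false] :closed]
```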

> 3) Not mentioned in my prior feedback: Looping receives over a channel is pretty common too.

For that, I've got the (dorecv [x port] ...) macro, which is analogous to doseq, but reads from port via <!

Similarly, a multiplexing variant of dorecv needs more thought before I attempt that.
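
A possible shape for dorecv, again as a sketch derived from the description rather than the actual source. It assumes a plain symbol binding and must be used inside a go block.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

;; Sketch only: loops until the port is closed, binding each value to sym.
(defmacro dorecv
  [[sym port] & body]
  `(let [port# ~port]
     (loop []
       (let [~sym (<! port#)]
         (when-not (nil? ~sym)
           ~@body
           (recur))))))

(def in (chan))
(go (doseq [v [1 false 3]] (>! in v))
    (close! in))

(def seen (atom []))
(<!! (go (dorecv [x in]
           (swap! seen conj x))))
;; @seen is [1 false 3]: false does not end the loop, only close does.
```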

(doc strings available for these new macros too)

Each one of these primitives is about an order of magnitude more useful than any of the original Rx operators... So I guess that means I can consider this a successful learning experience! In fact, I think these 4 macros might even be useful enough for the core.async namespace after some more field testing. Please let me know if these are helpful to you. I'd also love to know if there are any little patterns or macros like these that you've "discovered".

Cheers,
Brandon

Timothy Baldridge

Jul 2, 2013, 6:57:13 PM
to clo...@googlegroups.com
These look great! 

I would, however, avoid using go-as. In the JCSP example a style such as this is preferred most of the time:

(defn inc-all
  "increments every int received in the input channel"
  ([in]
   (let [out (chan)]
     (inc-all in out)
     out))
  ([in out]
   (go (while true
         (>! out (inc (<! in)))))))


Now users can feel free to supply their own output (and input) channels. This avoids needless copy operations, and allows users to build communication graphs either forwards or backwards. 
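
A short usage sketch of that style, wiring the graph "backwards" by creating the consumer's channel first. The channel names and the buffer size are assumptions chosen for the example.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! >!!]])

;; inc-all in the two-arity style shown above.
(defn inc-all
  "increments every int received in the input channel"
  ([in]
   (let [out (chan)]
     (inc-all in out)
     out))
  ([in out]
   (go (while true
         (>! out (inc (<! in)))))))

;; The consumer decides how its channel is buffered, then hands it over.
(def results-ch (chan 10))
(def numbers (chan))
(inc-all numbers results-ch)     ; no intermediate channel, no extra copy

(>!! numbers 1)
(>!! numbers 41)
(def received [(<!! results-ch) (<!! results-ch)])
;; received is [2 42]
```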

Timothy



 
 



--
“One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.”
(Robert Firth)

Brandon Bloom

Jul 2, 2013, 9:38:22 PM
to clo...@googlegroups.com
> These look great! 

Thanks!
 
> I would, however, avoid using go-as. [...snip...]

Yeah, David Nolen mentioned that in IRC. I was thinking something along the lines of (go-as [c blah] ...) which would basically do (let [c (or blah (chan))] ...)

Next time I get some extra cycles, I'll investigate what that would look like.
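
A rough sketch of that (go-as [c blah] ...) shape, following the (let [c (or blah (chan))] ...) expansion described above. The squares function is a hypothetical caller added only to exercise the macro.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!!]])

;; Sketch: use the supplied channel if there is one, else create a fresh one;
;; run the body in a go block and return the channel.
(defmacro go-as
  [[sym supplied] & body]
  `(let [~sym (or ~supplied (chan))]
     (go ~@body)
     ~sym))

;; Hypothetical producer: writes the first n squares to c.
(defn squares [n out]
  (go-as [c out]
    (doseq [i (range n)]
      (>! c (* i i)))))

(def fresh (squares 3 nil))                       ; channel created by go-as
(def fresh-values [(<!! fresh) (<!! fresh) (<!! fresh)])
;; fresh-values is [0 1 4]

(def mine (chan 10))                              ; caller-supplied channel
(def same-channel? (identical? mine (squares 3 mine)))
;; same-channel? is true
```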

Thoughts on if-recv, when-recv, dorecv, etc? If you like those, I'd love to hear thoughts on if / how to extend them for multiplexing.

Brandon Bloom

Jul 2, 2013, 9:54:54 PM
to clo...@googlegroups.com
While it's fresh in my mind, some quick thoughts:

(defn inc-all
  "increments every int received in the input channel"
  ([in]
   (let [out (chan)]
     (inc-all in out)
     out))
  ([in out]
   (go (while true
         (>! out (inc (<! in)))))))

A) The first overload and second overload for this function return different things. The first returns the output channel, the second returns a channel that will never yield a value. Is it useful to ever return that? Seems like it should return out too.

B) Shouldn't that loop consider when the in channel gets closed? I realize that in many cases channels won't ever be closed, but for utilities it seems wise to expect that or document if you don't expect it.

C) For something like the Rx operators, you want to signal the end of a sequence. If somebody else gives you a channel to write to, is it couth to close it?
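
One possible way to address A, B and C together, offered as a sketch rather than an answer: every arity returns out, the loop stops when in is closed, and a close? flag lets the caller decide whether out gets closed. Both the flag and its default of true are assumptions made for this example.

```clojure
(require '[clojure.core.async :as async :refer [chan go <! >! <!! >!! close!]])

(defn inc-all
  "Increments every int received on `in` and writes it to `out`.
  Always returns `out`. Closes `out` when `in` closes, unless close? is false."
  ([in] (inc-all in (chan) true))
  ([in out] (inc-all in out true))
  ([in out close?]
   (go (loop []
         (let [v (<! in)]
           (if (nil? v)
             (when close? (close! out))   ; (C) closing is the caller's choice
             (do (>! out (inc v))
                 (recur))))))             ; (B) the loop ends when in is closed
   out))                                  ; (A) same return value for all arities

;; Hypothetical helper: a channel that yields xs and then closes.
(defn source [xs]
  (let [c (chan)]
    (go (doseq [x xs] (>! c x))
        (close! c))
    c))

(def closed-result (<!! (async/into [] (inc-all (source [1 2 3])))))
;; closed-result is [2 3 4]

(def shared (chan 10))
(inc-all (source [10 20]) shared false)
(def open-result [[(<!! shared) (<!! shared)] (>!! shared :still-open)])
;; open-result is [[11 21] true]: shared was left open for its owner.
```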