Comparing core.async and Reactive Extensions

Michal Till

Dec 16, 2013, 11:09:54 PM
to clo...@googlegroups.com
Hello,

I am a little confused when comparing core.async to the so-called Reactive Extensions (Rx). They seem to tackle a similar problem of asynchronicity, so I wonder what the principal differences are and in which cases one is preferred over the other. Can someone please explain?

Regards,
Michal

Leonardo Borges

Dec 17, 2013, 5:55:27 AM
to clojure
Though not specifically about Rx, this thread talks about core.async
and FRP (of which Rx is an implementation):
https://groups.google.com/forum/#!topic/clojure/jHhwufCjrR8

Timothy Baldridge

Dec 17, 2013, 8:33:10 AM
to clo...@googlegroups.com
I won't go so far as to tell you which is better as that often comes down to a matter of taste. However, I will explain the technical differences. In this case I'll use my (somewhat limited) knowledge of C# Rx. Scala/Java's Rx may be different.

Rx is based on a direct call. We could write a simple version of Rx thusly:

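;; A toy observable: on-next pushes a value in, attach registers a child.
(defprotocol IObservable
  (on-next [this val])
  (attach [this obs]))

;; Note: this shadows clojure.core/filter in the current namespace.
(defn filter [f]
  (let [a (atom #{})]            ; the set of attached children
    (reify IObservable
      (on-next [this val]
        (when (f val)
          ;; directly call every child with the value
          (doseq [i @a]
            (on-next i val))))
      (attach [this obs]
        (swap! a conj obs)))))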


As you can see, each parent in the tree directly calls its children via on-next (or on-error and on-completed in a real Rx impl). Thus it's pretty simple to run something like this without a thread pool or dispatcher. But what happens if calling on-next throws an exception? In that case the error bubbles up the wrong way. That is to say, the data goes down the data graph while errors go the opposite way, back up the call stack toward the producer. C# Rx has some code to handle things like this and propagate errors the right way.
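
A minimal sketch of that failure mode, assuming a hypothetical two-node graph. The names `Node`, `push`, `subscribe`, `pass-through`, and `failing-sink` are invented for illustration and are not part of any Rx library. The value is handed down from parent to child, but the exception thrown by the child surfaces in whoever called `push` on the parent:

```clojure
;; Hypothetical protocol for this sketch; not a real Rx API.
(defprotocol Node
  (push [this v])
  (subscribe [this child]))

(defn pass-through
  "A parent node that forwards every value to its attached children."
  []
  (let [children (atom #{})]
    (reify Node
      (push [_ v]
        (doseq [c @children]
          (push c v)))
      (subscribe [_ child]
        (swap! children conj child)))))

(defn failing-sink
  "A child node whose push always throws."
  []
  (reify Node
    (push [_ v]
      (throw (ex-info "child failed" {:value v})))
    (subscribe [_ child] nil)))

(def root (pass-through))
(subscribe root (failing-sink))

;; The value travels down (root -> sink), but the exception travels up
;; the call stack and lands in the code that called push on the root.
(def caught
  (try
    (push root 42)
    :no-error
    (catch clojure.lang.ExceptionInfo e
      (ex-data e))))
```

Here `caught` ends up holding the child's error data, which means the producer, not a downstream error handler, is the one that has to deal with the failure.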

In addition it can be possible for multiple threads to be executing on-next at the same time. It all depends on the semantics of whatever is calling the top level on-next. 

Core.Async on the other hand is based on two primitives: channels (queues) and processes. Errors are never propagated unless specified by the user. And thread pools/dispatchers are almost always required. With core.async we could implement filter thusly:

(defn filter [f in-c out-c]
  (go-loop []
    ;; <! returns nil once in-c is closed, which ends the loop
    (when-let [v (<! in-c)]
      (when (f v)
        (>! out-c v))
      (recur))))

Here we accept the input and output channels as arguments and then create a process that connects the two. Since each go-loop runs as a single sequential process, only one message is in flight in the body of this go at a time. If we want more parallelism, we simply call this filter function multiple times to create multiple gos.
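
A usage sketch of such a process, assuming core.async is on the classpath and the code runs on the JVM (for the blocking `<!!` and `>!!`). `filter-proc` is a hypothetical name chosen to avoid shadowing `clojure.core/filter`. Closing the output channel when the input closes is an addition made for this example so that the result can be collected:

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <! >! <!! >!! close!]])

(defn filter-proc
  "Starts a go process that copies values satisfying pred from in-c to out-c.
  Hypothetical helper for this sketch."
  [pred in-c out-c]
  (go-loop []
    (if-some [v (<! in-c)]
      (do (when (pred v)
            (>! out-c v))
          (recur))
      ;; in-c was closed: propagate the close downstream
      (close! out-c))))

;; Buffered channels so the blocking puts below never wait on a consumer.
(def in-c (chan 10))
(def out-c (chan 10))

(filter-proc even? in-c out-c)

;; Feed some values, then close the input so the process terminates.
(doseq [n (range 6)]
  (>!! in-c n))
(close! in-c)

;; a/into returns a channel that yields the collected vector once out-c closes.
(def result (<!! (a/into [] out-c)))
```

With a single process the output order matches the input order. Starting several processes on the same pair of channels trades that ordering guarantee for parallelism.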

So that's the difference. Rx creates graphs of objects, while Core.Async (and CSP in general) creates graphs of processes connected by queues. CSP seems a bit more general to me, but many would probably disagree with me on that one. 

Timothy


--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.



--
“One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.”
(Robert Firth)

Brian Craft

Dec 17, 2013, 12:33:15 PM
to clo...@googlegroups.com


On Tuesday, December 17, 2013 5:33:10 AM UTC-8, tbc++ wrote:
Core.Async on the other hand is based on two primitives: channels (queues) and processes. Errors are never propagated unless specified by the user. And thread pools/dispatchers are almost always required. 

Seems like it's more subtle than this, since core.async is supported in ClojureScript, where threads are not available?

Ben Mabey

Dec 17, 2013, 10:09:58 PM
to clo...@googlegroups.com
Having used both I would agree that CSP is more general and when you
need to do custom logic it tends to be easier, IME, than Rx. Another key
difference is that Rx is only about one way communication within a
single subscription. Once you need to have bi-directional communication
CSP is a better tool for the job.
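
As a rough illustration of the bi-directional case, assuming core.async on the JVM, two processes can simply hold one channel per direction. The channel names and the `ask` function below are made up for this sketch:

```clojure
(require '[clojure.core.async :refer [chan go-loop <! >! <!! >!! close!]])

(def requests (chan))   ; client -> server
(def replies  (chan))   ; server -> client

;; Server process: answers every request with its square.
(go-loop []
  (when-some [n (<! requests)]
    (>! replies (* n n))
    (recur)))

;; Client side: send a request, then wait for the matching reply.
(defn ask [n]
  (>!! requests n)
  (<!! replies))

(def answers (mapv ask [1 2 3]))

;; Closing the request channel lets the server process finish.
(close! requests)
```

Both ends read and write, and neither needs to know anything about the other beyond the two channels.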

-Ben

Matthew Podwysocki

Dec 19, 2013, 11:41:44 AM
to clo...@googlegroups.com
You can easily do bi-directional communication using Rx, but it involves two Subjects, which are both Observables and Observers, or any flavor of Subject: Replay, Async, Behavior, or Buffered or Controlled (for backpressure, coming soon).

var subject1 = new Rx.Subject();
var subject2 = new Rx.Subject();

var sub1 = subject1.subscribe(subject2);
var sub2 = subject2.subscribe(subject1);

And yes, back to Timothy's point, it is not possible in Rx to have multiple callers invoking onNext at the same time, as we have locks around that kind of behavior, so there is no chance of overlapping onNext calls. We also enforce a strict grammar of 0-N onNext calls followed by an optional onError or onCompleted, but not both. Once onError or onCompleted has fired, no more onNext values can be fired. You can get around that behavior in a number of ways with retry, catch, or even onErrorResumeNext, so instead of terminating the entire sequence you can go to another sequence or retry the current one.
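
The grammar described above can be sketched roughly as follows. This is not how any Rx implementation is written: `guarded-observer` and its handler map are hypothetical, and a plain `locking` block stands in for whatever synchronization a real library uses.

```clojure
(defn guarded-observer
  "Wraps handler fns so that calls are serialized and nothing is delivered
  after the first on-error or on-completed. Returns a map of guarded fns.
  Hypothetical helper for this sketch."
  [{:keys [on-next on-error on-completed]}]
  (let [lock  (Object.)
        done? (volatile! false)]
    (letfn [(guard [f terminal?]
              (fn [& args]
                ;; one caller at a time, so calls can never overlap
                (locking lock
                  (when-not @done?
                    (when terminal? (vreset! done? true))
                    (apply f args)))))]
      {:on-next      (guard on-next false)
       :on-error     (guard on-error true)
       :on-completed (guard on-completed true)})))

(def events (atom []))

(def obs
  (guarded-observer
    {:on-next      #(swap! events conj [:next %])
     :on-error     #(swap! events conj [:error %])
     :on-completed #(swap! events conj [:completed])}))

((:on-next obs) 1)
((:on-next obs) 2)
((:on-completed obs))
((:on-next obs) 3)          ; ignored: the sequence already terminated
((:on-error obs) "late")    ; ignored: only one terminal event is allowed
```

After these calls `events` holds the two values followed by the single completion event; the late calls are dropped silently in this sketch.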

Matt

Timothy Baldridge

Dec 19, 2013, 11:51:46 AM
to clo...@googlegroups.com
You learn something new every day. Has this always been the way that Rx worked (the locking part)? I haven't used Rx for several years, so I may be off in my assumptions.

Timothy



Matthew Podwysocki

Dec 19, 2013, 12:12:55 PM
to clo...@googlegroups.com
Timothy,

Yes, it has always been this way: a strict grammar, enforced with locking. The only place we don't care is in JavaScript, because it is single-threaded.

Matt