Manifold streams/deferreds raising errors downstream of a sink

Andy Chambers

Aug 10, 2015, 12:13:30 AM
to Clojure
Hey All,

I'm trying to build a library that exposes manifold streams to data-producing applications. The intention
is for these apps to s/put! to a sink stream returned by the producer function. However, I need to
take the value that is put in and pipe it through some other function before I know whether there is an error,
and if there is one, I'd like to propagate it back to the caller.

I understand that the value returned by put! is a deferred, and I know how to set the error state of a deferred,
but I don't understand how to arrange for that deferred to be manipulated downstream of the point where the
value has been taken off the stream.

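(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn producer []
  (let [in (s/stream)]
    ;; The callback hands each message to a future, which then throws.
    (s/consume (fn [msg]
                 (d/future
                   (prn "oops, going to error now")
                   (throw (Exception. (str msg)))))
               in)
    in))

;; Yields true even though the consumer failed.
@(s/put! (producer) 42)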

As written, you get a success response even though an exception was triggered by the function that consumes
the "in" stream. Is it possible to re-write this so that the exception bubbles up to where the put! is deref'd?

Cheers,
Andy

Atamert Ölçgen

Aug 10, 2015, 2:20:00 AM
to clo...@googlegroups.com
Hi Andy,

I would provide a function that wraps put!. It would still return a deferred, but not the one returned by put!. Instead, it would create a new deferred representing the result of the computation and pass it down the stream together with the input. Consumers would then set its value (or its error) once the computation is done.

I am actually using this technique in a couple of projects and am very happy with it.
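
A minimal sketch of this approach, assuming each message travels down the stream as a [value result-deferred] pair. The names process, producer and put-with-result! are hypothetical and chosen only for illustration; they are not part of Manifold, and process merely stands in for whatever the real downstream function is.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn process
  "Hypothetical stand-in for the real downstream work; throws on odd input."
  [msg]
  (if (odd? msg)
    (throw (ex-info "cannot process odd value" {:msg msg}))
    (* 2 msg)))

(defn producer
  "Returns a sink stream whose messages are [value result-deferred] pairs."
  []
  (let [in (s/stream)]
    (s/consume
      (fn [[msg result]]
        ;; Run the work in a future and report the outcome on the
        ;; deferred that travelled with the message.
        (d/future
          (try
            (d/success! result (process msg))
            (catch Throwable e
              (d/error! result e)))))
      in)
    in))

(defn put-with-result!
  "Like s/put!, but returns a deferred for the outcome of processing."
  [sink msg]
  (let [result (d/deferred)]
    ;; If the sink itself refuses the message, fail the result as well.
    (d/chain (s/put! sink [msg result])
             (fn [accepted?]
               (when-not accepted?
                 (d/error! result
                           (ex-info "sink rejected the message" {:msg msg})))))
    result))

@(put-with-result! (producer) 42) ;; => 84
```

Calling it with an odd value, e.g. @(put-with-result! (producer) 41), should throw the ex-info at the point of the deref, which is the behaviour asked for in the original question.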


--
Kind Regards,
Atamert Ölçgen

◻◼◻
◻◻◼
◼◼◼

www.muhuk.com

Zach Tellman

Aug 11, 2015, 2:42:57 PM
to Clojure
Hi Andy,

To give you some context on this, the deferred returned by `put!` will yield true if the thing directly downstream accepts the value (in this case, the stream). The failure of the callback in `consume` is between the stream and the callback, and is not propagated all the way back (though the exception will cancel the `consume`, which will cause the stream to close). The approach described by Atamert is a reasonable way to get the desired result.
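
As a small illustration of the point that the result of `put!` only reflects whether the sink accepted the value, here is a sketch using a buffered stream with no consumer at all. The var name `st` is arbitrary.

```clojure
(require '[manifold.stream :as s])

;; A stream with a buffer of one message and nothing consuming it.
(def st (s/stream 1))

;; The buffer accepts the value, so the put succeeds even though
;; nothing downstream has processed (or will process) it.
(def accepted @(s/put! st 42))   ;; => true

;; Once the stream is closed, it no longer accepts values.
(s/close! st)
(def rejected @(s/put! st 43))   ;; => false
```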

Zach