Hey All,
I'm trying to build a library that exposes manifold streams for data producing applications. The intention
would be for these apps to s/put! to a sink stream returned by the producer function. However, I need to
take the value that is put in and pipe it into some other function before I know whether there is an error
and if there is an error, I'd like to propagate it back to the caller.
I understand that the value returned by put! is a deferred, and how to set the error state of a deferred but I
don't understand how I can arrange for the deferred to be manipulated downstream of where it has been
taken off the stream.
(defn producer []
(let [in (s/stream)]
(s/consume (fn [msg]
(d/future
(prn "oops, going to error now")
(throw (Exception. (str msg))))) in)
in))
@(s/put! (producer) 42)
As written, you get a success response even though an exception was triggered by the function that consumes
the "in" stream. Is it possible to re-write this so that the exception bubbles up to where the put! is deref'd?
Cheers,
Andy