This is still all very experimental, but as I said above, everything
seems to just work. There are some impedance mismatches with lazy-
seqs, but the general solution to that is to just bite the bullet and
block the thread whenever someone iterates over them. I think this
has the potential to make Aleph much more approachable for new
developers, and I'm interested to hear what everyone thinks.
Zach
(async (inc (task 1)))
is fine, because 'inc' won't be called until the thread pool returns
'1'. However, the return value for 'inc' will be transformed into a
result-channel, since it's not guaranteed to execute immediately.
Zach
The "read messages out one at a time from a channel" example was maybe
too trivial. I don't intend for people to use async for a use case as
simple as that (also, receive-in-order already does exactly the same
thing).
As to integrating errors into channels, there are a few potential
issues. For instance, what are the semantics of having an error flow
through a channel? Does that close the channel, so that further work
can't happen? Also, what if we only want to wait a certain amount of
time for each message? Channels don't concern themselves with *when*
messages come through, and I'm inclined to keep it that way.
I feel like setting up flows of messages with map*, fork, siphon, et
al and the structured consumption of those flows are somewhat
orthogonal to each other. As you point out, it's possible to consume
two messages using just channel primitives, but once the interactions
get more complex (we only want to wait 500ms for each message, we want
to close the connection if the timeout elapses, we want to calculate
something on a thread pool between consuming the first and second
messages) I think it becomes an exercise in frustration to limit
ourselves to channels, however enriched. As an example of non-trivial
structured consumption of channels, check out
https://github.com/ztellman/lamina/blob/master/src/lamina/connections.clj.
I wouldn't even attempt any of that without pipelines.
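To make that concrete, here's a rough sketch of the "wait 500ms per message, do thread-pool work in between" scenario using only the pipeline primitives discussed in this thread. The exact timeout behavior of read-channel here is an assumption about how the pieces fit together, not verbatim library code:

```clojure
;; Sketch: consume two messages with a 500ms timeout on each.
(run-pipeline ch
  #(read-channel % 500)       ;; wait at most 500ms for the first message
  (fn [a]
    ;; ...thread-pool work could go here, yielding a result-channel...
    (run-pipeline ch
      #(read-channel % 500)   ;; and at most 500ms for the second
      (fn [b] [a b]))))
```

Because each stage can return a result-channel, the nested pipeline composes with the outer one without blocking a thread.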
You point out that it's hard to follow the flow of code within
pipelines, and that it might also be difficult within the (async ...)
block. I think this is a problem endemic to the entire practice of
event-driven programming. Channels are nice in that you can more or
less abstract away individual events, but when that is no longer
possible I don't think the answer is to just make channels more
capable. My (somewhat untested) theory is that this new functionality
is going to greatly simplify the sorts of problems that channels can't
solve by themselves. I hope you'll give it a try and let me know if
that isn't the case.
Zach
The major gap that seems to exist now is in composing asynchronous
flows of data, and being able to deal with errors. For example, my
current dilemma is in the query engine for Plasma. When a graph query
is executed a channel is returned, onto which the results will be
emitted as they become available. The query engine itself is built
out of a dataflow graph of query operators connected with channels
which output onto the result channel, and often you want to further
map* results or compose them in other ways. The problem is that there
is no way to handle an error in this scenario. Since the channel is
the only means of communication errors would have to be sent mixed in
with data, or else out of band on a secondary channel. A
result-channel like the one used in pipelines only represents one
value, so that doesn't work. Also, I need to handle errors that happen
both up front, for example from a bad query, and after some results
have already been received, for instance if a network connection
fails. This is why I think
channels need to support error handling.
I've been looking into how both errors and timing are handled in other
asynchronous libraries, such as python's twisted, ruby's
event-machine, and Rx for .net. Without doubt the Rx library has the
most interesting selection of functions (what they call operators),
and the most coherent mechanisms for dealing with error handling and
timing. They all have the basic concept of an error callback, but Rx
has well thought out operators for managing errors as well. To get a
sense for how it works, I recommend checking out this page of samples:
http://rxwiki.wikidot.com/101samples
(more RX links below... including 2 that focus on error handling)
On Mar 20, 7:50 pm, Zach Tellman <ztell...@gmail.com> wrote:
> As to integrating errors into channels, there are a few potential
> issues. For instance, what are the semantics of having an error flow
> through a channel? Does that close the channel, so that further work
> can't happen?
I think Lamina should use semantics similar to Rx. Each channel would
also have an on-error callback, and errors propagate up the chain of
subscribers. (siphon, map, etc., need to connect both error and
receiver callbacks.) When an exception is thrown in a channel handler:
* channel is locked and the on-error handlers are executed
* if unhandled then the channel is closed and no more messages are emitted
* on-error handlers can:
- retry (msg is processed again) until max-retries
- drop msg and continue processing
- catch (provide a new channel that will act as the new message source)
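A tiny sketch of what that might look like at the call site. Every name below is hypothetical; none of these functions exist in Lamina today, this is just the shape of the proposal:

```clojure
;; Hypothetical API -- on-error, retry, drop-message, and catch-with
;; are all invented names for the behaviors listed above.
(on-error ch
  (fn [ex msg]
    (cond
      (transient-error? ex) (retry)           ;; reprocess msg, up to max-retries
      (skippable? ex)       (drop-message)    ;; drop msg and continue processing
      :else                 (catch-with (channel)))))  ;; new message source
```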
The other piece of this that lamina basically already does is handling
drained and closed. In Rx there is just on-completed because the
underlying abstraction is slightly different than channels, but in
essence it is the same. The main difference is that lamina emits a
nil on close, which has bitten me a number of times now, and it means
that every handler needs to handle this odd case. I propose that
on-drained should be *the* mechanism for dealing with finished
channels, and nil should not be emitted on close. This will simplify
the semantics and the code, both in lamina and in the libs that use
it.
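To make the difference concrete (handle and cleanup are placeholder functions):

```clojure
;; Today: every handler has to guard against the trailing nil,
;; even though nil is also a legitimate message value.
(receive-all ch
  (fn [msg]
    (when-not (and (nil? msg) (drained? ch))
      (handle msg))))

;; Proposed: on-drained is the sole completion signal,
;; so handlers only ever see real messages.
(receive-all ch handle)
(on-drained ch cleanup)
```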
> Also, what if we only want to wait a certain amount of
> time for each message? Channels don't concern themselves with *when*
> messages come through, and I'm inclined to keep it that way.
> I feel like setting up flows of messages with map*, fork, siphon, et
> al and the structured consumption of those flows are somewhat
> orthogonal to each other. As you point out, it's possible to consume
> two messages using just channel primitives, but once the interactions
> get more complex (we only want to wait 500ms for each message, we want
> to close the connection if the timeout elapses, we want to calculate
> something on a thread pool between consuming the first and second
> messages) I think it becomes an exercise in frustration to limit
> ourselves to channels, however enriched.
I think if you check out the Rx examples and think about how
lamina.connections could be implemented with these mechanisms, you
might be convinced otherwise.
The async support is very cool and I think it will make sense for some
situations, but when I've tried to use it in exactly the query engine
situation I described above it just doesn't fit. I need to represent
streams of values, and then compose those streams, while also having
the ability to deal with errors in a sensible way. With the tools in
lamina now it seems like we have to choose between either streams of
values and no error handling, or single values with error handling. I
thought about producing streams of result-channels, but this means
everything starts composing weird and it requires every function to be
aware of this strange message payload.
So as not to just be sitting here whining, I've started implementing a
set of helper functions inspired by Rx here:
https://github.com/rosejn/lamina/commits/helpers
These include:
* enqueue-periodic (periodically enqueue vals from a seq onto a channel)
* indexed* ((indexed* (channel :foo :bar)) -> <<< [0 :foo] [1 :bar] ... >>>)
* buffered-channel (buffers input channel and periodically emits all
available messages)
* delay-channel (delays input messages and emits them periodically)
If they seem promising I can add unit tests.
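For instance, indexed* can be sketched in terms of map* and an atom; this is an illustration of the idea, not necessarily the implementation on the branch:

```clojure
;; Sketch: pair each message with a monotonically increasing index.
(defn indexed* [ch]
  (let [counter (atom -1)]
    (map* (fn [msg] [(swap! counter inc) msg]) ch)))
```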
Beyond that, I'm mostly interested in getting some kind of error
handling into channels, or else finding an alternative solution. I
started working on it by adding error support to observers, but then I
thought it would be best to talk it over here before spending too much
time. I'm really enjoying Lamina, and I think that with the addition
of composable error handling it could become a vital part of the
Clojure ecosystem. Hopefully here I've made the case for error handling more
clearly than in my previous messages. That, or maybe I'm missing a
better way to do what I need to do...?
Cheers,
Jeff
P.S. RX info:
Here are links to the two basic interfaces that enable everything,
IObservable and IObserver:
http://msdn.microsoft.com/en-us/library/dd782981.aspx
http://msdn.microsoft.com/library/dd783449.aspx
These articles discuss error handling (using the javascript version of RX):
http://codebetter.com/matthewpodwysocki/2010/07/20/introduction-to-the-reactive-extensions-for-javascript-error-handling-part-i/
http://codebetter.com/matthewpodwysocki/2010/08/02/introduction-to-the-reactive-extensions-for-javascript-error-handling-part-ii/
Your suggestion for on-error is pretty close to what I've been
thinking, with a few differences. The primary one is that Lamina
never assumes there will be a handler for any message enqueued into a
channel, so we can't just close the channel if no error handler is
registered. The error will have to stick around until it is handled,
blocking all subsequent messages, or is siphoned into another channel.
The trailing nil is something I've gone back and forth on, but it is a
genuinely useful guarantee that every time you call (receive ...) on a
non-drained channel, you'll get a message. What's more bothersome, in
my opinion, is that the last message is not *always* a nil. In this
case, you'd simply have to put
  (when-not (drained? ch)
    ...)
around your callbacks, which I don't think is a particularly onerous
requirement. And it's worth noting that if you really hate the
trailing nils, you can always use (receive-in-order ...), which
filters them out for you. I'm willing to be convinced otherwise on
all of the above, though, so please speak up if you strongly disagree.
As to the utility functions, I honestly don't see the benefit relative
to pipelines. You're using internal functions to implement something
that could be easily created using pipelines without any knowledge of
the library's implementation details.
Both

  (receive-in-order ch
    (fn [msg]
      (handler msg)
      (wait interval)))  ;; (wait ..) is in alpha2, but it's just
                         ;; (read-channel (timed-channel interval))

or even the more verbose

  (run-pipeline ch
    read-channel
    #(when-not (drained? ch) (handler %))
    (wait-stage interval)
    (fn [_] (when-not (drained? ch) (restart))))
seem preferable to the recursive callback mechanism used in your
functions. Also, as implemented your function will only receive
messages on precise boundaries of the interval rather than just making
sure there's at least that interval between the consumption of
messages, and the recursive callback will continue to call itself even
once the channel is drained. Neither of these is true of the above
implementations.
I don't want to harp on these bugs (I've certainly written my fair
share), but I believe it's fundamentally more difficult to have these
sorts of mistakes in an asynchronous workflow described by pipelines.
I know you've said in the past that you don't find pipelines
particularly intuitive, but I'd like to delve into that rather than
jumping to a completely different way of describing the problem. Can
you expand on why you don't like pipelines?
Thanks for the detailed write-up, please let me know if you think I've
misunderstood any of your points.
Zach
I'm always glad when something I've made can inspire strong opinions
and discussion. If anyone else has feedback, please don't hesitate to
speak up.
Zach
1) error handling with channels
2) trailing nil (or the broader question of propagating signals across channels)
3) utility functions
-----------------------------------------------------
Error Handling:
On Tue, May 31, 2011 at 7:47 PM, Zach Tellman <ztel...@gmail.com> wrote:
> I'll preface this by saying that I agree that error handling within
> channels is a major omission, and one that I intend to fix.
Great! I'm happy to hear this.
> Your suggestion for on-error is pretty close to what I've been
> thinking, with a few differences. The primary one is that Lamina
> never assumes there will be a handler for any message enqueued into a
> channel, so we can't just close the channel if no error handler is
> registered. The error will have to stick around until it is handled,
> blocking all subsequent messages, or is siphoned into another channel.
Ok, so the possible scenarios are:
* exception in a receive handler
* exception in an on-closed handler
* exception in an on-drained handler
For channels that have been siphoned, mapped, etc...
* error coming from an up-stream channel (on-error handler called)
While error handlers are being called the channel(s) need to be locked
so more receive handlers don't get called, right? This is pausing,
but not closing. If one of the error handlers does a retry, drop, or
catch, then the error is handled and operation continues. If there
are no error handlers, or none of them takes one of these actions,
then what?
* keep allowing new messages to be enqueued, but don't run handlers
until the error is handled or the channel is explicitly closed
* close channel and stop executing handlers
or maybe something else?
--------------------------------------------------------------------------------------------------------------
Trailing nil:
> The trailing nil is something I've gone back and forth on, but it is a
> genuinely useful guarantee that every time you call (receive ...) on a
> non-drained channel, you'll get a message.
It seems that the on-drained callback and the trailing nil are
duplicate signals for the same event, except that compared with using
(on-drained ...) checking for nil is more obscured in the code and
prone to conflict with data, since nil is also a valid value to enqueue.
I guess the reason you would want to guarantee that a (receive ...)
handler will get called is that you're counting on it to get back
the flow of control, but is this a valid assumption when errors can
occur?
> What's more bothersome, in
> my opinion, is that the last message is not *always* a nil. In this
> case, you'd simply have to put
>
> (when-not (drained? ch)
> ...)
>
> around your callbacks, which I don't think is a particularly onerous
> requirement. And it's worth noting that if you really hate the
> trailing nils, you can always use (receive-in-order ...), which
> filters them out for you. I'm willing to be convinced otherwise on
> all of the above, though, so please speak up if you strongly disagree.
Or only use receive handlers to process messages, and use the
on-drained handler to handle termination/cleanup. Then you don't need
to check for nil or drained or any special cases in your handlers.
------------------------------------------------------------------------------------------------
Utility functions:
> As to the utility functions, I honestly don't see the benefit relative
> to pipelines. You're using internal functions to implement something
> that could be easily created using pipelines without any knowledge of
> the library's implementation details.
If you mean to use a pipeline and restart rather than a recursive call
with delay-invoke in the implementation of these functions, then sure,
that might save from having to check on-drained, which as you say, is
a bug in my implementations. But pipelines are no replacement for
having a set of channel oriented functions like this. The problem
with pipelines is that while they can use channels, they don't compose
with channel functions. For me this makes them basically useless,
since I need to return a channel representing future values. Having a
library of utility functions like this (as in Rx) seems to me like a
powerful and clear way to compose asynchronous event streams. Last,
if channels get error handling that propagates, then what are
pipelines for?
> Also, as implemented your function will only receive
> messages on precise boundaries of the interval rather than just making
> sure there's at least that interval between the consumption of
> messages
I implemented the delay-channel to mimic the delay operator in Rx,
and what you are talking about is another operator, throttle, which
would also be useful. Taking throttle as an example... Using channel
functions we could do something like this to keep track of the number
of requests per client and throttle each one:
  (defn client-chan [client]
    (-> (:ch client)
        (indexed)
        (throttled-channel (:msgs-per-sec client))))
The returned channel can then be passed around and further composed
using other channel functions. This seems like a very attractive
programming model to me. What's the alternative using pipelines?
> Can you expand on why you don't like pipelines?
It feels like an awkward split in the programming model to switch
between channels and pipelines. More importantly, with channels I can
see how I would use a library of functions like the Rx operators to
handle many different scenarios cleanly, and with pipelines everything
I see requires reading the code within each pipeline to understand
what is happening, rather than creating a composable library of
pipeline operations.
-Jeff
---
Error handling is going to be a bigger change, I want to think about
it a bit before getting too in-depth. My current train of thought,
though, is that I'm not sure I can come up with a reason not to always
close the channel in the case of an exception. If we want to resume
in the case of an error, we can simply create a new channel. Anything
else seems a little too much like using exceptions for flow control,
which I'm not a fan of.
This is all still pretty nebulous, though, so it almost certainly will
be subject to change.
---
I do use message receiving for flow control, but more specifically I
use (read-channel ...) for flow control, and there's no reason I
can't change its signature from
(read-channel channel timeout)
to
(read-channel channel timeout closed-message)
where 'closed-message' defaults to nil. I think this will obviate the
need for trailing nils, but I need to think about it a bit more.
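With that proposed three-argument form, a consumer could distinguish closure from data without a sentinel nil (handle and cleanup are placeholders):

```clojure
(run-pipeline ch
  #(read-channel % 1000 ::closed)  ;; proposed form; ::closed is an arbitrary sentinel
  (fn [msg]
    (if (= ::closed msg)
      (cleanup)
      (do (handle msg)
          (restart)))))
```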
---
I'm all about having rich composable operators for channels, believe
me. Pipelines are just a flexible way of *defining* these operators,
even if they're currently mostly used to create terminal operators
that only consume the stream and don't generate a new one. But look
at how reduce* or partition* are implemented, and keep in mind that
the recursive receive callback approach will overflow the stack for a
sufficiently large channel, but pipelines never will.
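As a sketch of what a non-terminal operator looks like in this style, here is a simplified map*-alike (the real map* is more involved):

```clojure
(defn map-via-pipeline [f ch]
  (let [out (channel)]
    (run-pipeline ch
      read-channel
      (fn [msg]
        (if (drained? ch)
          (close out)                ;; propagate completion downstream
          (do (enqueue out (f msg))
              (restart)))))          ;; loop via restart, not recursion,
    out))                            ;; so the stack never grows
```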
I think there are definitely ways to simplify the specification of
non-terminal operators, and I intend to work on that front, but I'm
currently more concerned with having a general mechanism for
structured consumption of channels than lots of built-in operators.
Yes, having all the seq utility functions in Clojure is great, but if
it were a choice between only having loop/recur and only having the
utility functions, I'd choose loop/recur every time.
I think I understand your vision for how you want to be able to
interact with channels, and I hope to be able to support that very
soon. I'm going to focus on getting the 0.2.0 release out, then look
at error handling, and once there's a more solid foundation we can
work on getting analogues for the Rx operators.
In the meantime, please consider getting more comfortable with
pipelines, so that you can provide specific feedback on them as well.
I'd be interested to hear your thoughts.
Zach
--
You received this message because you are subscribed to the Google Groups "Aleph" group.