Fwd: Another Pipe datatype

151 views
Skip to first unread message

Michael Snoyman

unread,
Jun 3, 2012, 1:52:38 PM6/3/12
to streamin...@googlegroups.com


Forwarded conversation
Subject: Another Pipe datatype
------------------------

From: Michael Snoyman <mic...@snoyman.com>
Date: Sat, Jun 2, 2012 at 11:11 PM
To: Chris Smith <cds...@gmail.com>, Gabriel Gonzalez <Gabri...@gmail.com>, Paolo Capriotti <p.cap...@gmail.com>


Hi guys,

Two ideas that have come up recently have been sitting in the back of
my mind for the past few days:

* Chris's idea of a fifth type parameter for upstream result.
* Gabriel's recommendation that conduit do away with the second field in PipeM.

Putting this together with conduit, I came up with the following:

https://github.com/snoyberg/conduit/blob/7c167419b14daa69f84c0228eda7afdc9558ae27/conduit/Data/Conduit/Internal.hs#L62

data Pipe i o u m r =
   HaveOutput (Pipe i o u m r) (m ()) o
 | NeedInput (i -> Pipe i o u m r) (u -> Pipe Void o () m r)
 | Done r
 | PipeM (m (Pipe i o u m r))
 | Leftover (Pipe i o u m r) i

I can go into more details about intuition here if you guys are
interested, but I have a feeling I'd just be boring you with details
you can guess on your own. (If you want more elaboration, just let me
know, I'll be happy to write up some more tomorrow.) I haven't checked
either the Monad or Category laws (yet). I'm fairly certain the Monad
laws hold up, and likely barring the issue of Leftover which needs a
bit more research, I think the Category laws hold too.

One obvious (minor) point of contention is whether to use Void or ()
for the first parameter of the second field in NeedInput. I have a
feeling that () is actually more correct, which if I'm not mistaken is
what both pipes and pipes-core are already using.

Anyway, if you guys have any thoughts on this, let me know.

Michael

----------
From: Gabriel Gonzalez <gabri...@gmail.com>
Date: Sat, Jun 2, 2012 at 11:39 PM
To: Michael Snoyman <mic...@snoyman.com>
Cc: Chris Smith <cds...@gmail.com>, Paolo Capriotti <p.cap...@gmail.com>


This type only differs from Frames in three ways:
  • Left-over input
  • Fifth type parameter
  • Non-obligatory monad
I can't comment on left-over input yet because I haven't had time to address parsing, and everybody here already knows the advantages and disadvantages of making the monad bind optional, so I won't go over that.

The fifth-type parameter is already something I came up with as part of the comonoid half of the frame implementation.  I deliberately left it out (i.e. by using Nothing) because I was specializing it to finalization and didn't want to complicate the Pipe type constructor (yet).  You can use this generalized parameter to communicate any kind of exception and it doesn't have to be tied to termination at all.  I typically use 'e' to denote this type parameter (for exception), and Paolo's implementation is an example where 'e' would be SomeException, although it's still not quite the way I would have implemented it.

The way my generalized frames would use this 'e' parameter is that they have an additional constructor (perhaps named "Throw") dedicated just to throwing 'e' and then the Await constructor looks just like the one you have, with an input of (Either e a).  If the pipe that received the 'e' value awaits a second time after receiving the 'e' or terminates, that pipe gets temporarily suspended and the pipe rethrows the "e" downstream first, giving downstream a chance to handle it before it continues.  The exception is the most downstream pipe, which will not automatically rethrow the exception if it awaits again or terminates.  On the contrary, if none of the downstream pipes terminated, this would then cause the chain of suspended awaits to collapse back to the point where the exception was thrown, allowing it to resume where it left off as if nothing happened (i.e. the exception was completely handled).  However, if one of them terminates, then this terminates the entire pipe (and, of course, you can have termination itself throw a distinct exception of its own, too giving pipes downstream a chance to handle that.

So the simple semantic explanation of the behavior is that if a pipe throws an exception, every pipe downstream of it gets a chance to handle it, and then if they all await again control returns back to the pipe that threw the exception.

There is actually an even more general solution with a 6th type parameter that replaces the (m ()) field of HaveOutput with a generalized monoid, in case you are curious.

So I think the 5th type parameter is on the right track and it mirrors what I encountered when working with frames, however, implementing composition correctly with this is not trivial, especially since I still think you have the specific case of just termination exceptions still implemented incorrectly.

----------
From: Michael Snoyman <mic...@snoyman.com>
Date: Sun, Jun 3, 2012 at 7:30 AM
To: Gabriel Gonzalez <gabri...@gmail.com>
Cc: Chris Smith <cds...@gmail.com>, Paolo Capriotti <p.cap...@gmail.com>


I think we're looking at the purpose of this fifth parameter
completely differently. I'm basing it off of Chris's idea discussed
here:

http://www.reddit.com/r/haskell/comments/uav9d/pipes_20_vs_pipescore/c4u4uz5

You're describing a type of parameter that would affect the entire
remainder of the pipeline. What I've implemented (and what I think
Chris meant) would just affect the next pipe downstream. This
intuition I would use to describe this is that whenever a pipe
produces a stream of values, it produces a stream of output values,
followed by a single result type to "seal" the stream. Previously in
conduit, only the most downstream pipe had the option of returning a
meaningful result value; all other pipes implicitly returned unit. The
modification to conduit now allows upstream pipes to have arbitrary
return types.

The reason I was able to make the modifications to finalization was by
reassessing how closing a pipe would work, based on my experience with
implementing this upstream result type. Essentially, a pipe can be in
a few states:

1. Not yet run
2. Done
3. Awaiting input from upstream
4. Yielding output downstream

The change in approach is realizing that, if yielding downstream fails
(because downstream already shut down), there's no need to send a
final result value downstream. Instead, all we need to do is finalize.
That means that yield can auto-terminate, and finalization only needs
to occur during yield. Also, finalization no longer has to return a
result value.

It might be easier to look at the simplified `pipe` function which
doesn't take resume into account:

pipe :: Monad m => Pipe a b r0 m r1 -> Pipe b c r1 m r2 -> Pipe a c r0 m r2
pipe =
   pipe' (return ())
 where
   pipe' :: Monad m => m () -> Pipe a b r0 m r1 -> Pipe b c r1 m r2
-> Pipe a c r0 m r2
   pipe' final left right =
       case right of
           Done r2 -> PipeM (final >> return (Done r2))
           HaveOutput p c o -> HaveOutput (pipe' final left p) c o
           PipeM mp -> PipeM (liftM (pipe' final left) mp)
           Leftover p i -> pipe' final (HaveOutput left final i) p
           NeedInput rp rc ->
               case left of
                   Done r1 -> noInput () (rc r1)
                   HaveOutput left' final' o -> pipe' final' left' (rp o)
                   PipeM mp -> PipeM (liftM (\left' -> pipe' final
left' right) mp)
                   Leftover left' i -> Leftover (pipe' final left' right) i
                   NeedInput left' lc -> NeedInput
                       (\a -> pipe' final (lp a) right)
                       (\r0 -> pipe' final (lc r0) right)

Can you clarify what you mean by termination not being handled correctly?

Michael

----------
From: Chris Smith <cds...@gmail.com>
Date: Sun, Jun 3, 2012 at 5:14 PM
To: Michael Snoyman <mic...@snoyman.com>
Cc: Gabriel Gonzalez <gabri...@gmail.com>, Paolo Capriotti <p.cap...@gmail.com>


Michael Snoyman <mic...@snoyman.com> wrote:
> I think we're looking at the purpose of this fifth parameter
> completely differently.

I think so as well.  I'm sure there are many ways that a fifth type
parameter could be added to Pipe, but Michael's code looks consistent
with what I had in mind, though I haven't looked in detail at the
finalization part.

The only comment I'd give (and this is admittedly tough in conduit due
to backward compatibility) is that it sure would be nice to separate
leftovers from the base pipe, just because leftovers are a different
concern, and don't fit cleanly into the abstraction.  Including them
makes the abstraction ugly everywhere, as opposed to only in
situations where it's needed.  This comes out in the "downstream
leftovers will be discarded" rule -- which is of course necessary, but
would lead me to want to build a different type for pipes with
leftovers.  That would essentially amount to:

   newtype LeftoverPipe a b u m r = LeftoverPipe { fromLeftoverPipe
:: Pipe a b u m (r, Maybe a) }

with an appropriate monad instance, and a specialized
pseudo-composition where the LeftoverPipe can only be used on the
upstream side.  If you want to explicitly discard leftovers, then you
could unwrap it back to a regular pipe, and (liftM fst) on the inner
pipe.

--
Chris

----------
From: Paolo Capriotti <p.cap...@gmail.com>
Date: Sun, Jun 3, 2012 at 5:52 PM
To: Chris Smith <cds...@gmail.com>
Cc: Michael Snoyman <mic...@snoyman.com>, Gabriel Gonzalez <gabri...@gmail.com>


On Sun, Jun 3, 2012 at 3:14 PM, Chris Smith <cds...@gmail.com> wrote:
> Michael Snoyman <mic...@snoyman.com> wrote:
>> I think we're looking at the purpose of this fifth parameter
>> completely differently.
>
> I think so as well.  I'm sure there are many ways that a fifth type
> parameter could be added to Pipe, but Michael's code looks consistent
> with what I had in mind, though I haven't looked in detail at the
> finalization part.

I actually think Gabriel is right (although I'm still unsure what this
"comonoid" business is all about).  The `u` parameter can be seen as the
upstream return value (I guess this is Michael's intuition as well, given the
name he's chosen).

Of course, in the end, `u` and `r` will be unified, but for
individual portions of a pipe (composed using monadic bind), they are not
necessarily going to be the same type.

So `u` is playing the role of my `BrokenPipe` exception, but it actually
carries the upstream return value with it. More generally, you could use
`Either u SomeException` to support catching general exceptions instead of just
`BrokenPipe`.

What I don't like in this approach is that it makes it impossible to define a
simple `await` primitive returning `a`. The reason you can do that in
pipes-core is that, in addition to the `SomeException -> Pipe a b m r` field in
`Await`, you also have a `Throw` primitive, which lets you express the idea of
"ignoring" the exception (by essentially catching it then rethrowing it).

Extending this idea to the 5-parameter solution, it would probably make sense
to add another constructor (analogous to my `Throw`):

   data Pipe a b u m r =
       ...
       Defer u -- no continuation

   defer :: u -> Pipe a b u m r
   defer = Defer

which works just like `Done`, but with the upstream return value. Now you can
implement `await`:

   await :: Pipe a b u m r
   await = Await return defer

I'm not sure if this works, but if it does, it looks like a very nice
improvement.

Without `await`, I don't think the fifth parameter is a good tradeoff: you
would end up with a cruftier version of `await`, in exchange for a (sometimes)
cleaner `runPipe`. And `await` usually appears quite more frequently then
`runPipe`, so that doesn't seem very advantageous to me.

> This comes out in the "downstream
> leftovers will be discarded" rule -- which is of course necessary, but
> would lead me to want to build a different type for pipes with
> leftovers.  That would essentially amount to:
>
>    newtype LeftoverPipe a b u m r = LeftoverPipe { fromLeftoverPipe
> :: Pipe a b u m (r, Maybe a) }
>
> with an appropriate monad instance, and a specialized
> pseudo-composition where the LeftoverPipe can only be used on the
> upstream side.  If you want to explicitly discard leftovers, then you
> could unwrap it back to a regular pipe, and (liftM fst) on the inner
> pipe.

I agree 100% on the fact that "pipe-with-leftovers" should be different from
"pipe", because neither is strictly more general than the other (one has
leftovers, but no sensible composition).

This `LeftoverPipe` looks a lot like my old `ChunkPipe` in the previous release
of pipes-extra. Maybe it can actually be made to work, but I originally messed
that up and it didn't satisfy the `Monad` laws.

My current solution is `PutbackPipe` (also in pipes-extra), which has the
slightly more powerful `unawait` primitive.

By the way, would it make sense to move this conversation (and possible future
ones about the design of pipes/conduits) to some public medium? How about
setting up a mailing list?

BR,
Paolo

----------
From: Michael Snoyman <mic...@snoyman.com>
Date: Sun, Jun 3, 2012 at 6:32 PM
To: Paolo Capriotti <p.cap...@gmail.com>
Cc: Gabriel Gonzalez <gabri...@gmail.com>, Chris Smith <cds...@gmail.com>


Just responding to the point of public mailing list: I think that's a great idea. Anyone opposed to a Google Group named "streaming-haskell"? And is it OK with everyone if I forward the contents of this thread to the list?


----------
From: Chris Smith <cds...@gmail.com>
Date: Sun, Jun 3, 2012 at 6:46 PM
To: Paolo Capriotti <p.cap...@gmail.com>
Cc: Michael Snoyman <mic...@snoyman.com>, Gabriel Gonzalez <gabri...@gmail.com>


Paolo Capriotti <p.cap...@gmail.com> wrote:
> I actually think Gabriel is right (although I'm still unsure what this
> "comonoid" business is all about).  The `u` parameter can be seen as the
> upstream return value (I guess this is Michael's intuition as well, given the
> name he's chosen).

Yes, that's exactly what u is...

> Of course, in the end, `u` and `r` will be unified, but for
> individual portions of a pipe (composed using monadic bind), they are not
> necessarily going to be the same type.

Right.  I'm actually backing off on the "eventually u and r will be
unified" part, too.  I initially thought of the u parameter as a
kludge, a trick to fix up return types for pipes that worked like
consume, and that it should eventually be stuffed back under the
covers.  But it's become clear that the u type parameter is meaningful
in its own right.  In my sample implementation, the type of runPipe is

   runPipe :: Monad m => Pipe () Void u m r -> m r

So there is no requirement that the u type parameter unify with the
result anywhere.

> So `u` is playing the role of my `BrokenPipe` exception, but it actually
> carries the upstream return value with it. More generally, you could use
> `Either u SomeException` to support catching general exceptions instead of just
> `BrokenPipe`.

Yes, this would be a welcome change.  I completely punted on questions
of exceptions and finalization, but it's true that the upstream pipe
could terminate with an exception.  In that case, you'd probably want
await to rethrow the exception, so changing the type of the Await
constructor seems reasonable.

> What I don't like in this approach is that it makes it impossible to define a
> simple `await` primitive returning `a`. The reason you can do that in
> pipes-core is that, in addition to the `SomeException -> Pipe a b m r` field in
> `Await`, you also have a `Throw` primitive, which lets you express the idea of
> "ignoring" the exception (by essentially catching it then rethrowing it).
>
> Extending this idea to the 5-parameter solution, it would probably make sense
> to add another constructor (analogous to my `Throw`):

Yes, I agree with this as well.  Again, I completely punted on
exception handling.  If there is a way to build a pipes-style await
primitive on top of tryAwait (being the version we have now) and
exception handling, that is ideal.  What I did not understand about
Gabriel's post with exception handling was the strange wiring where
downstream pipes get to handle exceptions first, and sounded like some
magical behavior was being attached to using await a second time.  My
intuition says that a pipe should handle its own exceptions, and if it
terminates with an exception, they you should pass that downstream
(and a second await after catching the exception should rethrow the
same exception again).  I'm willing to be convinced otherwise, though.

> Without `await`, I don't think the fifth parameter is a good tradeoff: you
> would end up with a cruftier version of `await`, in exchange for a (sometimes)
> cleaner `runPipe`. And `await` usually appears quite more frequently then
> `runPipe`, so that doesn't seem very advantageous to me.

On the other hand, you could say that it's preferable because it
allows things to be done that just can't be done (without undefined
and such) in the current pipes-core.  But hopefully we can recover the
more convenient await primitive through something like exception
handling, and we won't have to worry about it.

> This `LeftoverPipe` looks a lot like my old `ChunkPipe` in the previous release
> of pipes-extra. Maybe it can actually be made to work, but I originally messed
> that up and it didn't satisfy the `Monad` laws.

Okay, I'd have to look at the details.  I don't have proofs of the
monad laws for what I've written, but it looks right intuitively.  In
any case, if there's a better option there, fine.  My main point was
that including it in Pipe is probably a mistake in the long term,
since it fails to enforce issues that can lead to just dropping data
(I'd say something about the category laws, but really it's the
dangerous behavior that matters, and the category laws are the
symptom).

> By the way, would it make sense to move this conversation (and possible future
> ones about the design of pipes/conduits) to some public medium?

Fine with me.

--
Chris

----------
From: Paolo Capriotti <p.cap...@gmail.com>
Date: Sun, Jun 3, 2012 at 7:00 PM
To: Michael Snoyman <mic...@snoyman.com>
Cc: Gabriel Gonzalez <gabri...@gmail.com>, Chris Smith <cds...@gmail.com>


On Sun, Jun 3, 2012 at 4:32 PM, Michael Snoyman <mic...@snoyman.com> wrote:
> Just responding to the point of public mailing list: I think that's a great
> idea. Anyone opposed to a Google Group named "streaming-haskell"? And is it
> OK with everyone if I forward the contents of this thread to the list?

Yep! Please go ahead!

BR,
Paolo


Michael Snoyman

unread,
Jun 3, 2012, 2:06:50 PM6/3/12
to Chris Smith, Gabriel Gonzalez, Paolo Capriotti, streamin...@googlegroups.com
On Sun, Jun 3, 2012 at 5:14 PM, Chris Smith <cds...@gmail.com> wrote:
> Michael Snoyman <mic...@snoyman.com> wrote:
>> I think we're looking at the purpose of this fifth parameter
>> completely differently.
>
> I think so as well.  I'm sure there are many ways that a fifth type
> parameter could be added to Pipe, but Michael's code looks consistent
> with what I had in mind, though I haven't looked in detail at the
> finalization part.
>
> The only comment I'd give (and this is admittedly tough in conduit due
> to backward compatibility) is that it sure would be nice to separate
> leftovers from the base pipe, just because leftovers are a different
> concern, and don't fit cleanly into the abstraction.  Including them
> makes the abstraction ugly everywhere, as opposed to only in
> situations where it's needed.  This comes out in the "downstream
> leftovers will be discarded" rule -- which is of course necessary, but
> would lead me to want to build a different type for pipes with
> leftovers.  That would essentially amount to:
>
>    newtype LeftoverPipe a b u m r = LeftoverPipe { fromLeftoverPipe
> :: Pipe a b u m (r, Maybe a) }
>
> with an appropriate monad instance, and a specialized
> pseudo-composition where the LeftoverPipe can only be used on the
> upstream side.  If you want to explicitly discard leftovers, then you
> could unwrap it back to a regular pipe, and (liftM fst) on the inner
> pipe.
>
> --
> Chris

That's essentially what conduit 0.4 has: the `Done` constructor is
`Done (Maybe a) r`. The problem is that you end up with data loss in
the case of:

Done (Just x) () >> Done (Just y) r

This is why both `conduit` and `enumerator` forbid this kind of
behavior, but do so at the documentation level, not the type level.
Including the `Leftover` constructor avoids this problem entirely.
(And yes, I've checked, all of the monad laws are correct.)

I actually think most of the other features being discussed in this
thread (automatic termination for await and exceptions) should really
be part of a layer above the core type. Exceptions seems pretty easy:
just embed them in the output type via Either for all Pipes in a
pipeline. Perhaps I'm missing a use case, but the fact is that with
all of the conduit and enumerator code I've written, I don't think
I've once wished I could send exceptions like that. Similarly,
automatic termination of await can be handled via an ErrorT (/EitherT)
monad transformer.

I don't think the same can be said at all of leftovers. I've yet to
see a proposal for layering leftovers on top of the core type that
works correctly and naturally. Additionally, I don't really see
Leftover as breaking an abstraction. It might cause problems for a
Category instance (I'm still not certain that's true, I haven't looked
into it thoroughly), but I strongly believe moving that complexity out
of the library will just move it into user code.

I still think there's something better we can do for leftovers, and I
want to figure out what that is. But I don't think we have a better
approach currently. Since the code as it stands works correctly and
the shortcomings are relatively well understood, I think it's an
acceptable tradeoff.

Michael

Gabriel Gonzalez

unread,
Jun 3, 2012, 3:32:19 PM6/3/12
to streamin...@googlegroups.com, Chris Smith, Gabriel Gonzalez, Paolo Capriotti
For the specific solution of finalization, Michael's intuition is right and you can just layer that on top of the core pipe type using Either for inputs.  The magic is in the composition, not the pipe type.

I think Chris's solution of layering leftover inputs on top of the core pipe type seems like a step in the right direction, but I really haven't spent enough time thinking about this, so I defer to Michael on this.

You don't want to unify u with the return value type.  That's a really bad idea, because it then it constrains composition to only work when you reach the "end" of the monad.  It's better to keep them distinct and leave it up to the user if they want to transmit the return value and constrain their pipes accordingly.  Also, it's not necessary to get the category to work, as I proved in my solution, which is isomorphic to u = `()`.

You're describing a type of parameter that would affect the entire
remainder of the pipeline. What I've implemented (and what I think
Chris meant) would just affect the next pipe downstream.

Well, I know you're not worrying about the category instance, but I just want to emphasize that IF you want a category instance there is no distinction between a single pipe and a bunch of composed pipes.  The easiest way to show this is with the identity law, which foils all attempts to define "the next pipe downstream":

p1 >-> p2 = p1 >-> id >-> p2

Similarly, concepts such as "the most downstream pipe" or "the number of pipes downstream" are completely meaningless when you build a proper category (and that's a good thing(!), because it guarantees there are no corner cases).  Paolo might be able to elaborate because I know he has a good intuition of this as well.

What I did not understand about
Gabriel's post with exception handling was the strange wiring where
downstream pipes get to handle exceptions first, and sounded like some
magical behavior was being attached to using await a second time.

No, it's just that if you want to handle an exception generated within the same pipe you just handle it (this requires no pipe features at all).  In my library, the only exception handled right now is termination, so if you want to handle termination, just don't terminate yet and run the handling code first.  This is what finallyP does.

The implementation just guarantees that other pipes get to handle the exception, too, once you are done with it (where you signal this by terminating or awaiting again).  Also, the ordering of all exception handling is from the pipe that threw the exception to downstream, not the reverse order.

 Similarly, 
automatic termination of await can be handled via an ErrorT (/EitherT) 
monad transformer. 

No, this doesn't permit a category instance.  I can say quite confidently that the correct automatic behavior for await should be to simply await again if it receives a Left, not to terminate.  This is what `awaitF` does:

awaitF = await >>= maybe awaitF return

If you use "Either u a", it just becomes:

awaitF = await >>= either (\_ -> awaitF) return

In fact, an easy way to get exception handling to form a category is just to define the above `awaitF` as the correct `await` and then fix everything else to make it work.

Michael Snoyman

unread,
Jun 3, 2012, 8:09:39 PM6/3/12
to streamin...@googlegroups.com, Chris Smith, Gabriel Gonzalez, Paolo Capriotti
On Sun, Jun 3, 2012 at 10:32 PM, Gabriel Gonzalez <Gabri...@gmail.com> wrote:

[snip]

>> You're describing a type of parameter that would affect the entire
>> remainder of the pipeline. What I've implemented (and what I think
>> Chris meant) would just affect the next pipe downstream.
>
>
> Well, I know you're not worrying about the category instance, but I just
> want to emphasize that IF you want a category instance there is no
> distinction between a single pipe and a bunch of composed pipes.  The
> easiest way to show this is with the identity law, which foils all attempts
> to define "the next pipe downstream":
>
> p1 >-> p2 = p1 >-> id >-> p2
>
> Similarly, concepts such as "the most downstream pipe" or "the number of
> pipes downstream" are completely meaningless when you build a proper
> category (and that's a good thing(!), because it guarantees there are no
> corner cases).  Paolo might be able to elaborate because I know he has a
> good intuition of this as well.

Forgetting pipes entirely, consider the following do-block:

x <- foo
y <- bar x
baz y

I don't think anyone would argue against the fact that bar is the
middle action, and that the result from foo only affects bar. But
technically speaking, `bar >=> baz` is just a single action. In fact,
I can prove this because `bar >=> return >=> baz` is equivalent to
`bar >=> baz`. Nonetheless, it's incredibly useful to think about the
individual actions at play, even if they ultimately collapse into a
single action.

That's all I mean when I refer to the next pipe downstream. Even in
`conduit` the `pipe` function causes two adjacent `Pipe`s to be
unified into a single `Pipe`, but we can still think about the
individual `Pipe`s for discussion's sake.

[snip]

>>  Similarly,
>> automatic termination of await can be handled via an ErrorT (/EitherT)
>> monad transformer.
>
>
> No, this doesn't permit a category instance.

Actually, since this is just layered on top of the existing Pipe type,
it definitely *does* provide a category instance (providing the
underlying Pipe does of course). I probably wasn't clear in my
intention, here's an example:

import Data.Conduit
import qualified Data.Conduit.List as CL

idP :: Monad m => Pipe a a r m r
idP = forever' $ await' >>= yield'

newtype EitherT e m a = EitherT { runEitherT :: m (Either e a) }

type EPipe i o u m r = EitherT r (Pipe i o u m)

forever' :: Monad m => EPipe i o u m r () -> Pipe i o u m r
forever' e@(EitherT me) = me >>= either return (\() -> forever' e)

await' :: EPipe i o r m r i
await' = EitherT awaitE

yield' :: Monad m => o -> EPipe i o u m r ()
yield' = EitherT . fmap Right . yield

instance Monad m => Monad (EitherT e m) where
return = EitherT . return . Right
EitherT mx >>= f = EitherT $ mx >>= either (return . Left)
(runEitherT . f)

main :: IO [Int]
main = CL.sourceList [1..10] $$ idP =$ CL.consume

> I can say quite confidently
> that the correct automatic behavior for await should be to simply await
> again if it receives a Left, not to terminate.  This is what `awaitF` does:
>
> awaitF = await >>= maybe awaitF return
>
> If you use "Either u a", it just becomes:
>
> awaitF = await >>= either (\_ -> awaitF) return
>
> In fact, an easy way to get exception handling to form a category is just to
> define the above `awaitF` as the correct `await` and then fix everything
> else to make it work.

I'm missing an intuition as to why this is the correct behavior. Isn't
this the source of the fact that, in pipes, `await`ing twice can cause
the `Pipe` to terminate?

Michael

Gabriel Gonzalez

unread,
Jun 3, 2012, 10:52:47 PM6/3/12
to Michael Snoyman, streamin...@googlegroups.com, Chris Smith, Paolo Capriotti
Alright, I understand what you mean now.  Your (forever' $ await' >>= yield) is like my (forever $ awaitF >>= yield).  As a side note, your forever' can be simplified by just using the ordinary forever in the EitherT monad, since the first left value will automatically exit, then you just unify the Left and the (polymorphic, non-terminating) Right values.  The problem is that await' only works in the EPipe monad.

Also, I've been thinking about what you guys have been saying about my weird rethrow semantics.  Perhaps I could change the rethrow behavior of my composition so that you have to specifically invoke a "Throw" constructor to throw the value again (and also to throw the initial value).  Then the new awaitF would instead be:

awaitF = await >>= either (\e -> throw e >> awaitF) return

idP = forever $ awaitF >>= yield

And then you won't get the weird behavior of it rethrowing when you await again or terminate.  Instead the user decides when to rethrow  (if at all).  However, I haven't yet checked if you can make a category out of that or not.

Paolo Capriotti

unread,
Jun 6, 2012, 5:47:52 PM6/6/12
to Chris Smith, streamin...@googlegroups.com
On Sun, Jun 3, 2012 at 4:46 PM, Chris Smith <cds...@gmail.com> wrote:
> On the other hand, you could say that it's preferable because it
> allows things to be done that just can't be done (without undefined
> and such) in the current pipes-core.  But hopefully we can recover the
> more convenient await primitive through something like exception
> handling, and we won't have to worry about it.

I tried implementing the idea in pipes-core, but, unfortunately, it doesn't
seem to work so well. You can look at my [termination] branch to check my
progress so far.

[termination]: https://github.com/pcapriotti/pipes-core/tree/termination

It seems that a convenient `await` of type:

await :: Pipe a b u m a

is incompatibile with generalized composition of type:

(>+>) :: Pipe a b u m r -> Pipe b c r m s -> Pipe a c u m s

In fact, assuming the existence of both, what would something like:

return x >+> await

evaluate to? Assuming `(return x :: Pipe a b u m r)` and `await :: Pipe b c r m
s`, then the composite pipe has a type that doesn't contain `r`, but it
definitely should have something to do with `x`, which is of type `r`.

There are two ways to constrain the type of `>+>` so that this can work, either
we set `r == s`, and then:

return x >+> await == return x

or `u == r`, so that:

return x >+> await == defer x

Both solutions make the combinators in `Control.Pipe.Combinators` less
polymorphic then necessary to be usable.

I really liked the idea of introducing the `u` parameter, but without a simple
`await` primitive, I think it creates more problems than it solves.

BR,
Paolo

Chris Smith

unread,
Jun 6, 2012, 9:00:14 PM6/6/12
to Paolo Capriotti, streamin...@googlegroups.com
Paolo Capriotti <p.cap...@gmail.com> wrote:
> In fact, assuming the existence of both, what would something like:
>
>    return x >+> await
>
> evaluate to? Assuming `(return x :: Pipe a b u m r)` and `await :: Pipe b c r m
> s`, then the composite pipe has a type that doesn't contain `r`, but it
> definitely should have something to do with `x`, which is of type `r`.

Right, I see that. I think the core of the issue is that if await
throws an exception, then something needs to catch it. If that
something is the composition operator, then it needs to be type
restricted. You can also require that the something is explicitly
given, like so:

terminate :: Pipe a b r m r -> Pipe a b r m r

Then it turns out you get something like:

terminate :: Pipe a b r m r -> Pipe a b r m r
p >+> terminate q

Here, terminate just catches the exception thrown by await, and turns
it into an ordinary return value. It's the terminate that unifies r =
s. (Your other option corresponds to an unterminate, which changes
normal termination into exceptions, and a similarly constrained
runPipe that plucks the value from the exception. I find that much
less appealing.)

You're arguing for building terminate automatically into the
composition operator... which can be done, but it leaves you with two
different compositions, neither of which is strictly more powerful
than the other, while an explicit terminate lets you choose. If you
fail to choose, then you end up with upstream termination throwing an
exception rather than terminating normally. By providing the
combinator, you can get that termination behavior if you need it, and
it's not even particularly painful. It's just not automatic.

So this leaves us with the question of why we wanted automatic
termination in the first place:

1. Originally, because of the category instance. (Or, for those who
aren't interested in the category instance, the ability to write
simple pipes and compose them uniformly both upstream and downstream.)
But >+> on 5-parameter pipes forms a category too (several of them),
and allows for those same techniques just as easily. Without the
fifth parameter, the automatic termination was necessary to get a
valid category instance. With it, automatic termination is just one
of several possible implementation options that all yield perfectly
good categories.

2. Because you actually want to use it to solve certain problems? But
which problems? I would like to see a convincing and realistic
example of a problem that is really more difficult to solve using the
Either version of tryAwait.

> I really liked the idea of introducing the `u` parameter, but without a simple
> `await` primitive, I think it creates more problems than it solves.

So I'd just like to see an example of it creating a realistic problem
before deciding that.

--
Chris Smith

Chris Smith

unread,
Jun 8, 2012, 11:55:44 AM6/8/12
to Paolo Capriotti, streamin...@googlegroups.com
Chris Smith <cds...@gmail.com> wrote:
> You can also require that the something is explicitly
> given, like so:
>
>    terminate :: Pipe a b r m r -> Pipe a b r m r
>
> Then it turns out you get something like:
>
>    terminate :: Pipe a b r m r -> Pipe a b r m r
>    p >+> terminate q

Sadly, I have to retract this. I set out to implement it, and ran
into the problem that there isn't a good category instance, and as a
result composition becomes brittle and confusing. I'd hoped to have
(p >+> q) just throw an exception if q wasn't wrapped in terminate,
but this leads to a situation where (terminate (q >+> id)) is not the
same as (terminate q). I do not believe there is any good solution to
providing an auto-terminating `await` primitive that doesn't build the
logic into composition the way Paolo did, and that in turn restricts
the type of termination so that we can no longer compose pipes that
transform the results.

So we're back to wondering how important that is. Anyone have a
compelling example of an important use of downstream auto-termination
that isn't equally well served by

forP :: (a -> Pipe a b u m r) -> Pipe a b u m u

???

--
Chris

Paolo Capriotti

unread,
Jun 8, 2012, 12:30:42 PM6/8/12
to Chris Smith, streamin...@googlegroups.com
On Fri, Jun 8, 2012 at 4:55 PM, Chris Smith <cds...@gmail.com> wrote:
> Chris Smith <cds...@gmail.com> wrote:
>> You can also require that the something is explicitly
>> given, like so:
>>
>>    terminate :: Pipe a b r m r -> Pipe a b r m r
>>
>> Then it turns out you get something like:
>>
>>    terminate :: Pipe a b r m r -> Pipe a b r m r
>>    p >+> terminate q
>
> Sadly, I have to retract this.  I set out to implement it, and ran
> into the problem that there isn't a good category instance, and as a
> result composition becomes brittle and confusing.  I'd hoped to have
> (p >+> q) just throw an exception if q wasn't wrapped in terminate,
> but this leads to a situation where (terminate (q >+> id)) is not the
> same as (terminate q).  I do not believe there is any good solution to
> providing an auto-terminating `await` primitive that doesn't build the
> logic into composition the way Paolo did, and that in turn restricts
> the type of termination so that we can no longer compose pipes that
> transform the results.

Well, I experimented with a different fifth parameter:

https://gist.github.com/2896616

but I don't really like this solution either.

> So we're back to wondering how important that is.  Anyone have a
> compelling example of an important use of downstream auto-termination
> that isn't equally well served by
>
>    forP :: (a -> Pipe a b u m r) -> Pipe a b u m u

Well, first of all `forP` is already quite akward to use without
auto-termination. For example, a zlib (de)compression pipe would look like:

u <- forP $ \chunk -> do
...
yield chunk'
...
...
yield chunk''
return u

You need to manually keep track of the upstream return value, and return it at
the end.

Another example is `fileWriter`. Right now it looks like:

-- | Write data to a file.
--
-- The file is only opened if some data arrives into the pipe.
fileWriter :: MonadIO m => FilePath -> Pipe B.ByteString Void m r
fileWriter path = do
-- receive some data before opening the handle
input <- await
-- feed it to the actual worker pipe
feed input go
where
go = bracket
(liftIO $ openFile path WriteMode)
(liftIO . hClose)
handleWriter

You see how it implements lazy opening of the handle just by awaiting once
initially. Without an `await` primitive, you would need to handle the empty
case explicitly, which is quite messier.

I agree that it's not a big deal, but neither is the fact that composable pipes
must have the same return value. I find that the only cases where this
is actually
inconvenient are trivial examples and tests.

BR,
Paolo

Michael Snoyman

unread,
Jun 9, 2012, 11:47:03 PM6/9/12
to streamin...@googlegroups.com, Chris Smith
I think we have very different definitions of inconvenient: I see
nothing at all wrong with your zlib example. What you're ignoring is
the cases where the current behavior of pipes and pipes-core causes
not just inconvenient code, but type-unsafe code. We discussed a few
of these points on Reddit, but my basic contention is that requiring
wrapping in an unnecessary Maybe wrapper or using undefined is not an
acceptable trade-off.

I think the claim has been made that the final result type is less
important in the pipes world since code shouldn't be exiting as often
as we do in conduit. Ignoring issues of inversion of control (one of
the major motivating factors in moving from enumerator to conduit in
the first place), let me give a simple example. In crypto-conduit,
there's a function sinkHash which would looks something like this in
conduit 0.5:

sinkHash :: Monad m => Pipe l ByteString o u m Hash

Felipe has also provided a convenience function called hashFile, which
is essentially (using the pipes interface):

hashFile :: FilePath -> IO Hash
hashFile fp = runResourceT $ runPipe $ sourceFile fp >+> sinkHash

How would you implement something like hashFile in pipes or
pipes-core? Could it be done without introducing a bottom value or a
dummy Hash value?

Michael

Chris Smith

unread,
Jun 10, 2012, 12:44:00 AM6/10/12
to Michael Snoyman, streamin...@googlegroups.com
Michael Snoyman <mic...@snoyman.com> wrote:
> I think we have very different definitions of inconvenient: I see
> nothing at all wrong with your zlib example. What you're ignoring is
> the cases where the current behavior of pipes and pipes-core causes
> not just inconvenient code, but type-unsafe code. We discussed a few
> of these points on Reddit, but my basic contention is that requiring
> wrapping in an unnecessary Maybe wrapper or using undefined is not an
> acceptable trade-off.

As Gabriel mentioned on Reddit, there are different situations in
which each is convenient. The current pipes behavior is more
convenient for maps and filters, and what I suggested (aka the
improved conduit behavior) is more convenient for folds. I think all
of us have our pet examples, which we've developed partly with an eye
toward how they seem particularly elegant in our own libraries, and
that is a lot of what's going on.

So now that I've got it pretty much put together in a complete form,
I'd like to propose the compromise plan, at
https://github.com/cdsmith/my-pipes/blob/master/Pipes.hs

It is fundamentally the fold-friendly answer, but it throws the basic
pipe operations into a type class MonadStream (and MonadUnStream for
leftovers), which means you can write code in a monad transformer like
EitherT on top of Pipe and still use a natural style. Then it
provides a simple await, which has the type:

await :: MonadStream m => EitherT (StreamResult m) m (Upstream m)

and that is typically used inside of withAwait, which has type:

withAwait :: MonadStream m => EitherT (StreamResult m) m
(StreamResult m) -> m (StreamResult m)

The combination is used something like this:

foo = withAwait $ do
...
x <- await
...
yield x
...

For example, idP can be defined in either of the following two ways:
the result is the same:

idP = tryAwait >>= either return (\x -> yield x >> idP)

or

idP = withAwait $ forever $ await >>= yield

You'll notice I've done nothing with termination or exception handling
in the link shown... but I don't see why adding it should cause any
problems. I just don't understand all of the details of that well
enough to keep in my head yet, so that will be a lot of copy and paste
from existing code, most likely in pipes-core or conduit.

Michael Snoyman

unread,
Jun 10, 2012, 1:02:43 AM6/10/12
to streamin...@googlegroups.com
On Sun, Jun 10, 2012 at 7:44 AM, Chris Smith <cds...@gmail.com> wrote:
> Michael Snoyman <mic...@snoyman.com> wrote:
>> I think we have very different definitions of inconvenient: I see
>> nothing at all wrong with your zlib example. What you're ignoring is
>> the cases where the current behavior of pipes and pipes-core causes
>> not just inconvenient code, but type-unsafe code. We discussed a few
>> of these points on Reddit, but my basic contention is that requiring
>> wrapping in an unnecessary Maybe wrapper or using undefined is not an
>> acceptable trade-off.
>
> As Gabriel mentioned on Reddit, there are different situations in
> which each is convenient.  The current pipes behavior is more
> convenient for maps and filters, and what I suggested (aka the
> improved conduit behavior) is more convenient for folds.  I think all
> of us have our pet examples, which we've developed partly with an eye
> toward how they seem particularly elegant in our own libraries, and
> that is a lot of what's going on.

I don't want to stress this point too much, but I don't think that's
the entire story. My point is that- without resorting to unsafe
operations- certain workflows are not possible in pipes/pipes-core
(such as hashFile). I don't think the reverse is true: with the
upstream result type included, I think anything that can be expressed
in pipes/pipes-core can be implemented in conduit (or the sample
library that you've given).
It's definitely an interesting idea. The approach you've used for
leftovers is definitely appealing, and with some CPP magic, we could
even use DataKinds as well, which is frankly just cool :). I also like
collectLeftovers; it's something I've been considering adding to
conduit as well.

My main concern would be error messages: do we get some
incomprehensible output for simple mistakes, or something more
meaningful? It might even be that the typeclass approach produces
*better* error messages, I just haven't had a chance to test it.

One other minor point: I think having two separate fields in Await
instead of using Either can produce significantly faster code, since
it avoids the intermediate data structure. When I added the rewrite
rule for `awaitE >>= either` in conduit, the bigsum benchmark double
in performance. Obviously that's a very contrived example, but it
might hold true in real world code.

Michael

Paolo Capriotti

unread,
Jun 11, 2012, 4:04:06 PM6/11/12
to streamin...@googlegroups.com
On Sun, Jun 10, 2012 at 6:02 AM, Michael Snoyman <mic...@snoyman.com> wrote:
> On Sun, Jun 10, 2012 at 7:44 AM, Chris Smith <cds...@gmail.com> wrote:
>> Michael Snoyman <mic...@snoyman.com> wrote:
>>> I think we have very different definitions of inconvenient: I see
>>> nothing at all wrong with your zlib example. What you're ignoring is
>>> the cases where the current behavior of pipes and pipes-core causes
>>> not just inconvenient code, but type-unsafe code. We discussed a few
>>> of these points on Reddit, but my basic contention is that requiring
>>> wrapping in an unnecessary Maybe wrapper or using undefined is not an
>>> acceptable trade-off.
>>
>> As Gabriel mentioned on Reddit, there are different situations in
>> which each is convenient.  The current pipes behavior is more
>> convenient for maps and filters, and what I suggested (aka the
>> improved conduit behavior) is more convenient for folds.  I think all
>> of us have our pet examples, which we've developed partly with an eye
>> toward how they seem particularly elegant in our own libraries, and
>> that is a lot of what's going on.
>
> I don't want to stress this point too much, but I don't think that's
> the entire story. My point is that- without resorting to unsafe
> operations- certain workflows are not possible in pipes/pipes-core
> (such as hashFile). I don't think the reverse is true: with the
> upstream result type included, I think anything that can be expressed
> in pipes/pipes-core can be implemented in conduit (or the sample
> library that you've given).

Well, you can implement `hashFile`, you just need to return a dummy hash value
from upstream. Not elegant, I concur, but I wouldn't call it unsafe either.

In any case, Chris found a good compromise, so we need not fight anymore over
this :)
This is a really nice solution! I'm still not 100% sold on the idea of using
type classes, but maybe just need to get used to them. They make the type
signatures clearer in some cases, but more awkward in others (like with the
`Upstream m ~ Downstream m` constraint).

The one thing I didn't like about Chris' Pipe implementation is the fact that
`await` is a second class citizen. I understand this is because we have reached
the conclusion that an `await` primitive is incompatibile with generalized
composition (i.e. when result types do not necessarily match).

However, the same is true for leftovers, so there should be no reason why
`unawait` is directly supported in the `Pipe` type, but `await` is not.

So I hacked at Chris' implementation, and added a `MonadStreamDefer` type
class, with a `defer` method. The idea is that `defer` lets you write something
like:

await = tryAwait >>= either defer return

Then I rewrote functions like `forP` and similar using `MonadStreamDefer`. This
allows you to use them in exactly the same way as in pipes-core, and you just
need to call `withAwait` to get the original behavior.

I could have done the same generalization using the previous formulation in
terms of `EitherT`, but the advantage of the current approach is that pipes are
also instances of `MonadStreamDefer`, and can be used in a composition (only
upstream, just like for `MonadUnStream`).

You can find the code on my github fork: https://github.com/pcapriotti/my-pipes

What do you guys think? I quite like this solution, and, assuming there are no
hidden problems with exception handling and finalization, I think I'm going to
integrate it in pipes-core.

BR,
Paolo

Carter Schonwald

unread,
Jun 11, 2012, 5:22:57 PM6/11/12
to streamin...@googlegroups.com
Hey all,

I've been following the (really exciting) evolving work on streaming-haskell! (partly because theres a few projects of mine  where the results of this great work will be super handy for me :-) )

a few thoughts/questions (because i'm very curious):

  • I'm kinda uncomfortable with the "use a dummy value" approach for early return values, this really creates a sort of NULL value checking problem. Perhaps its better to take a page from how Parsing Libs are done and have an (Either ErrTyp ResTyp) discipline? (especially since these dummy values should be unreachable anyways when they're not at points of return anyways). Is this something that'd be worth baking into the lib or does it only make sense as a usability  suggestion to client users?
    • is there perhaps some generalization/specialization/extension of how Monad and Category classes are defined currently that lets us have the compositional goodness and not the dummy values problem? Such a hypothetical variant may or may not be currently expressible nicely in ghc haskell, but its perhaps worth thinking about as a reference point in the design space?
  • on another note: how does it (in principle) look for how the current design space you guys are exploring relate to in practice performance? Eg: current pipes-core has the higher than (naively) expected composition complexity from a client user side, and theres currently also a bit of mulling about how to structure internals etc for exploiting some fusion opportunities for then making things (potentially) much faster, along side perhaps   (and I say that with unrealistic optimism) allowing some further optimization of computations occurring at adjoining steps of the pipeline.

I hope these questions make sense, and I appreciate any information in those two directions! :)
-Carter



--
You received this message because you are subscribed to the Google Groups "streaming-haskell" group.
To post to this group, send email to streamin...@googlegroups.com.
To unsubscribe from this group, send email to streaming-hask...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/streaming-haskell?hl=en.


Paolo Capriotti

unread,
Jun 24, 2012, 7:42:21 AM6/24/12
to streamin...@googlegroups.com
On Mon, Jun 11, 2012 at 10:22 PM, Carter Schonwald
<carter.s...@gmail.com> wrote:
> Hey all,
>
> I've been following the (really exciting) evolving work on
> streaming-haskell! (partly because theres a few projects of mine  where the
> results of this great work will be super handy for me :-) )
>
> a few thoughts/questions (because i'm very curious):
>
> I'm kinda uncomfortable with the "use a dummy value" approach for early
> return values, this really creates a sort of NULL value checking problem.
> Perhaps its better to take a page from how Parsing Libs are done and have an
> (Either ErrTyp ResTyp) discipline? (especially since these dummy values
> should be unreachable anyways when they're not at points of return anyways).
> Is this something that'd be worth baking into the lib or does it only make
> sense as a usability  suggestion to client users?

I'm not sure what you are referring to here. There are no null or dummy values
anywhere in any of our respective implementations, as far as I know.

The dummy value that I mentioned at some point in a previous discussion is used
in pipes-core when you fold over a pipe with a different type. For example:

mapM_ yield [1..10] >+> fold (+) 0

doesn't type-check in pipes-core 0.1 (although it would in the new version), so
you usually add a dummy value to change the type of the first pipe:

(mapM_ yield [1..10] >> return 0) >+> fold (+) 0

This is an annoyance (now solved), but doesn't have any impact on safety, in my
opinion.

> is there perhaps some generalization/specialization/extension of how Monad
> and Category classes are defined currently that lets us have the
> compositional goodness and not the dummy values problem? Such a hypothetical
> variant may or may not be currently expressible nicely in ghc haskell, but
> its perhaps worth thinking about as a reference point in the design space?

Well, that's exactly what we are trying to find. I personally think both the
upcoming versions of pipes-core and conduit pretty much solve all the problems
that we discussed, and are as compositional as they can get (although I feel
conduit is still missing some of pipes-core features).

> on another note: how does it (in principle) look for how the current design
> space you guys are exploring relate to in practice performance? Eg: current
> pipes-core has the higher than (naively) expected composition complexity
> from a client user side, and theres currently also a bit of mulling about
> how to structure internals etc for exploiting some fusion opportunities for
> then making things (potentially) much faster, along side perhaps   (and I
> say that with unrealistic optimism) allowing some further optimization of
> computations occurring at adjoining steps of the pipeline.

Are you referring to the quadratic complexity of monadic bind? That's a problem
in all existing implementations (maybe even in iteratee/enumerator?).

As I wrote in a blog post, I'm planning to explore ways to implement fusion for
pipes. Since pipes are a corecursive structure, it should be possible to apply
the classical fusion transformation. However, I haven't looked into this very
much, yet, so I'm not really sure.

BR,
Paolo
Reply all
Reply to author
Forward
0 new messages