Producers, Purity and Resumability


Jeremy Shaw

Jan 30, 2014, 1:02:37 PM
to haskell-pipes
I have been thinking about what it means to run a 'Producer'
twice. Specifically -- whether the Producer resumes where it left off
or not. I think that in general the behavior is undefined. I feel like
this has not been explicitly stated much -- so I am going to say it
now. In some sense, it should be obvious -- but when peering through
the haze of Pipes, StateT, and IO, the simple things can get lost.

Consider two different cases:

1. a producer that produces values from a pure list

2. a producer that produces values from a network connection


If we run the first producer twice we will get the same answer each
time. If we run the second producer twice -- we will likely get
different results -- depending on what data is available from the
network stream.

Now -- that is not entirely surprising -- one value is pure and one is
based on IO. So that is no different than calling a normal pure
function versus a normal IO function.

But -- I think it can be easy to forget that when writing pipes
code. Imagine we write some pipes code that processes a network stream
-- and it relies on the fact that the network Producer automatically
resumes from where it left off.

Now, let's pretend we want to test our code. So we create a pure
Producer that produces the same bytestring that the network pipe was
producing. Alas, our code will not work because the pure Producer does
not automatically resume when called multiple times.

I think this means that we must assume, by default, that the Producer
does not have resumable behavior. If we want to write code that relies
on the resumable behavior -- then we must explicitly ensure that it
happens.

In pipes-parse the resumability is handled by storing the 'Producer'
in 'StateT'.
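The trick can be sketched directly (this is not the pipes-parse API itself; `draw` here is a simplified stand-in I wrote for illustration): each draw pulls one element with `next` and puts the rest of the Producer back into the state, so the next draw resumes rather than restarts.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, get, put)
import Pipes

-- Pull one element, saving the remaining Producer back into the state.
draw :: Monad m => StateT (Producer a m r) m (Maybe a)
draw = do
    p <- get
    x <- lift (next p)
    case x of
        Left _        -> return Nothing
        Right (a, p') -> put p' >> return (Just a)

-- Two consecutive draws resume instead of restarting:
main :: IO ()
main = do
    r <- evalStateT ((,) <$> draw <*> draw) (each [1 .. 3 :: Int])
    print r  -- (Just 1,Just 2)
```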

Another alternative is to use an 'IORef'. I have an example of the
'IORef' solution below.

> module Main where

> import Data.IORef (IORef(..), newIORef, readIORef, writeIORef)
> import Pipes
> import qualified Pipes.Prelude as P

Here is our pure Producer:

> pure10 :: (Monad m) => Producer Int m ()
> pure10 = mapM_ yield [1..10]

And here is a function which uses a Producer twice.

> take5_twice :: Show a => Producer a IO () -> IO ()
> take5_twice p =
>     do runEffect $ p >-> P.take 5 >-> P.print
>        putStrLn "<<Intermission>>"
>        runEffect $ p >-> P.take 5 >-> P.print

Note that we have limited ability to reason about the results since we do
not know if the 'Producer' is resumable or not.

If we run 'take5_twice' using our pure Producer:

> pure10_test :: IO ()
> pure10_test =
>     take5_twice pure10

it will restart from 1 each time:

> pure10_test
1
2
3
4
5
<<Intermission>>
1
2
3
4
5

Here is a (not very generalized) function that uses an 'IORef' to
store the current position in the 'Producer' -- similar to how
'StateT' works:

> resumable :: Producer Int IO () -> IO (Producer Int IO ())
> resumable p0 =
>     do ref <- liftIO $ newIORef p0
>        return (go ref)
>     where
>       go :: IORef (Producer Int IO ()) -> Producer Int IO ()
>       go ref =
>           do p <- liftIO $ readIORef ref
>              x <- liftIO $ next p
>              case x of
>                (Right (i, p')) ->
>                    do liftIO $ writeIORef ref p'
>                       yield i
>                       go ref
>                (Left ()) ->
>                    do liftIO $ writeIORef ref (return ())
>                       return ()

Now if we call 'take5_twice' with our resumable Producer:

> impure10_test :: IO ()
> impure10_test =
>     do p <- resumable pure10
>        take5_twice p

Here we see the resuming behavior:

> impure10_test
1
2
3
4
5
<<Intermission>>
6
7
8
9
10

If we call 'resumable' on a 'Producer' that already has resumable
behavior -- it will still work. We can simulate that by calling resumable twice:

> twice_resumable :: IO ()
> twice_resumable =
>     do p0 <- resumable pure10
>        p <- resumable p0
>        take5_twice p


> twice_resumable
1
2
3
4
5
<<Intermission>>
6
7
8
9
10

Of course, we now have the overhead of *two* 'IORef' based Producers.

So we are now left with some questions of style.

If we are writing something like an HTTP server -- we can assume that
most of the time we are going to be working with a 'Producer' based on a
resumable source like a network stream. So, by using the inherent
resumability we can presumably get lower overhead and higher
performance. If we need to use the code with a non-resumable Producer
then we can use a function like 'resumable' to fake it.

This is somewhat distasteful in two ways though.

(1) It forces everything to be in the IO monad -- even when
everything could actually be pure.

(2) It relies on the resumability of the Producer -- but there is no
enforcement or indication of that in the type system.

In some sense -- being in the IO monad is not really a big deal since any
practical web server needs to be anyway. On the other hand -- creating
a nice pure streaming abstraction and sticking an ugly IORef in it seems
a little sad.

The alternative is to run all our code inside a 'StateT'. Since the
'StateT' takes care of resuming we do not have to worry if the
underlying Producer does or not. But.. now we always have the overhead
of being inside a 'StateT' even when we don't really need to be -- so we
have a more complicated set of types to work with and more potential
overhead.

The upside is that our pure code stays pure. We only introduce the IO
monad when IO is really used.

This is the major decision blocking hyperdrive at the
moment. (hyperdrive is my pipes based HTTP server).

Any thoughts?

- jeremy

Carter Schonwald

Jan 30, 2014, 3:12:04 PM
to haskel...@googlegroups.com
I think this precise issue is why the snap server http parser tooling uses the iostreams lib! 





Jeremy Shaw

Jan 30, 2014, 3:43:15 PM
to haskell-pipes
Yes -- io-streams forces everything to be done in the IO monad and uses
hidden IORefs. conduit also uses hidden IORefs for resumable streams.

But is that really the best choice?

- jeremy

Carter Schonwald

Jan 30, 2014, 4:12:55 PM
to haskel...@googlegroups.com
What's a good minimal example of this resumption problem?  Couldn't a functional approach be that running a pipe till it returns a value also returns a continuation? In the style of attoparsec and friends?

Dan Burton

Jan 30, 2014, 4:30:47 PM
to haskel...@googlegroups.com
And here is a function which uses a Producer twice. 

> take5_twice :: Show a => Producer a IO () -> IO ()
> take5_twice p =
>     do runEffect $ p >-> P.take 5 >-> P.print
>        putStrLn "<<Intermission>>"
>        runEffect $ p >-> P.take 5 >-> P.print

The reason that the "resumable" pipe resumes is because it pushes the pipe state tracking down into the IO monad. So when you stitch the code together, you get:

    do -- an IO do block
      p <- resumable pureP -- allocate IORef
      runEffect $ p >-> P.take 5 >-> P.print -- read and update the IORef
      putStrLn "<<Intermission>>"
      runEffect $ p >-> P.take 5 >-> P.print -- read and update the IORef

In short, pipes do not have resumability, so what you have done is added resumability by utilizing specific capabilities of the underlying monad (in this case IO, but you could do the same thing with ST or State).

You seem to imply that this is a problem with pipes generally, because running the same pipe multiple times might have unexpected effects. I instead see it as a problem with adding resumability to pipes, or other such dangerous effects, and that is what makes a given pipe behave in potentially unexpected ways.

... whether the Producer resumes where it left off
or not. I think that in general the behavior is undefined

This is incorrect. When you use pipe composition, a Producer always runs sequentially, for as long as downstream pipes pull from it in a pipeline. When you use the same pipe in two different pipelines, a Producer always starts over again from the beginning in each pipeline. But when you have a Producer that looks at an IORef or makes a network call or performs some sort of effect, then "starting over from the beginning" won't necessarily produce the same results. The key difference is that it starts over with the same set of instructions. If its instructions tell it to look at some outside state and base its behavior off of that, well then all bets are off for "intuitive" behavior, unless you are able to intuitively keep track of that outside state. Intuitive reasoning about a given pipe's behavior is only as good as the intuitive reasoning for the underlying monadic effects which it is built upon. If you want to have stronger guarantees, then you must use a more principled monad.
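A tiny illustration of this point (names are mine; only pipes and an IORef are assumed): both runs below restart the Producer "from the same instructions", but because those instructions consult outside state, the second run does not reproduce the first.

```haskell
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import Pipes
import qualified Pipes.Prelude as P

-- A Producer whose instructions read outside state: each yielded value
-- comes from bumping a shared counter.
counting :: IORef Int -> Producer Int IO ()
counting ref = do
    n <- liftIO $ atomicModifyIORef ref (\c -> (c + 1, c))
    yield n
    counting ref

main :: IO ()
main = do
    ref <- newIORef 0
    xs <- P.toListM (counting ref >-> P.take 3)  -- [0,1,2]
    ys <- P.toListM (counting ref >-> P.take 3)  -- [3,4,5]: same instructions, different results
    print (xs, ys)
```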


-- Dan Burton

Jeremy Shaw

Jan 30, 2014, 4:33:56 PM
to haskell-pipes
For an HTTP server, we want the user to be able to supply a handler
that optionally consumes the request body.

We could have (and I have even implemented) a system where you have a
Request type like:


    data RequestF next = RequestF
        { requestHead :: Request
        , requestBody :: Producer ByteString IO next
        } deriving (Functor)

    type Requests = FreeT RequestF IO ()

And in this system, the user supplied handler is responsible for
draining the current request body and returning the remainder of
the Request stream. But this seems to put a lot of trust and hassle on
the poor end user.

-jeremy



Dan Burton

Jan 30, 2014, 4:38:58 PM
to haskel...@googlegroups.com
The key difference is that it starts over with the same set of instructions.

And by "the key difference", I mean the difference between how I am interpreting "resumes where it left off," which is based on pipes regardless of underlying monad, and how you are interpreting "resumes where it left off," which is based on your specific implementation of "resumable." In your interpretation, you are trivially correct: pipes are not generally guaranteed to "resume" according to the notion of resumption which you provided in the implementation of "resumable" (which is very neat, by the way), and we cannot make this guarantee simply because not all pipes are implemented that way, nor should they be, imo.

-- Dan Burton

Jeremy Shaw

Jan 30, 2014, 5:28:46 PM
to haskell-pipes
On Thu, Jan 30, 2014 at 3:30 PM, Dan Burton <danburt...@gmail.com> wrote:

Yes -- I agree with the how/why it works the way that it does.

> The reason that the "resumable" pipe resumes is because it pushes the pipe
> state tracking down into the IO monad.
>
> In short, pipes do not have resumability, so what you have done is added
> resumability by utilizing specific capabilities of the underlying monad (in
> this case IO, but you could do the same thing with ST or State).

Right. pipes itself does not supply any resumability feature. But some
Producers have that behavior implicitly -- via the underlying IO monad
or other mechanisms.

> You seem to imply that this is a problem with pipes generally, because
> running the same pipe multiple times might have unexpected effects. I
> instead see it as a problem with adding resumability to pipes, or other such
> dangerous effects, and that is what makes a given pipe behave in potentially
> unexpected ways.

I'm not suggesting it is a problem with pipes. Just that it is not
obvious at first (even though it perhaps should be). It is important
for people to realize that if you write code that expects the Producer
to have resuming behavior -- then you are going to be surprised when
you pass in a pure Producer. When you really think about it -- it
seems somewhat trivial. But -- I think it is easy to overlook when you
get tangled up in Pipes. If you sit down and start writing code that
is drawing data over the network from the beginning -- you may not
realize that you are relying on the Producer to 'remember' how much
has been consumed already until you try to pass in a pure Producer
later. Obviously, the Producer doesn't explicitly remember anything in
that case -- it is merely an artifact of the underlying IO that is
producing the results. I think it is easy to overlook which aspects
Producers do *not* abstract away.

But this still leaves me with questions. If I want to write an HTTP
server, should I assume that the Producer supplying the incoming data
is going to support 'resuming' and then use IORefs to fake it for pure
producers. Or should I assume purity by default and have the resuming
automatically baked into the system. Having a pure core seems better
in theory for testing and reasoning. But going IO seems like a better
choice for real world performance.

I guess part of my queasiness is that if I provide you a function like:


serveHTTP :: (Monad m) =>
             Producer ByteString m ()
          -> Consumer ByteString m ()
          -> (Request -> m Response)
          -> m ()

I feel like it is natural to expect that you can do something like:

test = serveHTTP (yield dummyRequest) P.print myHandler

If you pass in a Producer where the ByteString is an invalid HTTP
request -- then you shouldn't be too surprised that you get an error.

But if you pass in a valid HTTP request like that and it fails because
serveHTTP is expecting the producer to 'resume' -- then I think it
will be pretty mysterious?

Now, we could run into the same problem in plain old IO if I had:

serveHTTP :: IO ByteString           -- ^ get next chunk
          -> (ByteString -> IO ())   -- ^ send next chunk
          -> (Request -> IO Response)
          -> IO ()

and I do, (return dummyRequest), then I am essentially in the same
situation. Not sure why I feel differently about that. Perhaps because
we simply don't write a lot of functions with callbacks like that in
Haskell most of the time -- but in the Pipes world, it is a lot more
natural to try to wire up different Producers to the input of the
pipe?

Anyway, I don't think this is a problem with pipes itself. But more a
question of which path to take in using them in hyperdrive -- IO vs
StateT. I have to pick one -- and the choice is not obvious.

I have implemented a dummy prototype using the StateT method already,
I am working on an IO based version now. Perhaps when I have both
versions available, that will help make for clearer questions...

- jeremy

Dan Burton

Jan 30, 2014, 6:03:56 PM
to haskel...@googlegroups.com
I guess part of my queasiness is that if I provide you a function like:

serveHTTP :: (Monad m) =>
             Producer ByteString m ()
          -> Consumer ByteString m ()
          -> (Request -> m Response)
          ->  m () 

I feel like it is natural to expect that you can do something like:
test = serveHTTP (yield dummyRequest) P.print myHandler
If you pass in a Producer where the ByteString is an invalid HTTP
request -- then you shouldn't be too surprised that you get an error. 

But if you pass in a valid HTTP request like that and it fails because
serveHTTP is expecting the producer to 'resume' -- then I think it
will be pretty mysterious?

Why would serveHTTP expect the producer to resume? It is parametric over the underlying monad, so there should be no expectation that the Producer given to serveHTTP will resume. I would say this is an error in the implementation of serveHTTP.

It seems that you are treating Producers like you would treat a handle. And indeed, with "resumable", you can upgrade any Producer to a handle-like Producer that retains its position across pipelines.

But the whole advantage of the Pipes/Conduits/Iteratee abstraction over handles is that the composition operators abstract away this "position" stuff. As far as I can tell, idiomatic Pipes code typically only uses a producer in exactly one pipeline.

-- Dan Burton




Gabriel Gonzalez

Jan 30, 2014, 8:48:25 PM
to haskel...@googlegroups.com, Jeremy Shaw
Jeremy is correct: several `Producer`s cannot be safely run twice.
Well, they can be, but the result is not sensible. I'm not happy with
the `io-streams` solution in general because it is hard-coded to only
work in the `IO` monad. However, I'd be totally fine with adding
something like:

once :: Producer a IO () -> IO (Producer a IO ())

That would allow one to selectively enforce the single-pass nature when
one preferred this style of programming.
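A hypothetical sketch of what such a `once` could look like (this is not an actual pipes export; the flag-based strategy and error message are my own): the returned Producer consults an IORef and fails loudly if it is ever run a second time.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import Pipes

-- Wrap a Producer so that running it a second time is a loud error
-- rather than silently producing garbage.
once :: Producer a IO () -> IO (Producer a IO ())
once p = do
    ref <- newIORef False
    return $ do
        used <- liftIO (readIORef ref)
        if used
            then error "once: Producer was run more than once"
            else liftIO (writeIORef ref True) >> p
```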

Gabriel Gonzalez

Jan 30, 2014, 9:20:20 PM
to haskel...@googlegroups.com, Jeremy Shaw
Now the correct API is that the user should supply a `Parser ByteString
IO ()`, to ensure that they don't need to keep track of unused input.
For example, if the user just wanted to save the first 10 lines of a
response body to some handle, they could write the following `example`
parser:

    import Control.Monad (join)
    import Data.ByteString (ByteString)
    import Lens.Family2
    import Lens.Family2.Unchecked (iso)
    import Lens.Family2.State.Strict (zoom)
    import Pipes
    import Pipes.Parse
    import qualified Pipes.ByteString as PB
    import System.IO (Handle)

    -- This reminds me that I need to provide a function to convert
    -- 'Consumer's to their equivalent 'Parser's
    toHandle' :: Handle -> Parser ByteString IO ()
    toHandle' h = StateT $ \p -> do
        r <- runEffect (p >-> PB.toHandle h)
        return ((), return r)

    -- Here's an example for how to define a custom parsing lens
    maxLines
        :: Monad m
        => Int
        -> Lens' (Producer ByteString m x)
                 (Producer ByteString m (Producer ByteString m x))
    maxLines n = iso (cut n) join
      where
        cut 0 p = return p
        cut n p = p ^. PB.line >>= cut (n - 1)

    example :: Handle -> Parser ByteString IO ()
    example h = zoom (maxLines 10) (toHandle' h)

That will run in constant space and automatically handle unused input
for the user.

Note that the implementation of `cut` from `maxLines` could be
simplified further, but at the expense of complicating the type of
`Pipes.ByteString.lines` (by generalizing that `Iso'` to a
four-parameter `Iso`). I'm a little bit reluctant to complicate that
type signature, though.

Gabriel Gonzalez

Jan 30, 2014, 9:31:36 PM
to haskel...@googlegroups.com, Jeremy Shaw
I think the issue here is that you are exposing the `Producer` to the
user at all. Like I mentioned in another e-mail, I think the correct
API is that the user provides you with a `Parser ByteString IO ()`.

There is one case where I think you may want to explicitly expose the
underlying `Producer`, which is when you use HTTP pipelining (that `FreeT`
stuff we discussed previously). The `FreeT`-based abstraction has to
expose the `Producer` and it implicitly assumes going over the data in a
single pass by necessity.

Gabriel Gonzalez

Jan 31, 2014, 3:49:37 AM
to haskel...@googlegroups.com
Alright, so I added two utilities to `pipes-parse` for converting `Consumer`s to `Parser`s:

    toParser :: Monad m => Consumer (Maybe a) m r -> Parser a m r

    toParser' :: Monad m => Consumer a m X -> Parser a m ()


These cover most common cases with good efficiency.
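As a usage sketch (using the `toParser_` spelling from the correction later in the thread), `Pipes.Prelude.print` is polymorphic enough in its return type to convert directly into a draining Parser:

```haskell
import Control.Monad.Trans.State.Strict (evalStateT)
import Pipes
import Pipes.Parse (toParser_)
import qualified Pipes.Prelude as P

-- Convert the print Consumer into a Parser, then drain a pure Producer.
main :: IO ()
main = evalStateT (toParser_ P.print) (each [1 .. 3 :: Int])
```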

Also, I noticed there was a slight mistake in my example implementation of `maxLines` because it was not consuming the newline byte.  The correct version is this:

    maxLines n = iso (cut n) join
      where

        cut 0 p = return p
        cut n p = do
            p'  <- p ^. PB.line
            p'' <- lift (execStateT PB.drawByte p')  -- Drain newline byte
            cut (n - 1) p''

Gabriel Gonzalez

Jan 31, 2014, 3:55:18 AM
to haskel...@googlegroups.com
Oops, two more mistakes:

It's `toParser_`, not `toParser'`.

Also, `maxLines` still has a mistake because it doesn't re-yield the newline byte.  Maybe I should provide a `line` lens that includes the newline byte for simplicity.  Anyway, it's a digression from the discussion.

Brendan Hay

Jan 31, 2014, 4:03:46 AM
to haskel...@googlegroups.com
Something similar would be useful as a response body for your recent
Pipes HTTP library POC, wherein the result is not wrapped inside the `withXXX` idiom.

This seems to involve pipes-safe - would a hypothetical `ResumableProducer` be possible?

Gabriel Gonzalez

Jan 31, 2014, 4:34:28 AM
to haskel...@googlegroups.com, Brendan Hay
In the specific case of `pipes-http`, it actually is safe to run the `Producer` multiple times.  This is because it is internally just reading off of an `IO ByteString` action.

To be honest, I think the easiest solution is to just teach users to only pass over `Producer`s once.  This has worked fine for `conduit` users, so I see no reason why `pipes` should complicate things by behaving differently.

Note that the issue of whether or not a `Producer` is resumable is orthogonal to whether or not it should be replayable (which is the *real* issue).  Even if you restrict yourself to non-resumable `Producer`s and only permit `runEffect` and `(>->)` (or `($$)`/`(=$=)` for `conduit`) you can still trigger pathological behavior just by replaying `Producer`s.  All you have to do is run a `Producer` to completion twice to mess with any protocol:

    runEffect $ someProducer >-> consumer1
    -- The following line will probably produce garbage
    runEffect $ someProducer >-> consumer2

There's no good solution within the language to prevent users from screwing things up.  For example, consider the `once` function I mentioned previously in this thread:

    once :: Producer a IO r -> IO (Producer a IO r)

The easy way to break this function is to call `once` twice generating two separate `Producer`s.  The second generated `Producer` is most likely garbage.  Asking the user to not call `once` twice is just as unsafe as asking the user not to run a given `Producer` twice, so you're back to where you started.

Jeremy Shaw

Feb 1, 2014, 7:01:02 PM
to Gabriel Gonzalez, haskell-pipes
On Thu, Jan 30, 2014 at 8:31 PM, Gabriel Gonzalez <gabri...@gmail.com> wrote:
> I think the issue here is that you are exposing the `Producer` to the user
> at all. Like I mentioned in another e-mail, I think the correct API is that
> the user provides you with a `Parser ByteString IO ()`.

Right -- that is the 'StateT' based solution I mentioned. This is my
dummy implementation -- which I am still refactoring:

https://github.com/stepcut/hyperdrive/blob/master/experiments/HyperMonad.hs

- jeremy

Jeremy Shaw

Feb 1, 2014, 7:06:38 PM
to haskell-pipes
On Thu, Jan 30, 2014 at 5:03 PM, Dan Burton <danburt...@gmail.com> wrote:

> It seems that you are treating Producers like you would treat a handle. And
> indeed, with "resumable", you can upgrade any Producer to a handle-like
> Producer that retains its position across pipelines.

Yes! The Handle analogy is exactly the description I was looking for.

> But the whole advantage of the Pipes/Conduits/Iteratee abstraction over
> handles is that the composition operators abstract away this "position"
> stuff. As far as I can tell, idiomatic Pipes code typically only uses a
> producer in exactly one pipeline.

Right -- this is my feeling as well. I *can* write code using
Producers like Handles -- but is it wrong to? It sure doesn't make me
feel warm and fuzzy inside.

- jeremy

Gabriel Gonzalez

Feb 3, 2014, 10:02:40 AM
to Jeremy Shaw, haskell-pipes
The `StateT` solution you have actually looks pretty close to what I had
in mind (at least the parsing part). I think there's only one change
you need to make, which is to drain any unused bytes after the user is
done parsing the body, like this:

    ...
    a <- zoom (Pb.splitAt (rqLength req)) (handler req <* skipAll)
    lift $ sendResponse a
    ...

`splitAt` only ensures that the parser doesn't overrun the allotted
number of bytes. If you add `skipAll` then it ensures that you consume
any unused bytes so that subsequent commands begin from after the
request body.

Gabriel Gonzalez

Feb 5, 2014, 8:56:58 AM
to haskel...@googlegroups.com, Jeremy Shaw
Also, I forgot to mention that there is a warm and fuzzy implementation
of `Handle`s using `pipes`, which I originally wrote about here:

http://www.haskellforall.com/2013/04/pipes-and-io-streams.html

That post was written using the old and ugly `pipes-3.*` API, but I can
summarize it using the new API pretty briefly:

Read-only handles have type:

    readHandle :: Effect IO a  -- Read a value of type `a`

Transformations on read-only handles have types like:

    readTransformation1 :: Consumer a IO b
    readTransformation2 :: Consumer b IO c

You use `(>~)` to connect transformations to handles:

    (>~) :: Effect IO a -> Consumer a IO b -> Effect IO b

    readHandle >~ readTransformation1 :: Effect IO b

... and also to connect transformations to each other:

    (>~) :: Consumer a IO b -> Consumer b IO c -> Consumer a IO c

    readTransformation1 >~ readTransformation2 :: Consumer a IO c

This connection process is associative:

        (readHandle >~ readTransformation1) >~ readTransformation2
    =   readHandle >~ (readTransformation1 >~ readTransformation2)
    ::  Effect IO c

... and `await` is the identity handle/transformation:

    await :: Consumer a IO a

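As a tiny concrete instance of the read side (all names here are mine; only pipes is assumed): lifting any IO action gives a read handle, and a Consumer that awaits twice is a read transformation.

```haskell
import Pipes

-- A read handle, simulated with a pure action; a real one might be
-- `lift getLine` or a socket read.
readHandle :: Effect IO String
readHandle = lift (return "chunk ")

-- A read transformation: combine two reads into one value.
twoReads :: Consumer String IO String
twoReads = do
    a <- await
    b <- await
    return (a ++ b)

-- (>~) replaces every `await` in `twoReads` with a run of `readHandle`.
main :: IO ()
main = runEffect (readHandle >~ twoReads) >>= putStrLn  -- prints "chunk chunk "
```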
Dually, write handles have type:

    writeHandle :: c -> Effect IO ()

... and transformations on write handles have types like:

    writeTransformation1 :: a -> Producer b IO ()
    writeTransformation2 :: b -> Producer c IO ()

You use `(~>)` to connect transformations to handles:

    (~>) :: (b -> Producer c IO ()) -> (c -> Effect IO ()) -> (b -> Effect IO ())

    writeTransformation1 ~> writeHandle :: b -> Effect IO ()

... and also to connect transformations to each other:

    (~>) :: (a -> Producer b IO ()) -> (b -> Producer c IO ()) -> (a -> Producer c IO ())

    writeTransformation1 ~> writeTransformation2 :: a -> Producer c IO ()

This connection process is associative:

        (writeTransformation1 ~> writeTransformation2) ~> writeHandle
    =   writeTransformation1 ~> (writeTransformation2 ~> writeHandle)
    ::  a -> Effect IO ()

... and `yield` is the identity handle/transformation:

    yield :: a -> Producer a IO ()

Also, these generalize to any base monad (not just `IO`) and they work
bidirectionally, too, if you just replace them all with their
`Pipes.Core` analogs (i.e. `request`/`respond`/`(/>/)`/`(\>\)`). This
means that you can have write handles that return a result and read
handles which take an argument.

I think I will just go ahead and write this up into a `pipes-handle`
package. It actually plays pretty nicely with the rest of the `pipes`
ecosystem.