Constant memory producer transformation

Dylan Tisdall

Sep 21, 2015, 3:10:16 PM
to Haskell Pipes
Following up on my last question, my next issue is also probably a very straightforward use of pipes, but I've managed to get tangled up going back and forth in the packages' documentation.

I've got a file whose first 4 bytes give the offset into the file of a series of binary data elements (called MDHs in my case). Given a Handle to the start of such a file, I want to:

1. read the first Word32 in the file, to retrieve the offset;
2. skip the Handle to that offset; and
3. turn the rest of the file into a Producer MDH IO ()

Given that the file I'm reading may be large, I want to make sure this process is going to run in constant memory. I thought I could use pipes-attoparsec, but I couldn't get straight whether it would need to read the whole file before it could produce anything (as I understand is normally the case with attoparsec).

So far I have the following, which isn't complete, but at least does the skip and converts the remaining file to a ByteString producer.

handleToMDHs :: Handle -> IO (Either P.DecodingError (P.Producer P.ByteString IO ()))
handleToMDHs h = do
    hLen <- P.evalStateT (P.decodeGet getWord32le) (PB.fromHandle h)
    case (hLen :: Either P.DecodingError Word32) of
        Left err  -> return $ Left err
        Right len -> fmap Right (skipAndProceed h len)
  where
    skipAndProceed :: Handle -> Word32 -> IO (P.Producer P.ByteString IO ())
    skipAndProceed handle l = do
        hSeek handle AbsoluteSeek (fromIntegral l)
        return $ PB.fromHandle handle


My MDH type is an instance of Binary, so there is a get method available. I'm wondering:

a) What's the right way to turn this into a Producer of MDHs instead of a Producer of ByteStrings while operating in constant memory?
b) Is there a more elegant way to deal with error handling here? I'm not even dealing with possible failure in hSeek, and I already think this looks pretty messy. I'm not wedded to my function type being
 
handleToMDHs :: Handle -> IO (Either P.DecodingError (P.Producer MDH IO ()))

I just am not sure how else to express the possibility of failure in this kind of operation.


Thanks,
Dylan

Dylan Tisdall

Sep 21, 2015, 4:41:20 PM
to Haskell Pipes
Perhaps I've figured at least part of this out; is a combination of view and decoded going to give me a constant memory conversion from a ByteString Producer to an MDH Producer? My code now is:

type ErrorResultProducer = P.Producer MDHAndScanLine IO (Either
    (P.DecodingError, P.Producer P.ByteString IO ()) ())

measDatMDHScanLinePairs :: Handle -> IO (Either P.DecodingError ErrorResultProducer)
measDatMDHScanLinePairs h = do
    hLen <- P.evalStateT (P.decodeGet getWord32le) (PB.fromHandle h)
    case (hLen :: Either P.DecodingError Word32) of
        Left err  -> return $ Left err
        Right len -> fmap Right (skipAndProceed h len)
  where
    skipAndProceed :: Handle -> Word32 -> IO ErrorResultProducer
    skipAndProceed handle l = do
        hSeek handle AbsoluteSeek (fromIntegral l)
        return $ mdhProd (PB.fromHandle handle)

    mdhProd :: P.Producer P.ByteString IO () -> ErrorResultProducer
    mdhProd bsProd = view P.decoded bsProd


I'm still suspicious of how I'm handling errors here, though. I seem to have something with several layers of Either, and that must be wrong.


Thanks again,
Dylan

Gabriel Gonzalez

Sep 21, 2015, 11:43:58 PM
to haskel...@googlegroups.com, dy...@geeky.net
You're definitely on the right track.  The type I would aim for would be something like this:

    example :: Handle -> Producer MDHAndScanLine IO (Either DecodingError (Producer ByteString IO ()))

Notice that this slightly differs from your type; I'm merging the outer `IO (Either DecodingError ...)` into the first `Producer` to simplify the type.

The implementation for that type would be very similar to the one you wrote in your second e-mail:

    example :: Handle -> Producer MDHAndScanLine IO (Either DecodingError (Producer ByteString IO ()))
    example handle = do
        let p = Pipes.ByteString.fromHandle handle
        x <- lift (evalStateT (decodeGet getWord32le) p)
        case x of
            Left  err -> return (Left err)
            Right len -> do
                lift (hSeek handle AbsoluteSeek (fromIntegral len))
                view decoded p

That will definitely run in constant memory, meaning that it won't ever load more than one chunk of bytes at a time (where a chunk is something like 32 kB, I think).  You can profile the heap if you want to verify this by following these instructions:

https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/prof-heap.html
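
To make the `view decoded` step concrete without a file on disk, here is a minimal sketch that decodes a chunked in-memory byte stream one element at a time. It assumes the `pipes`, `pipes-binary`, `binary`, and `lens-family-core` packages (the thread does not say which `view` was in use), and the names `chunks`, `decodeAll`, and `decodedValues` are hypothetical.

```haskell
import           Control.Monad        (void)
import           Data.Binary          (encode)
import qualified Data.ByteString      as B
import qualified Data.ByteString.Lazy as BL
import           Data.Word            (Word32)
import           Lens.Family          (view)
import           Pipes
import           Pipes.Binary         (DecodingError, decoded)
import qualified Pipes.Prelude        as P

-- Hypothetical input: three encoded Word32 values, deliberately split at
-- byte 5 so that the second value straddles a chunk boundary.
chunks :: Monad m => Producer B.ByteString m ()
chunks = yield a >> yield b
  where
    bytes  = BL.toStrict (BL.concat (map encode [1, 2, 3 :: Word32]))
    (a, b) = B.splitAt 5 bytes

-- Decode elements as the chunks arrive; on failure the result carries the
-- error together with the undecoded remainder of the byte stream.
decodeAll
    :: Monad m
    => Producer B.ByteString m r
    -> Producer Word32 m (Either (DecodingError, Producer B.ByteString m r) r)
decodeAll = view decoded

-- Discard the final result and collect the decoded values.
decodedValues :: [Word32]
decodedValues = P.toList (void (decodeAll chunks))
```

Only the chunk currently being decoded (plus any leftover bytes of a partially decoded element) needs to be held at a time, which is the property the heap profile should confirm for the file-backed version.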

Also, to answer your other question, `pipes-attoparsec` runs in constant memory.  The difference between `pipes-attoparsec` and `attoparsec` is that `pipes-attoparsec` runs a separate parser for each element in the stream, which is equivalent to "committing" after each parsed element.  That means that it can only backtrack while parsing a single element in the stream, but no further back.  This is why `pipes-attoparsec` runs in constant space over a large file and why `attoparsec` does not, because `attoparsec` backtracks indefinitely and `pipes-attoparsec` does not.
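
A minimal sketch of the per-element parsing described above, assuming the `pipes-attoparsec` and `attoparsec` packages and a hypothetical record format in which every element is exactly two bytes. The names `element`, `elements`, and `sampleElements` are made up for illustration.

```haskell
import           Control.Monad              (void)
import qualified Data.Attoparsec.ByteString as A
import           Data.ByteString            (ByteString)
import qualified Data.ByteString.Char8      as BC
import           Pipes
import qualified Pipes.Attoparsec           as PA
import qualified Pipes.Prelude              as P

-- Hypothetical element parser: each element is exactly two bytes.
element :: A.Parser ByteString
element = A.take 2

-- Run the element parser repeatedly over the byte stream. Each successful
-- parse is yielded immediately, so backtracking never reaches past the
-- element currently being parsed.
elements
    :: Monad m
    => Producer ByteString m r
    -> Producer ByteString m (Either (PA.ParsingError, Producer ByteString m r) r)
elements = PA.parsed element

-- Chunk boundaries need not line up with element boundaries.
sampleElements :: [ByteString]
sampleElements =
    P.toList (void (elements (each (map BC.pack ["aab", "bc", "c"]))))
```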

Dylan Tisdall

Sep 22, 2015, 6:50:44 PM
to Haskell Pipes, dy...@geeky.net
Hi Gabriel,

Thanks again for your help. That really clarified that I should be using lift to keep everything inside the Producer transformer. To make all the types work, I ended up with:

type MDHAndScanLineProducer = P.Producer MDHAndScanLine IO (Either
    (P.DecodingError, P.Producer P.ByteString IO ()) ())

measDatMDHScanLinePairs :: Handle -> MDHAndScanLineProducer
measDatMDHScanLinePairs h = do
    (hLen, leftovers) <- lift $ P.runStateT (P.decodeGet getWord32le) p 
    case (hLen :: Either P.DecodingError Word32) of
        Left err -> return $ Left (err, leftovers)
        Right len -> do
            lift (hSeek h AbsoluteSeek (fromIntegral len))
            view P.decoded p
  where
    p = PB.fromHandle h

This seems to work exactly as I'd hoped.

As a follow-up, I'm now wondering how to use this producer and ignore its return type; effectively how to turn it into a Producer MDHAndScanLine IO (). This seems to be necessary to access many library functions. For example, I can't use

Pipes.Prelude.length :: Monad m => Producer a m () -> m Int

directly on the output of measDatMDHScanLinePairs because the return type doesn't match.

Thanks again for all your help as I get up to speed on this!


Dylan

Gabriel Gonzalez

Sep 22, 2015, 6:56:49 PM
to haskel...@googlegroups.com, dy...@geeky.net
Use the `void` function from `Control.Monad` if you want to erase the return type of a `Producer`:

    void :: Functor f => f a -> f ()
    void = fmap (\_ -> ())

I might even re-export this from `pipes` as a convenience since this question comes up a lot.

Originally functions like `Pipes.Prelude.length` had a more general type like this:

    Pipes.Prelude.length :: Producer a m r -> m Int

... but then at the advice of others I restricted the type to this:

    Pipes.Prelude.length :: Producer a m () -> m Int

... so that the user would have to explicitly discard the return value to signal that they were okay with ignoring that data.  This is similar in principle to the warning you get if you turn on the `-Wall` flag that (among other things) warns if you have an unused non-empty return value, like this:

    example = do
        getLine  -- Compiler warning because you didn't use the result
        ...

... and you usually have to explicitly ignore the value using something like this syntax to indicate that you are intentionally ignoring the value:

    example = do
        _ <- getLine
        ...

So the requirement to explicitly discard the value using `void` is in the same spirit as that compiler warning.
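
As a minimal sketch of that idiom, the hypothetical producer `numbered` below stands in for a decoding producer whose return value is not `()`, and `countElements` (also a made-up name) discards that return value before counting:

```haskell
import           Control.Monad (void)
import           Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for a decoding producer: it yields some values and
-- then returns a result that is not ().
numbered :: Monad m => Producer Int m (Either String ())
numbered = do
    each [1, 2, 3]
    return (Left "trailing bytes")

-- Explicitly discard the return value, then count the yielded elements.
countElements :: Monad m => Producer a m r -> m Int
countElements = P.length . void
```

Here `countElements numbered` counts the three yielded values and silently drops the `Left "trailing bytes"` result, which is exactly the loss of information that the restricted type asks you to acknowledge.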

Dylan Tisdall

Sep 24, 2015, 11:20:06 PM
to Haskell Pipes, dy...@geeky.net
Right, I hadn't registered that `Producer` is an instance of `Functor` as well as `Monad`, so I wasn't even looking there. Thanks again for all your help!

Dylan Tisdall

Sep 24, 2015, 11:47:55 PM
to Haskell Pipes, dy...@geeky.net
I have a quick follow-up question, actually; pipes-group defines:

Pipes.Group.folds
    :: Monad m
    => (x -> a -> x)
    -- ^ Step function
    -> x
    -- ^ Initial accumulator
    -> (x -> b)
    -- ^ Extraction function
    -> FreeT (Producer a m) m r
    -- ^
    -> Producer b m r

If I'm reading this right, when my FreeT "list" consists of just one Producer, then Pipes.Group.folds returns a Producer that yields one output, and preserves the original Producer's return type, r, in the returned Producer. This is in contrast to the similar function

Pipes.Prelude.fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Producer a m () -> m b

which only works on Producers with return type (). You note in the documentation for Pipes.Prelude.fold that this type is required because it may stop drawing from the Producer early, so you don't necessarily get to compute the return value. I'm wondering if it's easy to define a function

foldToProducer :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r

that does what I think Pipes.Group.folds is doing, but without needing all the FreeT bits as well. As an exercise, I tried to write foldToProducer, but couldn't figure it out.


Thanks again,
Dylan

Michael Thompson

Sep 26, 2015, 12:06:55 AM
to Haskell Pipes, dy...@geeky.net

The type you mention 

    foldToProducer :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r

could be doing one of two things. It could be a scan, which appears as a pipe in Pipes.Prelude

     P.scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Pipe a b m r

so 

     \op seed out p -> p >-> P.scan op seed out
       :: Monad m
       => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r  -- specializing a bit
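
A minimal sketch of the scan reading, assuming only the `pipes` package; `runningSums` and `sampleSums` are hypothetical names:

```haskell
import           Pipes
import qualified Pipes.Prelude as P

-- Emit every intermediate accumulator value, starting with the seed, and
-- keep the upstream producer's return value.
runningSums :: Monad m => Producer Int m r -> Producer Int m r
runningSums p = p >-> P.scan (+) 0 id

sampleSums :: [Int]
sampleSums = P.toList (runningSums (each [1, 2, 3]))
-- sampleSums == [0, 1, 3, 6]
```

Note that a scan yields one output per input (plus the seed), whereas a fold yields a single output at the end.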


But it seems you want to fold, and then yield the single result. (This is effectively what `folds` is doing, repeatedly, for each of the successive producers in a `FreeT (Producer a m) m r`.) The simple case is pretty straightforward with the primed `fold'` in Pipes.Prelude:


     P.fold'
       :: Monad m
       => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> m (b, r)

     foldToProducer
       :: Monad m
       => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r
     foldToProducer op seed out p = do
         (b, r) <- lift $ P.fold' op seed out p
         yield b
         return r


I don't know if there would be a swanker way of doing it just with standard combinators. But is that more like what you mean?
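
For anyone who wants to try this, here is the same definition restated as a self-contained sketch with a small usage example. Only the `pipes` package is assumed; `sumThenReturn` and its input are hypothetical.

```haskell
import           Control.Monad (void)
import           Pipes
import qualified Pipes.Prelude as P

-- Fold the whole producer, yield the single folded result, and pass the
-- original return value through.
foldToProducer
    :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r
foldToProducer step begin done p = do
    (b, r) <- lift (P.fold' step begin done p)
    yield b
    return r

-- Hypothetical usage: sum four numbers, keeping the "done" return value.
sumThenReturn :: Monad m => Producer Int m String
sumThenReturn = foldToProducer (+) 0 id (each [1, 2, 3, 4] >> return "done")
```

Because `fold'` has to reach the end of the producer to obtain `r`, the single output is only yielded after the whole input has been consumed.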

Michael Thompson

Sep 26, 2015, 12:21:55 AM
to Haskell Pipes, dy...@geeky.net
I meant to say that it would be easy to do it just as you were thinking, using `folds`,
if `Control.Monad.Trans.Free` had a function of type `f r -> FreeT f m r`.
There isn't any reason why it couldn't have one:

    elevate :: (Monad m, Functor f) => f r -> FreeT f m r
    elevate = FreeT . return . Free . fmap return

Then  we get

     \op seed out -> folds op seed out . elevate
       :: Monad m
       => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r


as you planned. 
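
A self-contained sketch of this route, assuming the `pipes`, `pipes-group`, and `free` packages; `foldViaFolds` and `sampleFold` are hypothetical names. As far as I can tell, the `liftF` function exported by `Control.Monad.Trans.Free` has the same effect as `elevate` when used at the `FreeT` type, so it may be usable in its place.

```haskell
import           Control.Monad.Trans.Free (FreeF (Free), FreeT (FreeT))
import           Pipes
import           Pipes.Group              (folds)
import qualified Pipes.Prelude            as P

-- Wrap a single functor layer as a one-element FreeT "list".
elevate :: (Monad m, Functor f) => f r -> FreeT f m r
elevate = FreeT . return . Free . fmap return

-- Fold a lone producer by treating it as a FreeT list with one group.
foldViaFolds
    :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r
foldViaFolds step begin done = folds step begin done . elevate

sampleFold :: [Int]
sampleFold = P.toList (foldViaFolds (+) 0 id (each [1, 2, 3, 4]))
```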

Gabriel Gonzalez

Sep 27, 2015, 12:05:52 PM
to haskel...@googlegroups.com, dy...@geeky.net, practica...@gmail.com
Like Michael mentioned, you want `Pipes.Prelude.fold'`

If you know that your `FreeT` list only has one `Producer`, then you should encode that in the type by keeping it as a `Producer`.  Then the question becomes how to fold that `Producer` directly instead of folding it within the context of a `FreeT` list, and that's what the `fold'` function does: it folds the producer and also preserves the return value:

    fold' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> m (b, r)

... or combined with the `foldl` library it would be:

    purely fold' :: Monad m => Fold a b -> Producer a m r -> m (b, r)
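
A minimal sketch of the `foldl` combination, assuming the `pipes` and `foldl` packages; `average` and `averageAndResult` are hypothetical names chosen for illustration:

```haskell
import qualified Control.Foldl as L
import           Pipes
import qualified Pipes.Prelude as P

-- Two folds combined applicatively; both run in a single pass.
average :: L.Fold Double Double
average = (/) <$> L.sum <*> L.genericLength

-- Fold the producer and keep its return value alongside the result.
averageAndResult :: Monad m => Producer Double m r -> m (Double, r)
averageAndResult = L.purely P.fold' average
```

For example, `averageAndResult (each [1, 2, 3, 6] >> return "end")` traverses the input once and returns the average together with `"end"`.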

Dylan Tisdall

Oct 1, 2015, 5:11:12 PM
to Haskell Pipes, dy...@geeky.net, practica...@gmail.com
Thanks Michael and Gabriel for your help; despite appearances here, I am actually making headway on my project!

I've run into a new point of confusion, though, related to how to allow failure in parsing over groups. More explicitly:

My data naturally breaks into groups, so I've got a method

groupByID :: Monad m => P.Producer MDHAndScanLine m (Either String ()) ->
P.FreeT (P.Producer MDHAndScanLine m) m (Either String ())

That takes my Producer of MDHAndScanLines and breaks them using the relevant comparison. My question is now how to parse this new FreeT structure when it's possible that the parsing might fail. For example, I want to process each group, making them into a new data type `GroupedMDHs`, but it's possible I'll discover as I'm consuming the data that my preconditions for making a `GroupedMDHs` are violated, at which point I want to fail. So I want a method like:

example :: Monad m => P.FreeT (P.Producer MDHAndScanLine m) m (Either String ()) ->
    P.Producer GroupedMDHs m (Either String ())

where I return `Left "Error"` if either the underlying Producer returns a `Left`, or if I discover I can't make a `GroupedMDHs`.

To make my example more concrete, say I have a function:

groupedMDHParser :: Monad m => Int -> P.Parser MDHAndScanLine m (Either (Either String ()) GroupedMDHs)

This parser consumes all the input from a producer, and makes a `GroupedMDHs`. However, if the length of data consumed doesn't match the first `Int` argument, then it returns `Left $ Left "Error"` and otherwise it returns a `GroupedMDHs`. I chose the type signature to be compatible with `parseForever`, since that would turn this parser into a pipe that matched my original producer type `P.Producer MDHAndScanLine m (Either String ())`. I've tried to find a way to apply this pipe to each element of the FreeT "list" in turn, but the return types don't match up. I understand at a conceptual level that I can't just map my transformation over each element of the FreeT, because if the application of `groupedMDHParser` fails, it's implicitly also truncating the FreeT structure at that point and returning early. So this doesn't work:

example groupedMDHScanLines = P.concats $ P.maps (\p -> p >-> groupedMDHParser) groupedMDHScanLines


I can also see that Pipes.Group.folds is a candidate, but I don't see how I can get it to exit early if processing one of the groups fails. Is there a standard idiom for that kind of behaviour?


Thanks again,
Dylan

Dylan Tisdall

Oct 2, 2015, 9:47:00 AM
to Haskell Pipes, dy...@geeky.net, practica...@gmail.com
Maybe I should ask a more general question, in case I'm going about this all wrong with my previous detailed question. I'm processing a file that consists of millions of "records". The records (`MDHAndScanLine`) were generated by a process with a series of nested loops, so each record contains data, and the loop counters when the data was generated. Having parsed these records into a producer of type `P.Producer MDHAndScanLine m (Either String ())`, I've basically flattened out all the loops into a single list. My problem is now that I've got multiple operations I'd like to do in one pass down the `Producer` (e.g., scan through it and make sure that the loop counters are all in the order I expect; do processing of type A to the data and output the result; do processing of type B on only data with certain loop counters and output the result). Ideally, if any of these operations encounter a failure, I'd like to report the error and bail on the whole thing.

I was originally thinking of making each one of these operations a Pipe, and having them stop `yield`-ing and return errors immediately if something goes wrong; that's what led to my questions about FreeT. However, based on the ongoing thread "Help folding a Producer into a value", I'm getting the sense that Control.Foldl might be the better tool for my problem. I can see that if I write each of my operations as a `Fold` (or `FoldM` for logging output along the way), then I can combine them all into one joint operation that will traverse the `Producer`. I can see that you can use the fold's state to keep track of whether you've encountered an error, and then the fold's step function can just stop updating on new data. What's unclear to me is how to make all of the combined `Fold`s stop if any one of them encounter an error state.

Also, the nice part of the Pipe-style solution I described first is that I don't think it'll keep reading the file once it encounters an error (it will stop `await`-ing once it returns an error). My impression is that the Fold approach will insist on traversing the whole file every time, and just ignore all the data once it's had an error. Or perhaps I've totally missed something about Fold that allows it to stop early if it has its result.

Sorry for the confusion! These tools all seem really helpful, but I'm still trying to figure out how to put them together the right way.


Thanks again,
Dylan

Daniel Díaz

Oct 2, 2015, 10:31:18 AM
to Haskell Pipes, dy...@geeky.net, practica...@gmail.com
> What's unclear to me is how to make all of the combined `Fold`s stop if any one of them encounter an error state.

You could try using a monadic fold that works in the ExceptT monad transformer. Such folds can "stop early".

Dylan Tisdall

Oct 2, 2015, 4:23:39 PM
to Haskell Pipes, dy...@geeky.net, practica...@gmail.com
Thanks, Daniel, that was a really helpful suggestion! In case someone else is ever looking for an example of using ExceptT and Control.Foldl to validate/process data from a Producer, stopping immediately if any of the operations experience an error, here's a small program demonstrating how to do it:

http://pastebin.com/HgXQRm3p

This example produces a list of pairs, and then folds over them to validate some ordering properties of the pairs, and also prints the pairs out as they're produced. Running the example shows that if any of the validators experience an error, the computation is terminated and we get the error back immediately.
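
In case the link above is unavailable, here is a minimal sketch of the same idea. It is not the linked program: the validator, the element type, and all names (`checkIncreasing`, `countAll`, `validate`) are hypothetical, and it assumes the `pipes`, `foldl`, and `transformers` packages.

```haskell
import qualified Control.Foldl              as L
import           Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import           Pipes
import qualified Pipes.Prelude              as P

-- Hypothetical validator: fail as soon as the values stop increasing.
checkIncreasing :: Monad m => L.FoldM (ExceptT String m) Int ()
checkIncreasing = L.FoldM step (return Nothing) (\_ -> return ())
  where
    step (Just prev) x | x <= prev = throwE ("not increasing at " ++ show x)
    step _           x             = return (Just x)

-- Hypothetical second operation: count the elements seen.
countAll :: Monad m => L.FoldM (ExceptT String m) Int Int
countAll = L.generalize L.length

-- Run both folds in one pass. A throwE in any step aborts the whole fold,
-- so no further elements are requested from the producer.
validate :: Monad m => Producer Int m () -> m (Either String Int)
validate p =
    runExceptT (L.impurely P.foldM (checkIncreasing *> countAll) (hoist lift p))
```

With this sketch, `validate (each [1, 2, 3])` succeeds with the element count, while `validate (each [1, 3, 2, 4])` stops at the `2` and returns the error from `checkIncreasing`.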


Thanks again,
Dylan

Gabriel Gonzalez

Oct 3, 2015, 4:59:04 PM
to haskel...@googlegroups.com, dy...@geeky.net, practica...@gmail.com
Looks nice!

I only have one minor tip, which is that instead of this:

    combinedFold2 :: MonadIO m => L.FoldM (ExceptT String m) (Int, Int) ((),(),())
    combinedFold2 = (,,) <$> printEveryElement <*> checkInnerLoop <*> checkOuterLoop

... you can do this:

    combinedFold2 :: MonadIO m => L.FoldM (ExceptT String m) (Int, Int) ()
    combinedFold2 = printEveryElement *> checkInnerLoop *> checkOuterLoop

This works because the type of `(*>)` is:

    (*>) :: Applicative f => f a -> f b -> f b

Daniel Díaz

Oct 4, 2015, 7:08:55 AM
to Haskell Pipes, dy...@geeky.net, practica...@gmail.com
You're welcome!

This is more odd than useful, but these "fallible folds" admit a monad instance. If you wrap them in a newtype

    newtype Fallible m r i e = Fallible { getFallible :: FoldM (ExceptT e m) i r }

Then you can define return as "create a fold that begins in a failed state" and >>= as "if an error is encountered, keep folding using a new fold constructed from the error".
