Help folding a Producer into a value


Rune Kjær Svendsen

Sep 30, 2015, 11:40:33 PM
to Haskell Pipes
Hello list,

I have a Producer of the following type:

    blockProducer :: Handle -> Producer Block IO ()

which reads Blocks from a file with the given handle, and yields a block one-by-one until EOF is reached.

I also have a function of the following type:

    processBlock :: BlockState -> Block -> Either String BlockState

which takes an initial BlockState, and updates it with Block, and returns either Right BlockState if no error occurs, or Left String if an error occurs.

So I want to create a Pipe that consumes Blocks from blockProducer, folds them as if I were using (foldlM processBlock), and yields the resulting Either String BlockState.

How do I do that?

Gabriel Gonzalez

Sep 30, 2015, 11:53:34 PM
to haskel...@googlegroups.com, rune...@gmail.com
Use `Pipes.Prelude.fold`, which has this type:

    fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Producer a m () -> m b

The trick is realizing that this works if we specialize the `a`, `x`, and `b` type parameters to:

    a = Block
    x = Either String BlockState
    b = Either String BlockState

95% of the work is just figuring out what the type of your accumulator (i.e. the type parameter `x`) should be.  Once you figure that out it's all downhill from there.

... and also specialize `m` to `IO`, which would give us:

    fold
        :: (Either String BlockState -> Block -> Either String BlockState)
        -> Either String BlockState
        -> (Either String BlockState -> Either String BlockState)
        -> Producer Block IO () -> IO (Either String BlockState)

That means that we need to pass `fold` three arguments of these types:

    step  :: Either String BlockState -> Block -> Either String BlockState
    begin :: Either String BlockState
    done  :: Either String BlockState -> Either String BlockState

    fold step begin done :: Producer Block IO () -> IO (Either String BlockState)

We can create the `step` function from your `processBlock` function:

    step :: Either String BlockState -> Block -> Either String BlockState
    step e block = do
        blockState <- e
        processBlock blockState block

The `done` function is really easy:

    done :: Either String BlockState -> Either String BlockState
    done = id

... and the `begin` value requires us to supply some sort of beginning state:

    begin :: Either String BlockState
    begin = return initialBlockState -- We'll get `initialBlockState` elsewhere

... so we can combine those together to write up the complete function:

    foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocks initialBlockState = Pipes.Prelude.fold step begin done
      where
        step e block = do
            blockState <- e
            processBlock blockState block

        begin = return initialBlockState

        done = id

Now to give a high-level explanation of what is going on.

You probably originally tried to solve this using `Pipes.Prelude.foldM`, but noticed that the `Either` monad didn't match up with the `Producer`'s base monad, `IO`.  However, you can still use `Pipes.Prelude.fold` if you do all the `Either` monad work within the fold itself.  In other words, the step function does the binding instead of relying on the fold to do the binding for you.
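
For anyone who wants to try this in isolation, here is a minimal self-contained sketch. The `Block` and `BlockState` types and the body of `processBlock` are toy stand-ins (assumptions for illustration, not the real types from the question), and `each` stands in for the file-backed producer:

```haskell
import Pipes (Producer, each)
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real types in the question.
type Block = Int
type BlockState = Int

-- Toy processBlock: keep a running sum and reject negative blocks.
processBlock :: BlockState -> Block -> Either String BlockState
processBlock st b
    | b < 0     = Left ("bad block: " ++ show b)
    | otherwise = Right (st + b)

foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
foldBlocks initialBlockState = P.fold step (Right initialBlockState) id
  where
    -- The Either bind happens here, inside the pure step function.
    step acc block = acc >>= \blockState -> processBlock blockState block

main :: IO ()
main = do
    ok  <- foldBlocks 0 (each [1, 2, 3])
    bad <- foldBlocks 0 (each [1, -2, 3])
    print ok   -- Right 6
    print bad  -- Left "bad block: -2"
```

Note that once the accumulator becomes a `Left`, the fold keeps draining the producer; the remaining blocks are read but ignored rather than the fold stopping early.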

Rune Kjær Svendsen

Oct 1, 2015, 12:26:57 AM
to Gabriel Gonzalez, haskel...@googlegroups.com
Thank you very much for your concise and prompt reply, this was
exactly what I needed. And yes, I was using Pipes.Prelude.foldM,
without much success.

It works beautifully now.

Cheers!



/Rune

Rune Kjær Svendsen

Oct 1, 2015, 6:56:26 AM
to Haskell Pipes, gabri...@gmail.com
How would I go about printing log messages, while folding Blocks into the BlockState? The fold may take tens of minutes to complete, so printing information about the processing would be very useful.

Printing a message every time a block is read is simple, because reading a block from the disk already happens in the IO monad. The fold "step" function is pure, so producing a log file requires using something other than Pipes.Prelude.fold, as far as I can see.

Is there a better alternative?



/Rune





Michael Thompson

Oct 2, 2015, 12:21:29 AM
to Haskell Pipes, gabri...@gmail.com
Dear Rune, 

This might be the magic moment for shifting to the `foldl` library (`cabal install foldl`). Then you can keep pretty much everything you have
from above, but slide in other independent components like logging or debugging and other stuff, e.g. a final count of blocks processed, 
as you please. Part of the reason `Pipes.Prelude.fold` has the type it has is to make it interoperate with `Control.Foldl`.

To use the library, instead of directly applying `Pipes.Prelude.fold` to `step` `begin` and
`done` defined above, you will 'reify' the fold, to start with. That is, instead of writing 


    foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocks initialBlockState = Pipes.Prelude.fold step begin done
      where
        step e block = do
            blockState <- e
            processBlock blockState block

        begin = return initialBlockState

        done = id

you will write 

    
    blockFold :: BlockState -> Control.Foldl.Fold Block (Either String BlockState)
    blockFold initialBlockState = Control.Foldl.Fold step begin done
      where
        step e block = do
            blockState <- e
            processBlock blockState block

        begin = return initialBlockState

        done = id

then the function defined earlier is recovered with a use of `purely` from Control.Foldl

    foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocks initialBlockState = Control.Foldl.purely Pipes.Prelude.fold (blockFold initialBlockState)

`purely` just extracts the components of the 'reified fold' and feeds them to a suitably
defined fold from another library. (Here it is the pipes fold function, but there are others.)

So far our results are as before, but with this little fold reifying bit. Once we've expressed
our `foldBlocks` with a `Control.Foldl.Fold`, we can start adding stuff. For example given 


    Control.Foldl.length :: Fold a Int

we can immediately complicate `foldBlocks` to also give a count of blocks processed.

    blockFoldWithLength :: BlockState -> Control.Foldl.Fold Block (Int,Either String BlockState)
    blockFoldWithLength initialState = liftA2 (,) Control.Foldl.length (blockFold initialState) 

so now we can trivially alter our processing function to



    foldBlocksWithLength :: BlockState -> Producer Block IO () -> IO (Int, Either String BlockState)

    foldBlocksWithLength initialBlockState = 
            Control.Foldl.purely Pipes.Prelude.fold (blockFoldWithLength initialBlockState)


But the problem wasn't to snap a pure component like block count into our fold, but to snap in
an impure logging component. For that we need the `FoldM` type from `Control.Foldl`, not the `Fold` type.
No problem, we already have a `FoldM` on our hands:

    blockFoldM :: Monad m => BlockState -> Control.Foldl.FoldM m Block (Either String BlockState)
    blockFoldM initialState = generalize (blockFold initialState)


and we could as well have written our initial `foldBlocks` as: 

    foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocks initialBlockState = Control.Foldl.impurely Pipes.Prelude.foldM (blockFoldM initialBlockState)

but now we have it in a shape where we can snap in a logger function. We might do this
by hand, writing something with this shape:

    loggerFold :: Control.Foldl.FoldM IO Block ()
    loggerFold = Control.Foldl.FoldM step begin done where
        step x block = undefined -- new 'IO x' whatever it is, probably IO ()
        begin = undefined        -- initial IO x, probably 'return ()'
        done _ = return ()       -- given I assumed () in the signature


So it is a question of writing `step` to do whatever you want to be logged from each block.
Then you will just write:


    foldBlocksWithLogging :: BlockState -> Producer Block IO () -> IO (Either String BlockState)

    foldBlocksWithLogging initialBlockState = 
        Control.Foldl.impurely Pipes.Prelude.foldM (blockFoldM initialBlockState <* loggerFold)

or

    foldBlocksWithLogging :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocksWithLogging initialBlockState = 
        Control.Foldl.impurely Pipes.Prelude.foldM (generalize (blockFold initialBlockState) <* loggerFold)

a sufficiently trivial logger can be written with 

    L.sink :: (Monad m, Monoid w) => (a -> m w) -> L.FoldM m a w

the ultra-minimal debugging fold would be 

        blockAlertFoldM :: FoldM IO Block ()
        blockAlertFoldM = sink (\block -> putStrLn "Block Processed!")

then we have, e.g.

    foldBlocksWithAlert :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocksWithAlert initialBlockState = 
        Control.Foldl.impurely Pipes.Prelude.foldM (generalize (blockFold initialBlockState) <* loggerFold)


and could start throwing in other components


    foldBlocksWithAlertAndLength :: BlockState -> Producer Block IO () -> IO (Int, Either String BlockState)

    foldBlocksWithAlertAndLength initialBlockState = Control.Foldl.impurely Pipes.Prelude.foldM myfolds where
         myfolds = generalize mypurefolds <* loggerFold
         mypurefolds = liftA2 (,) Control.Foldl.length (blockFold initialBlockState) 

     
Here we get the block count at the end, and are printing "Block processed!" to stdout as each block is processed.

There are probably a few gruesome type and typing errors in the above, but I hope it makes the Control.Foldl approach
to your problem clear. I probably managed to cross some crucial consideration of importance to Gabriel, but it's always pleasing to find out...


Michael Thompson

Oct 2, 2015, 12:29:20 AM
to Haskell Pipes, gabri...@gmail.com
I used `loggerFold` where I wanted to use my simple `blockAlertFoldM`, so the last several lines of code should be

 

then we have, e.g.

    foldBlocksWithAlert :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocksWithAlert initialBlockState = 
        Control.Foldl.impurely Pipes.Prelude.foldM (generalize (blockFold initialBlockState) <* blockAlertFoldM)


and could start throwing in other components


    foldBlocksWithAlertAndLength :: BlockState -> Producer Block IO () -> IO (Int, Either String BlockState)

    foldBlocksWithAlertAndLength initialBlockState = Control.Foldl.impurely Pipes.Prelude.foldM myfolds where
         myfolds = generalize mypurefolds <* blockAlertFoldM 
         mypurefolds = liftA2 (,) Control.Foldl.length  (blockFold initialBlockState)

 
or something like that.
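
Putting the pieces together, a self-contained sketch of the `Control.Foldl` version might look like the following. The `Block` and `BlockState` types and the body of `processBlock` are toy stand-ins assumed for illustration, and the module is imported qualified as `L`:

```haskell
import qualified Control.Foldl as L
import Pipes (Producer, each)
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real types in the question.
type Block = Int
type BlockState = Int

-- Toy processBlock: keep a running sum and reject negative blocks.
processBlock :: BlockState -> Block -> Either String BlockState
processBlock st b
    | b < 0     = Left ("bad block: " ++ show b)
    | otherwise = Right (st + b)

-- The reified pure fold.
blockFold :: BlockState -> L.Fold Block (Either String BlockState)
blockFold initialBlockState = L.Fold step (Right initialBlockState) id
  where
    step acc block = acc >>= \blockState -> processBlock blockState block

-- An effectful fold that only logs; its result is ().
blockAlertFoldM :: L.FoldM IO Block ()
blockAlertFoldM = L.sink (\block -> putStrLn ("Processed block " ++ show block))

foldBlocksWithAlertAndLength
    :: BlockState -> Producer Block IO () -> IO (Int, Either String BlockState)
foldBlocksWithAlertAndLength initialBlockState = L.impurely P.foldM myFolds
  where
    -- Combine the two pure folds first, then lift and attach the logger.
    myPureFolds = (,) <$> L.length <*> blockFold initialBlockState
    myFolds     = L.generalize myPureFolds <* blockAlertFoldM

main :: IO ()
main = foldBlocksWithAlertAndLength 0 (each [1, 2, 3]) >>= print
```

With the toy input this logs one line per block and then prints `(3,Right 6)`. The pure folds stay pure; only the final combined fold is run in `IO`.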

Gabriel Gonzalez

Oct 2, 2015, 12:45:27 AM
to Rune Kjær Svendsen, Haskell Pipes
So this is a situation where you would use `Pipes.Prelude.foldM` and the solution would look something like this:

    foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
    foldBlocks initialBlockState = Pipes.Prelude.foldM step begin done
      where
        step :: Either String BlockState -> Block -> IO (Either String BlockState)
        step (Right blockState) block = do
            putStrLn "your progress information here"
            return (processBlock blockState block)
        step (Left str        ) _     = do
            return (Left str)

        begin :: IO (Either String BlockState)
        begin = return (Right initialBlockState)

        done :: Either String BlockState -> IO (Either String BlockState)
        done = return
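
As a runnable sketch of the same idea, the version below logs the accumulated state after every block. The `Block` and `BlockState` types, the body of `processBlock`, and the log format are all assumptions made for illustration:

```haskell
import Pipes (Producer, each)
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real types in the question.
type Block = Int
type BlockState = Int

-- Toy processBlock: keep a running sum and reject negative blocks.
processBlock :: BlockState -> Block -> Either String BlockState
processBlock st b
    | b < 0     = Left ("bad block: " ++ show b)
    | otherwise = Right (st + b)

foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
foldBlocks initialBlockState = P.foldM step begin done
  where
    step :: Either String BlockState -> Block -> IO (Either String BlockState)
    step (Left err) _ = return (Left err)
    step (Right blockState) block = do
        let result = processBlock blockState block
        -- Log the new accumulated state (or the error) for this block.
        putStrLn ("block " ++ show block ++ " -> " ++ show result)
        return result

    begin :: IO (Either String BlockState)
    begin = return (Right initialBlockState)

    done :: Either String BlockState -> IO (Either String BlockState)
    done = return

main :: IO ()
main = foldBlocks 0 (each [1, 2, 3]) >>= print
```

Because `step` now runs in `IO`, it can inspect the accumulator before and after calling the pure `processBlock`, which a pipe placed upstream of the fold cannot do.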

Gabriel Gonzalez

Oct 2, 2015, 10:44:33 AM
to Ben Gamari, Rune Kjær Svendsen, Haskell Pipes
Actually, I like Ben's solution better than my own. Just print each
element going into the fold before you fold it. My only contribution
here will be to point out that the `notify` function already exists as
`Pipes.Prelude.chain`, so you could just write:

    foldBlocks initialBlockState (yourProducer >-> Pipes.Prelude.chain (liftIO . print))

On 10/01/2015 08:17 AM, Ben Gamari wrote:
> Rune Kjær Svendsen <rune...@gmail.com> writes:
>
>> How would I go about printing log messages, while folding Blocks into the
>> BlockState? The fold may take tens of minutes to complete, so printing
>> information about the processing would be very useful.
>>
>> Printing a message every time a block is read is simple, because reading a
>> block from the disk already happens in the IO monad. The fold "step"
>> function is pure, so producing a log file requires using something other
>> than Pipes.Prelude.fold, as far as I can see.
>>
>> Is there a better alternative?
>>
> You needn't touch your consumer. Instead just compose your producer with
> something that runs an action on every downstream value,
>
> notify :: Monad m => (a -> m ()) -> Pipe a a m r
> notify action = forever $ do
>     x <- await
>     lift (action x)
>     yield x
>
> producer :: Producer a m ()
> producer = undefined
>
> statusProducer :: MonadIO m => Producer a m ()
> statusProducer = producer >-> notify (\_ -> liftIO $ putStr ".")
>
> Cheers,
>
> - Ben
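
A minimal end-to-end sketch of the `chain` approach, leaving the pure fold untouched. The `Block` and `BlockState` types, the body of `processBlock`, and the log message are toy assumptions for illustration:

```haskell
import Pipes (Producer, each, (>->))
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real types in the question.
type Block = Int
type BlockState = Int

-- Toy processBlock: keep a running sum and reject negative blocks.
processBlock :: BlockState -> Block -> Either String BlockState
processBlock st b
    | b < 0     = Left ("bad block: " ++ show b)
    | otherwise = Right (st + b)

-- The pure fold, unchanged by the logging requirement.
foldBlocks :: BlockState -> Producer Block IO () -> IO (Either String BlockState)
foldBlocks initialBlockState = P.fold step (Right initialBlockState) id
  where
    step acc block = acc >>= \blockState -> processBlock blockState block

main :: IO ()
main = do
    let blocks = each [1, 2, 3] :: Producer Block IO ()
        -- chain runs the action on each block and passes it downstream.
        logged = blocks >-> P.chain (\b -> putStrLn ("read block " ++ show b))
    result <- foldBlocks 0 logged
    print result
```

This logs each incoming block but has no access to the accumulated state; logging the state itself needs an effectful step function instead.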

Michael Thompson

Oct 2, 2015, 12:21:28 PM
to Haskell Pipes, b...@smart-cactus.org, rune...@gmail.com
Ah yes, Ben's idea of just adding the logging to the producer itself via a pipe,
rather than adding it to the fold over the producer, fits with the goal of 
preserving the purity of the work already done.  I was taking that as 
part of the desideratum, but obviously adding a suitable pipe  
only adds impurity where we are already committed to it, and 
keeps things within the system of concepts already being used.

Rune K. Svendsen

Oct 3, 2015, 6:10:10 AM
to Michael Thompson, Haskell Pipes, b...@smart-cactus.org
Thank you everyone. I've already got logging down wrt. incoming blocks.
I want to log about the accumulated state as well, though, so I need the "step" function to return an IO action. And the information provided is exactly what I needed to figure that out.

Cheers! :)

/Rune