Parallelizing fold of Producer

94 views
Skip to first unread message

Rune Kjær Svendsen

unread,
Nov 4, 2015, 3:09:47 PM11/4/15
to Haskell Pipes
I currently have a pipe which produces Blocks which are folded into a BlockState using Pipes.Prelude.fold.

I have a function which combines two BlockStates into one, and this is relatively quick. The slowest part is converting Blocks into a BlockState. So it seems that a good parallelization strategy would be to convert batches of Blocks into BlockStates in parallel, and merge the resulting BlockStates into the final BlockState.

So I need to attach something to the Block pipe which is parallelizable, which takes, say, 100 blocks and converts them into a BlockState. This process can be executed in parallel. The resulting BlockStates can be sent down another pipe which just folds the BlockStates into an empty BlockState.

I can't find anything on parallel processing in pipes. How do I enable parallel processing inside a pipe?

Michael Thompson

unread,
Nov 5, 2015, 8:22:42 AM11/5/15
to Haskell Pipes

Are you thinking of regular pure parallelism, as with `parallel` or `monad-par` or of something fancier like the work stealing example in the pipes concurrency tutorial (which isn't itself appropriate here, I think, since the order of events is important)?


If you are thinking of pure parallelism here is a flat-footed approach.  In choosing a batch size you would be surveying the whole producer, so you can't think inside the pipeline. You can first freeze each batch to a list or something, say

   

     batched :: Monad m => Int -> Producer a m x -> Producer [a] m x

     batched n p = L.purely folds L.list (view (chunksOf n) p)


then resume piping with something like 


    >>> :t \n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat      --  or P.map (runPar . parMap f) 

    \n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat

     :: NFData c =>

         Int -> (a -> c) -> Producer a IO r -> Producer IO r


The equivalent could be done with `async`.  You'd have to think out whether waiting to accumulate a batch and then processing simultaneously and continuing would be an improvement on processing blocks as they come.

Pierre Radermecker

unread,
Nov 5, 2015, 11:22:14 AM11/5/15
to haskel...@googlegroups.com
There is also parallel-io. FWIW I have a project where `parallel-io`
seems to give me better performance compared to `mapConcurrently` from
async. Though I was willing to believe the opposite as I was about to
replace the first with the second (`parallel-io` is an older project).
I have never really understood why nor playing with more accurate
benchmarking for it.

I would be interested in hearing any practical experience on the
subject. I guess the par monad is really interesting when you have
more free cores available for your application (is there a good lucky
number here ?)
> --
> You received this message because you are subscribed to the Google Groups
> "Haskell Pipes" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to haskell-pipe...@googlegroups.com.
> To post to this group, send email to haskel...@googlegroups.com.



--
Pierre

Pierre Radermecker

unread,
Nov 5, 2015, 11:43:27 AM11/5/15
to haskel...@googlegroups.com
My use case is the following: 10 to 60 independent IO computations
that takes about if not less than a quarter of sec each; to be run on
machine with 2 to 4 CPUs max.

What would me my best bet performance wise ?
- the usual sequential `mapM` (casual testing seems to suggest it is
less optimal but the difference is not that big either)
- parallel-io (casual testing seems to suggest it is the more optimal option)
- mapConcurrently (for some reasons always a bit less optimal compared
to parallel-io)
- parMonad or another form of "true parallelism" (my feeling is that
it does not really fit the use case because there isn't a lot of core
CPUs and the worker tasks doesn't take much time either)

As a note (not using any -Nx seems to offer the best speed on
multithreading CPUs).

I guess only benchmarking would tell for sure but in case one of you
has some tips/advises on that matter ...
--
Pierre

Rune Kjær Svendsen

unread,
Nov 5, 2015, 5:27:57 PM11/5/15
to haskel...@googlegroups.com
Yes, I’m thinking of regular parallelism. The “par”/“parseq” pattern looks like what I want, as far as I can see.

So the goal is to convert a Producer of blocks into a producer of lists of blocks, as you write, but I’m not sure I understand your second example.

I'm looking for a pipe that takes in a list of Blocks and produces a spark using “seq” that creates a thread which starts converting this [Block] into a BlockState. This should continue until all cores are utilized converting [Block] -> BlockState. When a thread finishes it sends its BlockState down the pipe. The order of BlockStates is important, so if a thread finishes out-of-order, the result will have to be queued.

I’m not sure if this is too much to ask, but it's what I have in mind.

In your example, as far as I can see, the individual Block elements of the block list are just processed in parallel, which doesn't offer a speedup, since the folding of Blocks into a BlockState is sequential.

I'm thinking something like this:

blockListToBlockStatePar :: Pipe [Block] (Either String BlockState) IO ()
blockListToBlockStatePar = do
bl <- await
yield (processBlockList emptyBS bl) `par`
blockListToBlockStatePar

processBlockList :: BlockState -> [Block] -> Either String BlockState

But I get an error I don't understand:

Error:(106, 5) ghc: No instance for (Monad m0) arising from a use of ‘yield’
The type variable ‘m0’ is ambiguous
Note: there are several potential instances:
instance Monad ((->) r) -- Defined in ‘GHC.Base’
instance Monad IO -- Defined in ‘GHC.Base’
instance Monad [] -- Defined in ‘GHC.Base’
...plus 25 others
In the first argument of ‘par’, namely
‘yield (processBlockList emptyCS bl)’



/Rune
> --
> You received this message because you are subscribed to a topic in the Google Groups "Haskell Pipes" group.
> To unsubscribe from this topic, visit https://groups.google.com/d/topic/haskell-pipes/FItX8aZ588g/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to haskell-pipe...@googlegroups.com.

Michael Thompson

unread,
Nov 6, 2015, 8:21:38 AM11/6/15
to Haskell Pipes

The error is arising because 

    par :: a -> b -> b

thus 
 

    yield (processBlockList emptyBS bl) 

could be in any monad at all.  Note that nothing will actually be yielded by the do-block as you defined it, even if we added a signature.  What you seem to be envisaging there, though, would involve simultaneous calculation of elements across the whole length of the initial pipe or rather producer. That would conflict with its being a regular producer, but I will try to piece together more of what you are saying, there's still something I don't get.


   

Michael Thompson

unread,
Nov 7, 2015, 1:49:03 PM11/7/15
to Haskell Pipes

Right, I think I misunderstood the original message as saying that there was a slow function

     Block -> BlockState

and that one could fold the BlockStates up monoidally. But I guess the slow function is

    BlockState -> Block -> BlockState

Rune Kjær Svendsen

unread,
Nov 7, 2015, 2:24:39 PM11/7/15
to haskel...@googlegroups.com
Yes. I see that I was less clear in my original message than I could have been. The slow/fast function isn't really relevant. Allow me to start over :)

I have a function:

processBlock :: BlockState -> Block -> BlockState

which folds a Block into a BlockState which has been accumulated from Blocks previous to the Block in question.

As such, this function only allows sequential operation on a list/stream of blocks.

I also have a function which combines two BlockStates into one:

consolidateBlockState :: BlockState -> BlockState -> BlockState

where the first BlockState is accumulated from Blocks prior to the Blocks from which the latter BlockState is made of.

This allows me to turn the otherwise sequential operation into a parallel one. I just can't figure out how to get "par" and "pseq" working inside a Pipe, in order to fold multiple sets of Blocks into BlockStates in parallel (on multiple CPU cores).


/Rune


Gabriel Gonzalez

unread,
Nov 8, 2015, 11:04:37 AM11/8/15
to haskel...@googlegroups.com, rune...@gmail.com
Let's decompose this into solving two smaller problems:

* How to process the group in blocks of 100
* How to parallelize each block

The type signature of the first step would be a function of type:

    groupAndFold :: Monad m => Producer Block m r -> Producer BlockState m r

And this would use the `pipes-group` library to partition the stream into groups of 100 elements and then fold each group.

Notice that we haven't actually evaluated anything by doing that partition-and-fold step.  Our final `Producer BlockState m r` is still a producer of unevaluated thunks.  So that leads us to the second step, which is how to parallelize each block.  The type of that would be something like:

    parallelize :: Monad m => Producer a m r -> Producer a m r

Now we have a smaller and clearer problem to solve: how do we take an arbitrary `Producer` that is yielding large unevaluated thunks and speculatively evaluate them ahead of time so that they are ready when you finally need them.

This is actually difficult to do, though, because you can't even *begin* to evaluate the Nth element that the `Producer` yield without triggering all side effects preceding that element in the `Producer`.

Let's take a simple example to illustrate the problem:

    example :: Producer Int IO ()
    example = do
        yield someExpensiveComputation1
        str1 <- lift getLine
        yield (someExpensiveComputationThatDependsOn str1)
        str2 <- lift getLine
        yield (anotherExpensiveComputationThatDependsOn str2)

The issue is that we can't even begin to evaluate the `anotherExpensiveComputationThatDependsOn str2` until we know the value of `str2`, but that requires forcing all effects leading up to the second `getLine` command.  So the only way we can speculatively evaluate a `Producer` is to force the entire producer or at least force large chunks of the `Producer` at a time (i.e. force 20 `BlockState`s worth of computation at a time).

So let's refine the type of our `parallelize` function:

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r

The first argument will be how many elements to materialize and then speculatively compute at one time.  This will materialize the `Producer` in chunks of the given size, spark off their evaluation and then re-yield them.

    import Control.Foldl (list, purely)
    import Control.Parallel (par)
    import Lens.Family.State.Strict (zoom)
    import Pipes (Producer, each, lift)
    import Pipes.Parse (foldAll, runStateT, splitAt)
    import Prelude hiding (splitAt)

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r
    parallelize n p = do
        let parser = zoom (splitAt 10) (purely foldAll list)
        (as, p') <- lift (runStateT parser p)
        as `par` (each as >> parallelize n p')

I haven't yet tested that the above code works; I only verified that it type-checks.  However, that is probably close to the best you will be able to do using `pipes`.
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.

Michael Thompson

unread,
Nov 8, 2015, 1:29:57 PM11/8/15
to Haskell Pipes, rune...@gmail.com
`parallelize` seems to diverge, but I can't tell why yet.

Michael Thompson

unread,
Nov 8, 2015, 1:43:33 PM11/8/15
to Haskell Pipes, rune...@gmail.com

Here's the module I was working with fwiw.  Is `chunkEval` below completely beside the point? It does speed up the calculation of the sum a bit, with `+RTS -N2`


    import Control.Foldl (list, purely)

    import Control.Parallel

    import Control.Monad.Par

    import Lens.Family.State.Strict (zoom)

    import Lens.Family

    import Pipes 

    import Pipes.Parse 

    import Prelude hiding (splitAt)

    import qualified Pipes.Prelude as P

    import Control.Monad (replicateM_)

    import Pipes.Group

    import Control.Exception

    import Control.Concurrent.Async

    fib :: Integer -> Integer

    fib 0 = 1

    fib 1 = 1

    fib n = fib (n-1) + fib (n-2)


    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r

    parallelize n p = do

        let parser = zoom (splitAt n) drawAll

        (as, p') <- lift (runStateT parser p)

        as `par` (each as >> parallelize n p')

    


    batched n p = purely folds list (view (chunksOf n) p)


    chunkEval :: MonadIO m => Int -> Producer a m r -> Producer a m r

    chunkEval n p = purely folds list (view (chunksOf n) p) 

                    >-> P.mapM (liftIO . mapConcurrently evaluate) 

                    >-> P.concat


    fibproducer n = replicateM_ n p >-> P.map fib


    p = each [15..28::Integer]


    main = do 

    --  n <- P.sum $ parallelize 5 (fibproducer 7)

    --  n <- P.sum $ fibproducer 7

      n <- P.sum $ chunkEval 10 $ fibproducer 7 

      print n

Gabriel Gonzalez

unread,
Nov 8, 2015, 4:06:45 PM11/8/15
to haskel...@googlegroups.com, rune...@gmail.com
Yeah, actually `chunkEval` is a good approach, too.  If that works then you should go with that.
--

Pierre Radermecker

unread,
Nov 9, 2015, 3:55:05 AM11/9/15
to haskel...@googlegroups.com, rune...@gmail.com
Michael, I guess it is not so easy to translate the example to use
your streaming library due to the parsing bit ?
--
Pierre

Michael Thompson

unread,
Nov 9, 2015, 7:38:55 PM11/9/15
to Haskell Pipes, rune...@gmail.com
Something like `chunkEval` or `parallelize` wouldn't work 
 `Stream (Of a) m r`  with things  are written at the moment, 
since it is strict in the a's. 

One would have to intervene, e.g., at strategic `maps`, doing
something like what I was imagining earlier, which I guess doesnt
correspond to your problem. 

    mapSpeculate :: MonadIO m => Int -> (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r 
    mapSpeculate n f p = S.concat 
                                       $ S.mapM (liftIO . mapConcurrently (evaluate . f)) 
                                       $ S.mapsM S.toList
                                       $ S.chunksOf n p

works fine for `fib` - which isn't saying much. Similarly with 

    parMapSpeculate :: (MonadIO m, NFData b) => Int -> (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r 
    parMapSpeculate n f p = S.concat 
                                         $ S.map (runPar . parMap f) 
                                         $ S.mapsM S.toList
                                         $ S.chunksOf n p

which seems to me much faster. I'm not sure how to get a raw `par` operation going.
Reply all
Reply to author
Forward
0 new messages