Are you thinking of regular pure parallelism, as with `parallel` or `monad-par`, or of something fancier like the work-stealing example in the pipes concurrency tutorial (which isn't itself appropriate here, I think, since the order of events is important)?
If you are thinking of pure parallelism, here is a flat-footed approach. Choosing a batch size means surveying the whole producer, so you can't decide it from inside the pipeline. You can first freeze each batch to a list or something, say
batched :: Monad m => Int -> Producer a m x -> Producer [a] m x
batched n p = L.purely folds L.list (view (chunksOf n) p)
then resume piping with something like
>>> :t \n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat -- or P.map (runPar . parMap f)
\n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat
:: NFData c =>
     Int -> (a -> c) -> Producer a IO r -> Producer c IO r
The equivalent could be done with `async`. You'd have to think out whether waiting to accumulate a batch, processing it all at once, and then continuing would be an improvement on processing items as they come.
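For concreteness, here is a minimal sketch of what the `async` version might look like. The names `batched` and `chunkAsync` are just illustrative; it spawns one `async` per element of each frozen batch via `mapConcurrently`, forcing each result to WHNF with `evaluate` so the work actually happens on the worker threads:

```haskell
-- Sketch only: assumes the pipes, pipes-group, foldl, lens-family-core
-- and async packages.
import qualified Control.Foldl as L
import Control.Concurrent.Async (mapConcurrently)
import Control.Exception (evaluate)
import Lens.Family (view)
import Pipes
import Pipes.Group (chunksOf, folds)
import qualified Pipes.Prelude as P

-- Freeze each batch of n elements into a list.
batched :: Monad m => Int -> Producer a m r -> Producer [a] m r
batched n p = L.purely folds L.list (view (chunksOf n) p)

-- Map f over each batch concurrently, forcing each result to WHNF,
-- then flatten the batches back into a stream.
chunkAsync :: Int -> (a -> b) -> Producer a IO r -> Producer b IO r
chunkAsync n f p = batched n p
               >-> P.mapM (mapConcurrently (evaluate . f))
               >-> P.concat

main :: IO ()
main = do
  total <- P.sum (chunkAsync 4 (* 2) (each [1 .. 10 :: Int]))
  print total  -- 110
```

Note that `evaluate` only forces to weak head normal form; if `f` returns a lazy structure you would want `force` from `Control.DeepSeq` instead.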
Here's the module I was working with, FWIW. Is `chunkEval` below completely beside the point? It does speed up the calculation of the sum a bit with `+RTS -N2`.
import Control.Foldl (list, purely)
import Control.Parallel
import Control.Monad.Par
import Lens.Family.State.Strict (zoom)
import Lens.Family
import Pipes
import Pipes.Parse
import Prelude hiding (splitAt)
import qualified Pipes.Prelude as P
import Control.Monad (replicateM_)
import Pipes.Group
import Control.Exception
import Control.Concurrent.Async
fib :: Integer -> Integer
fib 0 = 1
fib 1 = 1
fib n = fib (n-1) + fib (n-2)
parallelize :: Monad m => Int -> Producer a m r -> Producer a m r
parallelize n p = do
  let parser = zoom (splitAt n) drawAll
  (as, p') <- lift (runStateT parser p)
  as `par` (each as >> parallelize n p')
batched :: Monad m => Int -> Producer a m r -> Producer [a] m r
batched n p = purely folds list (view (chunksOf n) p)
chunkEval :: MonadIO m => Int -> Producer a m r -> Producer a m r
chunkEval n p = purely folds list (view (chunksOf n) p)
            >-> P.mapM (liftIO . mapConcurrently evaluate)
            >-> P.concat
fibproducer :: Monad m => Int -> Producer Integer m ()
fibproducer n = replicateM_ n p >-> P.map fib

p :: Monad m => Producer Integer m ()
p = each [15..28::Integer]
main :: IO ()
main = do
  -- n <- P.sum $ parallelize 5 (fibproducer 7)
  -- n <- P.sum $ fibproducer 7
  n <- P.sum $ chunkEval 10 $ fibproducer 7
  print n