Consume multiple streams in parallel

Kostiantyn Rybnikov

Mar 29, 2016, 7:13:35 AM
to Haskell Pipes
I have a pipe that consumes data from storage and folds it with the foldl package. It all works great. But now I have multiple tasks that stream basically the same data and fold it with different folds into different report types, so I want to combine them into a single piped stream that gets folded into multiple reports at once. On one hand, I'd like a guarantee that the data is processed in constant memory; on the other hand, I'd like the reports to be computed in different threads.

What are the best practices to achieve this? Is there a way to make a "tee" on a pipe, but then make sure it blocks when the consumer of one copy gets "too fast"? For example, if the slower report is 2 times slower, and the faster report has consumed 2000 items while the slower one has consumed only 1000, I'd like the faster report to block until the slower one catches up, so that no more than 1000 elements are held in memory.

Any ideas or experiences are welcome. Thank you!

David McBride

Mar 29, 2016, 12:09:37 PM
to haskel...@googlegroups.com
Have you looked at the pipes-concurrency package? It allows you to spawn threads from consumers and producers that pass messages between each other, and you can bound the buffers to achieve good memory usage. It has a nice tutorial as well.
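
A minimal sketch of the bounded-buffer idea, for illustration only. It assumes `spawn'`, `bounded`, `toOutput` and `fromInput` from pipes-concurrency and `concurrently` from the async package; `sumInThread` is a hypothetical name, not part of any library.

```haskell
import Control.Concurrent.Async (concurrently)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- | Hypothetical helper: sum a stream in a separate thread, with at most
-- 'bufSize' elements waiting in the mailbox between writer and reader.
sumInThread :: Int -> Producer Int IO () -> IO Int
sumInThread bufSize source = do
    (output, input, seal) <- spawn' (bounded bufSize)
    let -- Writer thread: blocks inside 'toOutput' while the mailbox is full.
        writer = do
            runEffect (source >-> toOutput output)
            atomically seal  -- signal that no more elements are coming
        -- Reader thread: stops once the mailbox is sealed and empty.
        reader = P.sum (fromInput input)
    snd <$> concurrently writer reader
```

With this shape, `sumInThread 10 (each [1 .. 100])` should keep no more than 10 elements in the mailbox at any time, because the writer waits whenever the reader falls behind.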

John Wiegley

Mar 29, 2016, 12:56:25 PM
to David McBride, haskel...@googlegroups.com
>>>>> David McBride <toa...@gmail.com> writes:

> Have you looked at the pipes-concurrency package? It allows you to spawn
> threads from consumers and producers that pass messages between each other
> and you can bound the buffers to allow you to achieve good memory usage. It
> has a nice tutorial as well.

pipes-async also wraps up this sort of functionality into a simple >&>
operator, if one's needs are not terribly complex. That is, it frees one from
having to manage the intermediate mailbox.

--
John Wiegley                  GPG fingerprint = 4710 CF98 AF9B 327B B80F
http://newartisans.com                          60E1 46C4 BD1A 7AC1 4BA2

Daniel Díaz

Mar 29, 2016, 7:33:19 PM
to Haskell Pipes
How about this for a possible implementation using pipes-concurrency:

    {-# LANGUAGE DeriveFunctor #-}
    {-# LANGUAGE RankNTypes #-}

    import qualified Control.Foldl as Foldl
    import Control.Concurrent.Async
    import Control.Exception
    import Pipes 
    import qualified Pipes.Prelude as Pipes
    import Pipes.Concurrent

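    -- | A fold that consumes a whole 'Producer' and returns the fold result
    -- together with the producer's own return value.
    newtype Fold1 b a = Fold1 { runFold1 :: forall r. Producer b IO r -> IO (a,r) } deriving (Functor)

    -- | Lift a pure 'Foldl.Fold' into a 'Fold1'.
    withFold :: Foldl.Fold b a -> Fold1 b a
    withFold aFold = Fold1 (adapt (Foldl.generalize aFold))
        where
        adapt f = \producer -> Foldl.impurely Pipes.foldM' f producer

    -- | Feed one producer to two folds, each running in its own thread behind
    -- a bounded mailbox that holds at most 'bufsize' elements.
    appFold1 :: Int -> Producer b IO r -> Fold1 b a1 -> Fold1 b a2 -> IO ((a1,a2),r)
    appFold1 bufsize producer (Fold1 fs) (Fold1 as) = do
        (outbox1,inbox1,seal1) <- spawn' (bounded bufsize)
        (outbox2,inbox2,seal2) <- spawn' (bounded bufsize)
        runConcurrently $
            (\(a1,()) (a2,()) r -> ((a1,a2),r))
            <$>
            Concurrently (fs (fromInput inbox1) `finally` atomically seal1)
            <*>
            Concurrently (as (fromInput inbox2) `finally` atomically seal2)
            <*>
            -- Copy every element to both mailboxes. The trailing 'Pipes.drain'
            -- keeps the pipeline running if a mailbox has already been sealed.
            (Concurrently $
                (runEffect (producer >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
                                     >-> (toOutput outbox2 *> Pipes.drain)))
                `finally` atomically seal1
                `finally` atomically seal2)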

I haven't tested it, however.
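
For comparison, here is a smaller variation on the same idea, equally a sketch rather than a vetted implementation. Instead of `Pipes.tee` it relies on the `Monoid` instance of `Output`, which pipes-concurrency provides for broadcasting to several mailboxes, and it uses `concurrently` from async in place of the explicit `finally` handlers. `foldBoth` and `demo` are hypothetical names chosen for illustration.

```haskell
import Control.Concurrent.Async (concurrently)
import qualified Control.Foldl as Foldl
import Data.Monoid ((<>))
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- | Hypothetical helper: run two pure folds over one stream, each fold in
-- its own thread, with at most 'bufSize' elements buffered per mailbox.
foldBoth :: Int -> Producer a IO () -> Foldl.Fold a x -> Foldl.Fold a y -> IO (x, y)
foldBoth bufSize source foldX foldY = do
    (out1, in1, seal1) <- spawn' (bounded bufSize)
    (out2, in2, seal2) <- spawn' (bounded bufSize)
    let -- Sending to the combined output waits until both mailboxes have
        -- room, so the faster report cannot run far ahead of the slower one.
        feed = do
            runEffect (source >-> toOutput (out1 <> out2))
            atomically (seal1 >> seal2)  -- no more elements for either fold
        reportX = Foldl.purely P.fold foldX (fromInput in1)
        reportY = Foldl.purely P.fold foldY (fromInput in2)
    ((), results) <- concurrently feed (concurrently reportX reportY)
    return results

-- | Example call: sum and count the numbers 1 to 1000 in parallel threads.
demo :: IO (Int, Int)
demo = foldBoth 100 (each [1 .. 1000]) Foldl.sum Foldl.length
```

If the sketch behaves as intended, `demo` returns `(500500, 1000)`. The buffer size is the knob that trades memory for slack between the two reports: a smaller value keeps them closer in lockstep.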