{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE RankNTypes #-}
import qualified Control.Foldl as Foldl
import Control.Concurrent.Async
import Control.Exception
import Pipes
import qualified Pipes.Prelude as Pipes
import Pipes.Concurrent
newtype Fold1 b a = Fold1 { runFold1 :: forall r. Producer b IO r -> IO (a,r) } deriving (Functor)
withFold :: Foldl.Fold b a -> Fold1 b a
withFold aFold = Fold1 (adapt (Foldl.generalize aFold))
where
adapt f = \producer -> Foldl.impurely Pipes.foldM' f producer
appFold1 :: Int -> Producer b IO r -> Fold1 b a1 -> Fold1 b a2 -> IO ((a1,a2),r)
appFold1 bufsize producer (Fold1 fs) (Fold1 as) = do
(outbox1,inbox1,seal1) <- spawn' (bounded bufsize)
(outbox2,inbox2,seal2) <- spawn' (bounded bufsize)
runConcurrently $
(\(a1,()) (a2,()) r -> ((a1,a2),r))
<$>
Concurrently (fs (fromInput inbox1) `finally` atomically seal1)
<*>
Concurrently (as (fromInput inbox2) `finally` atomically seal2)
<*>
(Concurrently $
(runEffect (producer >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
>-> (toOutput outbox2 *> Pipes.drain)))
`finally` atomically seal1
`finally` atomically seal2)
I haven't tested it, however.