N independent passes


Antonio Nikishaev

Apr 7, 2015, 8:05:08 AM
to haskel...@googlegroups.com

What is the sane way to do something like this?

(await >>= \x -> yield (Left x) >> yield (Right x))
>-> p1 +++ p2

Let's assume p1 or/and p2 are too complex to fit into Foldl.


--
lelf

Dan Burton

Apr 7, 2015, 1:14:20 PM
to haskel...@googlegroups.com
p1 `idealOp` p2

What's the type signature you're shooting for with `idealOp`?

The idea of what you're doing looks sane to me. To put a few names to it:

dup :: a ~> (a,a)
dup = arr $ \a -> (a,a)

decouple :: (a,b) ~> Either a b
decouple = forever $ await >>= \(a,b) -> yield (Left a) >> yield (Right b)

(&&&) :: (a ~> b) -> (a ~> c) -> (a ~> (b,c))
-- ^ for comparison
idealOp :: (a ~> b) -> (a ~> c) -> (a ~> Either b c)
idealOp p1 p2 = dup >>> decouple >>> (p1 +++ p2)

In this case decouple may matter: although the two pipes are fed in lock step, p1 might yield on a given input while p2 does not. If you *want* their yields to be coupled, then instead try this:

merge :: (a,a) ~> a
merge = forever $ await >>= \(a,a') -> yield a >> yield a'

idealOp :: (a ~> b) -> (a ~> b) -> (a ~> b)
idealOp p1 p2 = (p1 &&& p2)  >>> merge

I'm writing in pseudo-code, but I hope what I'm saying is clear.
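
As a rough, runnable reading of the helper pipes above: the sketch below assumes that `a ~> b` stands for `Pipe a b m r` from the core `pipes` package, and it leaves out `(&&&)` and `(+++)`, which core `pipes` does not provide. The names `decoupled` and `merged` are only illustrative.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Assumption: "a ~> b" is read as "Pipe a b m r".

-- | Pair every input with itself.
dup :: Monad m => Pipe a (a, a) m r
dup = P.map (\a -> (a, a))

-- | Split each pair into two separately tagged yields.
decouple :: Monad m => Pipe (a, b) (Either a b) m r
decouple = forever $ do
    (a, b) <- await
    yield (Left a)
    yield (Right b)

-- | Flatten each pair into two consecutive yields of the same type.
merge :: Monad m => Pipe (a, a) a m r
merge = forever $ do
    (a, a') <- await
    yield a
    yield a'

-- | Illustrative only: the tagged stream that would feed (p1 +++ p2).
decoupled :: [Either Int Int]
decoupled = P.toList (each [1, 2] >-> dup >-> decouple)

-- | Illustrative only: both halves of each pair, in order.
merged :: [Int]
merged = P.toList (each [1, 2] >-> dup >-> merge)
```

Here `decoupled` evaluates to `[Left 1,Right 1,Left 2,Right 2]` and `merged` to `[1,1,2,2]`.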

-- Dan Burton




Gabriel Gonzalez

Apr 7, 2015, 1:39:13 PM
to haskel...@googlegroups.com
What's wrong with what you just wrote?

Also, I think you probably meant:

forever (await >>= \x -> yield (Left x) >> yield (Right x))
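
For reference, a minimal sketch of the corrected pipe in use. It assumes that `(+++)` is the one exported by `Pipes.Extras` in the `pipes-extras` package. The name `fanOut` and the two stand-in pipes (`P.map (* 10)` for p1, `P.map show` for p2) are hypothetical and only there to make the example self-contained.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Extras ((+++))
import qualified Pipes.Prelude as P

-- | Send every input downstream twice: once tagged Left, once tagged Right.
fanOut :: Monad m => Pipe a (Either a a) m r
fanOut = forever $ do
    x <- await
    yield (Left x)
    yield (Right x)

-- | Hypothetical demo: the Left copies go through the first pipe,
-- the Right copies through the second.
demo :: [Either Int String]
demo = P.toList (each [1, 2] >-> fanOut >-> (P.map (* 10) +++ P.map show))
```

With these stand-ins, `demo` evaluates to `[Left 10,Right "1",Left 20,Right "2"]`. Without `forever`, the pipe would handle a single input and then terminate, which ends the whole pipeline.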

Michael Thompson

Apr 8, 2015, 9:44:53 AM
to haskel...@googlegroups.com
I'm just wondering for clarification: are you looking for something that -- if the consumers p1, p2 ... pN were in `IO` -- might look like so:


    import Control.Concurrent.Async
    import qualified Pipes.Prelude as P
    import Pipes.Concurrent
    import Pipes
    import Control.Monad

    together :: Producer x IO () -> [Consumer x IO ()] -> IO ()
    together producer consumers = do
        (output, input) <- spawn unbounded
        as <- forM consumers $ \consumer ->
              async $ do runEffect $ fromInput input  >-> consumer
                         performGC
        a  <- async $ do runEffect $ producer >-> toOutput output
                         performGC
        mapM_ wait (a:as)
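
Note that with one shared mailbox, each value is received by exactly one of the consumers, so the work is divided among them. If every consumer should instead see every element, one possible variant is sketched below. It assumes `spawn'` from `pipes-concurrency` (which also returns a sealing action) and relies on the `Monoid` instance of `Output` to send each value to all mailboxes. The name `broadcast` is hypothetical.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Monad (forM)
import Pipes
import Pipes.Concurrent

-- | Run every consumer over its own copy of the producer's output.
broadcast :: Producer x IO () -> [Consumer x IO ()] -> IO ()
broadcast producer consumers = do
    -- One mailbox per consumer; spawn' also returns an action that seals it.
    boxes <- forM consumers $ \_ -> spawn' unbounded
    let outputs = [o | (o, _, _) <- boxes]
        seals   = [s | (_, _, s) <- boxes]
    -- Start each consumer reading from its own mailbox.
    workers <- forM (zip boxes consumers) $ \((_, input, _), consumer) ->
        async $ runEffect $ fromInput input >-> consumer
    -- The combined Output forwards each value to every mailbox.
    runEffect $ producer >-> toOutput (mconcat outputs)
    -- Seal the mailboxes so consumers stop once they have drained them.
    atomically (sequence_ seals)
    mapM_ wait workers
```

Sealing explicitly avoids depending on garbage collection to signal the end of input. Because the mailboxes are unbounded, a slow consumer can cause the whole stream to be buffered in memory.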

Michael Thompson

Apr 8, 2015, 10:03:13 AM
to haskel...@googlegroups.com
I should have said that I assumed you meant them to be consumers because you compared them to Folds.

Daniel Díaz

Apr 8, 2015, 4:22:07 PM
to haskel...@googlegroups.com
In my process-streaming package, I have a type called Siphon that behaves a bit like this. Its Applicative instance uses pipes-concurrency tricks to achieve "independent passes". A simple example:

import System.Process.Streaming
import System.Process.Streaming.Internal (runSiphon)
import Pipes
import qualified Pipes.Prelude as P
import Control.Applicative 

runSiphon (fromConsumer P.print *> fromConsumer P.print)  (yield 5 >> yield 6)

 