Push-based pipes are arrows and applicatives

Gabriel Gonzalez

Jul 3, 2013, 2:29:27 PM
to haskel...@googlegroups.com
hpaste is down, so I'm pasting it here:

{-# LANGUAGE RankNTypes, FlexibleInstances #-}

import Control.Applicative
import Control.Arrow
import Control.Category
import Control.Monad.Trans.State.Strict (get, put)
import Data.Monoid
import Pipes
import Pipes.Internal
import Pipes.Lift (evalStateP)
import Pipes.Prelude (foreverK)
import Prelude hiding ((.), id, print)
import qualified Prelude

import Control.Monad (forever, mapM_, (>=>))

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad m) => Category (Edge m r) where
    id  = Edge push
    (Edge p2) . (Edge p1) = Edge (p2 <~< p1)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (foreverK (respond . f >=> request))
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ hoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

runEdge :: (Monad m) => Edge m r () a -> m r
runEdge (Edge p) = runEffect $ (up \>\ p />/ dn) ()
  where
    up    = return
    dn _  = return ()

type Input  m r b =            Edge m r () b
type Output m r a = forall b . Edge m r a  b

instance (Monad m) => Functor (Edge m r ()) where
    fmap f (Edge k) = Edge $ \() -> go (k ())
      where
        go p = case p of
            Request () ku -> Request ()    (\() -> go (ku ()))
            Respond b  ku -> Respond (f b) (\() -> go (ku ()))
            M          m  -> M (m >>= \p' -> return (go p'))  
            Pure    r     -> Pure r

instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r

fromList :: (Monad m) => [a] -> Input m () a
fromList bs = Edge $ \_ -> mapM_ respond bs

print :: (Show a) => Output IO r a
print = Edge $ foreverK $ lift . Prelude.print >=> request

I was motivated by Florian's questions about how to build directed acyclic graphs using pipes.  Every attempt I made at a correct `Arrow` instance was foiled by the need to provide some sort of initial input value to get things going.  Then I realized that a pipe that requires an initial input value is exactly a push-based pipe.  Once I realized that, the entire `Arrow` implementation fell into place immediately, and it obeys all the `Arrow` laws.
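
To illustrate why waiting for an initial input makes the instances straightforward, here is a minimal pure sketch. The names `Stage`, `runStage`, and `runningSum` are hypothetical, and the model is a deliberate simplification (exactly one output per input, no effects). It is not the pipes implementation above, only the same shape of idea: a stage is a function of its first input.

```haskell
import Prelude hiding (id, (.))
import Control.Category (Category (..))
import Control.Arrow (Arrow (..), (>>>))

-- Hypothetical simplified model: a stage does nothing until it is
-- pushed an input, then yields one output and its own continuation.
newtype Stage a b = Stage (a -> (b, Stage a b))

instance Category Stage where
    -- Identity is definable because the first input is handed to us.
    id = Stage (\a -> (a, id))
    Stage g . Stage f = Stage $ \a ->
        let (b, f') = f a
            (c, g') = g b
        in  (c, g' . f')

instance Arrow Stage where
    arr f = Stage (\a -> (f a, arr f))
    -- The second component is carried past the wrapped stage unchanged.
    first (Stage f) = Stage $ \(a, d) ->
        let (b, f') = f a
        in  ((b, d), first f')

-- Push a list of inputs through a stage, one element at a time.
runStage :: Stage a b -> [a] -> [b]
runStage _         []       = []
runStage (Stage f) (a : as) = let (b, s') = f a in b : runStage s' as

-- A stateful stage, to show the model is more than plain functions.
runningSum :: Int -> Stage Int Int
runningSum acc = Stage $ \x -> let acc' = acc + x in (acc', runningSum acc')

main :: IO ()
main = do
    print (runStage (runningSum 0 >>> arr (* 2)) [1, 2, 3])
    -- prints [2,6,12]
    print (runStage (first (runningSum 0)) [(1, 'a'), (2, 'b'), (3, 'c')])
    -- prints [(1,'a'),(3,'b'),(6,'c')]
```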

Note that the meaning of the `Input` and `Output` type synonyms is the reverse of their meaning in `pipes-concurrency`.  Here an `Input` produces values, and the name reflects the fact that it is an input into the directed acyclic graph.  An `Output` consumes values and is an output of the directed acyclic graph.

This provides a nice way to build directed acyclic graphs like the kinds that Florian was trying to build.  The `Applicative` instance lets you zip two sources together:

twoSources :: (Monad m) => Input m () (Int, Int)
twoSources = (,) <$> fromList [1..3] <*> fromList [4..6]

... and then you can use all the arrow operators you know and love to connect them together:

>>> runEdge $ twoSources >>> first (arr (+ 1)) >>> print
(2,4)
(3,5)
(4,6)
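
As a cross-check on that output, the same computation can be modeled with plain lists. This sketch assumes `ZipList` from base as a stand-in for the zipping `Applicative` instance and the function instance of `Arrow` as a stand-in for `Edge`; the names `twoSourcesModel` and `incrementFirst` are hypothetical, and no pipes are involved.

```haskell
import Control.Applicative (ZipList (..))
import Control.Arrow (arr, first)

-- Stand-in for `twoSources`: ZipList pairs elements positionally,
-- which is the zipping behaviour described above (assumed analogy).
twoSourcesModel :: [(Int, Int)]
twoSourcesModel = getZipList ((,) <$> ZipList [1 .. 3] <*> ZipList [4 .. 6])

-- Stand-in for `first (arr (+ 1))`, using the function instance of Arrow:
-- only the first component of each pair is incremented.
incrementFirst :: (Int, Int) -> (Int, Int)
incrementFirst = first (arr (+ 1))

main :: IO ()
main = mapM_ print (map incrementFirst twoSourcesModel)
-- prints (2,4), (3,5), (4,6), one per line
```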

You can also use arrow notation to build up the graph, but I will skip that for now since I discussed how to do it last time.

I think this warrants releasing some sort of `pipes-arrow` package that has the above code and utilities for arrow-based pipe graphs.  I will write this up soon.