{-# LANGUAGE RankNTypes, FlexibleInstances #-}
import Control.Applicative
import Control.Arrow
import Control.Category
import Control.Monad (forever, mapM_, (>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes (Pipe, hoist, lift, runEffect)
import Pipes.Core (push, request, respond, (/>/), (<~<), (\>\))
import Pipes.Internal (Proxy (..))
import Pipes.Lift (evalStateP)
import Pipes.Prelude (foreverK)
import Prelude hiding ((.), id, print)
import qualified Prelude
-- | A push-based pipe: a function that takes an initial input of type @a@
-- and yields a 'Pipe' that keeps consuming @a@s and producing @b@s in the
-- base monad @m@, finishing with a result of type @r@.
newtype Edge m r a b = Edge
    { unEdge :: a -> Pipe a b m r
      -- ^ Unwrap the underlying push-based pipe.
    }
-- | Edges compose like push-based pipes.
instance (Monad m) => Category (Edge m r) where
    -- The identity edge forwards every input downstream unchanged.
    id = Edge push

    -- Push-based composition: values flow out of @p1@ and into @p2@.
    (Edge p2) . (Edge p1) = Edge (p2 <~< p1)
-- | Lift pure functions into edges and thread extra data alongside an edge.
instance (Monad m) => Arrow (Edge m r) where
    -- Respond with @f@ applied to each input, then request the next input.
    arr f = Edge (foreverK (respond . f >=> request))

    -- Run @p@ on the first component of each pair. The second component of
    -- the most recently received pair is kept in a 'StateT' layer (seeded
    -- with the initial @d@) and re-attached to every value @p@ responds with.
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ hoist lift . p />/ dn) b
      where
        -- Serve each upstream request of @p@: fetch the next pair, remember
        -- its second component, and hand the first component to @p@.
        up () = do
            (b', d') <- request ()
            lift $ put d'
            return b'

        -- Forward each output of @p@ paired with the remembered component.
        dn c = do
            d' <- lift get
            respond (c, d')
-- | Run an edge that needs no meaningful input. Every upstream request is
-- answered with @()@, every value the edge responds with is discarded, and
-- the edge's final result is returned in the base monad.
runEdge :: (Monad m) => Edge m r () a -> m r
runEdge (Edge p) = runEffect $ (up \>\ p />/ dn) ()
  where
    -- Answer each request for more input with unit.
    up () = return ()

    -- Drop each produced value and let the edge continue.
    dn _ = return ()
-- | An edge whose input type is @()@: it produces values of type @b@ and
-- serves as an input into the graph.
type Input m r b = Edge m r () b
-- | An edge that is polymorphic in its output type: it consumes values of
-- type @a@ and serves as an output of the graph.
type Output m r a = forall b . Edge m r a b
-- | Map a function over every value produced by a unit-input edge.
instance (Monad m) => Functor (Edge m r ()) where
    fmap f (Edge k) = Edge $ \() -> go (k ())
      where
        -- Walk the underlying 'Proxy', rewriting only the responded values.
        go p = case p of
            -- Upstream requests pass through untouched.
            Request () ku -> Request () (\() -> go (ku ()))
            -- Apply @f@ to each value sent downstream.
            Respond b  ku -> Respond (f b) (\() -> go (ku ()))
            -- Run the base-monad action, then continue with what it yields.
            M          m  -> M (m >>= \p' -> return (go p'))
            -- The final result is unchanged.
            Pure    r     -> Pure r
-- | Zip two unit-input edges: each output pairs one function from the left
-- edge with one argument from the right edge.
instance (Monad m) => Applicative (Edge m r ()) where
    -- An edge that responds with the same value forever.
    pure b = Edge $ \() -> forever $ respond b

    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        -- Advance the left edge until it responds with a function.
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL (ku ()) p2)
            -- Got a function: switch to the right edge for its argument.
            Respond f  ku -> goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            -- The combined edge ends as soon as the left edge ends.
            Pure    r     -> Pure r

        -- Advance the right edge until it responds with an argument, emit
        -- the application, then go back to the left edge.
        goR f p1 p2 = case p2 of
            Request () ku -> Request () (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            -- The combined edge ends as soon as the right edge ends.
            Pure    r     -> Pure r
-- | Build an input edge that ignores its initial input, responds with each
-- element of the list in order, and then finishes with @()@.
fromList :: (Monad m) => [a] -> Input m () a
fromList = Edge . const . mapM_ respond
-- | An output edge that prints every value it receives, one per line, and
-- never responds with anything downstream.
print :: (Show a) => Output IO r a
print = Edge (foreverK step)
  where
    -- Print the current value in 'IO', then request the next one.
    step x = do
        lift (Prelude.print x)
        request ()
I was motivated by Florian's questions about how to build directed acyclic graphs using pipes. Every time I tried to build a correct `Arrow` instance, I was foiled by the need to provide some sort of initial input value to get things going. Then I realized that the signature of a pipe that requires an initial input value is exactly that of a push-based pipe. Once I realized that, the entire `Arrow` implementation fell into place immediately, and it obeys all the `Arrow` laws.
I just want to point out that the meanings of the `Input` and `Output` type synonyms are the reverse of their meanings in `pipes-concurrency`. An `Input` produces values, and the name reflects the fact that it is an input into the directed acyclic graph. An `Output` consumes values and is an output of the directed acyclic graph.
This provides a nice way to build deterministic acyclic graphs like the kinds that Florian was trying to build. The `Applicative` instance lets you zip two sources together:
-- | Zip two list-backed sources into a single source of pairs.
twoSources :: (Monad m) => Input m () (Int, Int)
twoSources = liftA2 (,) lefts rights
  where
    lefts  = fromList [1..3]
    rights = fromList [4..6]
... and then you can use all the arrow operators you know and love to connect them together:
You can also use arrow notation to build up the graph, but I will skip that for now since I discussed how to do it last time.
I think this warrants releasing some sort of `pipes-arrow` package that has the above code and utilities for arrow-based pipe graphs. I will write this up soon.