`Proxy`s are fundamentally single-threaded coroutines meaning that:
(A) Only one `Proxy` is active at any given point in time
(B) They cooperatively transfer control to each other
The easiest way to explain this is in terms of the two primitive
combinators: `(>>~)` and `(+>>)` which are mutually
defined in terms of each other. I've simplified the types to more
closely match your example:
(>>~)
:: Server PeekerMessage String IO r
-> (String -> Client PeekerMessage String IO
r)
-> Effect IO r
(+>>)
:: (PeekerMessage -> Server PeekerMessage String
IO r)
-> Client PeekerMessage String IO r
-> Effect IO r
The intuition for control flow that you should have is:
* When you use `p1 >>~ p2` the flow of control begins from
`p1` and transfers to `p2` the first time that `p1` `respond`s
* When you use `p1 +>> p2` the flow of control begins from
`p2` and transfers to `p1` the first time that `p2` `request`s
Notice that for each operator, the `Proxy` the flow of control
begins from the `Proxy` that takes no function argument:
(>>~)
:: Server PeekerMessage String IO r -- This one
initiates the flow of control for `(>>~)`
-> (String -> Client PeekerMessage String IO r)
-> Effect IO r
(+>>)
:: (PeekerMessage -> Server PeekerMessage String IO r)
-> Client PeekerMessage String IO r -- This one
initiates the flow of control for `(+>>)`
-> Effect IO r
Also notice that the `Proxy` that does not initiate the flow of
control takes a function argument:
* In the case of `p1 >>~ p2`, `p2` is a function whose
argument is the first value that `p1` emits via `respond`.
* In the case of `p1 +>> p2`, `p1` is a function whose
argument is the first value that `p2` emits via `request`
This is consistent with the types, too:
(>>~)
:: Server PeekerMessage String IO r
-> (String -> Client PeekerMessage String IO r) --
I wait for the first `String` from `p1`
-> Effect IO r
(+>>)
:: (PeekerMessage -> Server PeekerMessage String IO r)
-- I wait for the first `PeekerMessage` from `p2`
-> Client PeekerMessage String IO r
-> Effect IO r
So the trick to deciding which operators to use is to just decide
which `Proxy` in the pipeline you want to initiate control.
For example, suppose that you have a pipeline with 3 `Proxy`s: `p1`,
`p2`, and `p3`. If you want to start from the most upstream
`Proxy`, you would use this:
(p1 >>~ p2) >>~ p3 = p1
>>~ p2 >>~ p3
If you wanted to start from the most downstream `Proxy`, you would
use this:
p1 +>> (p2 +>> p3) = p1 +>> p2
+>> p3
If you wanted to start from the middle `Proxy`, you would use this:
(p1 +>> p2) >>~ p3 = p1 +>> (p2 >>~
p3) = p1 +>> p2 >>~ p3
However, you're not limited to starting from a `Proxy` inside the
pipeline. If you want to start from something upstream of `p1`, you
would write this:
p1 >~> p2 >~> p3
... and if you wanted to start from something downstream of `p3`,
you would write this:
p1 >+> p2 >+> p3
So going back to your original question, it seems from your code
that you wanted control to begin from the `source` `Proxy`, which
means that you want your pipeline to look like this:
runEffect (source >>~ peeker >>~ test)
That in turn tells you which `Proxy`s need to be functions and which
ones are not. Remember that a most one `Proxy` in the pipeline will
initiate the flow of control (`source` in this case), so that will
be the one `Proxy` that is not a function. Every other `Proxy` will
be a function and you just have to follow the type of the
`(>>~)` operator to deduce what the function argument types
must be.
We know that `source` will have type:
Producer String m () = Proxy X () () String m ()
If you plug that type as the first argument to `(>>~) you get
this specialized type:
(>>~)
:: Monad m
=> Proxy X () () String m ()
-> (String -> Proxy () String c' c m ())
-> Proxy c' c m ()
So that means that when you transform `peeker` into a function it
takes an initial `String` as an argument (the first `String` that
`source` emits). Notice how convenient that is because you can use
that `String` to seed `peeker` instead of having to explicitly call
`await` to retrieve the first value. In fact, this is exactly the
type that your internal `responder` loop has, so you can simplify
`peeker` to just:
peeker :: (Monad m) => b -> Proxy () b PeekerMessage b
m ()
peeker = responder
where
responder x = do
m <- respond x
case m of
Peek -> responder x
Acquire -> peeker
And now if you compose `source` with `peeker` you would get this
type:
source >>~ peeker
:: Monad m => Proxy X () PeekerMessage String m
()
-- :: Monad m => Server PeekerMessage String m ()
Now we can feed that type as the left argument to `(>>~)` to
figure out what the type of `test` should be:
(>>~)
:: Monad m
=> Proxy X () PeekerMessage String m ()
-> (String -> Proxy PeekerMessage String c' c
m ())
-> Proxy X () c' c m ()
In other words, `test` needs to also be a function that takes a
`String` as its initial argument. That `String` is the first value
emitted by `peeker`.
This is why you got weird behavior because you were throwing away
that initial value when you used `const`. What you really wanted to
write was:
test :: (Show a') => a' -> Proxy PeekerMessage a'
b b' IO ()
test a = do
b <- peek
lift . putStrLn . show $ (a, b)
a' <- acquire
test a'
That would then give an expected output of:
("hello", "hello")
("world", "world")
("test", "test")
("message", "message")
If you wanted to stagger the values you would need to modify `test`
like this:
test :: (Show a') => a' -> Proxy PeekerMessage a' b b'
IO ()
test a = do
b <- acquire
lift . putStrLn . show $ (a, b)
loop
where
loop = do
a <- peek
b <- acquire
lift . putStrLn . show $ (a, b)
loop
That would then give an expected output of:
("hello","world")
("world","test")
("test","message")