Hello, everyone! I'm a Haskell beginner and new to the Streaming library. As I understand it, this group is dedicated to Pipes, but covers the Streaming library too. My simple question is about using the Streaming library. I'm trying to implement the following common and popular "pattern": processing stream items with a shared state. Please see the figure:
.--state-------+--state'--------+--state''-->
|              |                |
[e0..eN] ==> [e0'...eN'] ==> [e0''..eN''] =====>
This "state" will be used for statistics, errors, whatever, throughout the whole workflow. Some of the "pipe" nodes will iterate over the `eN` items (which will themselves be lists/streams) and concatenate the results. How can this be achieved with the Streaming library? I mean that each "node" should have access to the stream items but also to the "global" state (the result returned by the previous node?). The nodes will also perform IO actions!
My first attempt was:
...
import Streaming
import qualified Streaming.Prelude as S
...
-- simulate a source of items in the stream
gen :: S.Stream (S.Of Int) IO [String]
gen = do
  S.yield 1000
  S.yield 2000
  x <- lift getLine -- simulate an item obtained through IO
  return ["a", "b", "c", x]

-- simulate some node (a processor on the pipe)
proc :: S.Stream (S.Of Int) IO [String] -> S.Stream (S.Of Int) IO [String]
proc str = do
  e <- str -- WRONG! this binds the stream's final result, not its items
  -- S.yield $ e + 8
  lift $ putStrLn "Enter x:"
  x <- lift getLine -- simulate IO communication
  return $ e ++ [" -- " ++ x] -- simulate a change of the common state via the result component of the ":>" pair

main :: IO ()
main = do
  s <- S.mapM_ print $ S.map show gen
  p <- S.mapM_ print $ proc gen
  putStr "s: " >> print s
  putStr "p: " >> print p
  putStrLn "end."
But this is surely wrong, because "e <- str" in the do block binds "e" to the return value of the previous stream, not to the stream's items. I need access to both: the stream's items and the state, which (I suppose) can be the second component of the stream's ":>" pair, i.e. the result returned with the "return" function.
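To make the distinction concrete, here is a minimal sketch of a node that touches both parts: it transforms the items with "S.map" and binds the upstream return value in the same do block. The names "source" and "node" and the values are only illustrative assumptions. Note that in this shape the "state" only becomes visible after the upstream has been exhausted, so it is not yet what the figure asks for.

```haskell
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical source: two items, then a "state" as the stream's return value.
source :: Monad m => Stream (Of Int) m [String]
source = do
  S.yield 1000
  S.yield 2000
  return ["a", "b", "c"]

-- A node that sees both parts: the items are transformed by S.map and
-- re-yielded downstream, while `st` is bound to the upstream return value
-- once all items have passed through.
node :: Monad m => Stream (Of Int) m [String] -> Stream (Of Int) m [String]
node str = do
  st <- S.map (+ 8) str
  return (st ++ ["node done"])

main :: IO ()
main = do
  items :> st <- S.toList (node source)
  print items -- the transformed items
  print st    -- the extended return value
```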
I also tried to implement "proc" as a function of type "a -> m b" and to apply it with "S.mapM", but in that case I process the stream's items without access to the previous "return" value, i.e. the state.
Intuitively I feel that the solution may be to transform the result monad "m" so that it is not just IO but IO wrapped in "StateT", but I don't know how to:
- do it
- call it in the workflow drawn above
- judge what it will turn into, especially performance-wise, if I yield items wrapped in the State monad.
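A rough sketch of the "StateT" idea, under my own assumptions: the shared state is a plain counter ("Stats"), and "Flow", "source", "tally" and "report" are hypothetical names. The stream's base monad becomes "StateT Stats IO"; the items themselves stay plain values and are not wrapped in State. Each node uses "S.mapM", so it sees every item and can read or update the state and do IO at the same time. I cannot say anything about the performance of this shape.

```haskell
import Streaming
import qualified Streaming.Prelude as S
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.State.Strict (StateT, get, modify', runStateT)

-- Hypothetical shared state: just a counter of items seen so far.
type Stats = Int

-- Items stay plain values; only the base monad is StateT over IO.
type Flow a r = Stream (Of a) (StateT Stats IO) r

source :: Flow Int ()
source = S.each [1000, 2000, 3000]

-- First node: updates the shared state for every item, passes items through.
tally :: Flow Int r -> Flow Int r
tally = S.mapM $ \x -> do
  modify' (+ 1)
  return x

-- Second node: reads the shared state, does IO, and transforms the item.
report :: Flow Int r -> Flow Int r
report = S.mapM $ \x -> do
  n <- get
  liftIO (putStrLn ("item " ++ show n ++ " = " ++ show x))
  return (x + 8)

main :: IO ()
main = do
  -- runStateT supplies the initial state and returns the final one.
  (items :> (), stats) <- runStateT (S.toList (report (tally source))) 0
  print items
  print stats
```

The nodes compose by ordinary function application, which corresponds to the "==>" arrows in the figure, while the state travels through the base monad.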
I'll also need to iterate over some of the stream's items (they are lists), to join the results, and to keep the common state available throughout.
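For the "items are lists" part, this is a small sketch of what I mean by iterating and joining, assuming "S.for" together with "S.each" is a reasonable way to do it. The name "flatten" and the sample data are hypothetical.

```haskell
import Streaming
import qualified Streaming.Prelude as S

-- Each upstream item is a list; S.for runs a small stream for every item
-- (here S.each, which yields the list's elements) and joins the results
-- into one stream, keeping the upstream return value r.
flatten :: Monad m => Stream (Of [Int]) m r -> Stream (Of Int) m r
flatten str = S.for str S.each

main :: IO ()
main = do
  xs <- S.toList_ (flatten (S.each [[1, 2], [3], []]))
  print xs
```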
Can somebody help with this, please?
===
Best regards, Paul