Newbie question about Streaming-based pipe with common state

aqu...@gmail.com

May 29, 2017, 4:10:12 AM
to Haskell Pipes
Hello, everyone! I'm a Haskell beginner and new to the Streaming library. As I understand it, this group is dedicated to Pipes but covers the Streaming library too. My question is about using the Streaming library. I'm trying to implement the following common pattern: processing stream items with shared state. See the figure:

     .--state-------+--state'--------+--state''-->
     |              |                |
  [e0..eN] ==> [e0'..eN'] ==> [e0''..eN''] =====>


This "state" will be used for statistics, errors, and so on throughout the whole workflow. Some of the "pipe" nodes will iterate over the `eN` items (which will themselves be lists or streams) and concatenate the results. How can this be achieved with the Streaming library? I mean that each "node" should have access to the stream items but also to the "global" state (the return value of the previous node?). The nodes will also perform IO actions!

My first attempt was:

...
import Streaming
import qualified Streaming.Prelude as S
...


-- simulate source of items in the stream
gen :: S.Stream (S.Of Int) IO [String]
gen = do
  S.yield 1000
  S.yield 2000
  x <- lift getLine -- simulate item got through IO
  return ["a", "b", "c", x]


-- simulate some node - processor on the pipe
proc :: S.Stream (S.Of Int) IO [String] -> S.Stream (S.Of Int) IO [String]
proc str = do
  e <- str -- WRONG!
  -- S.yield $ e + 8
  lift $ print "Enter x:"
  x <- lift getLine -- simulate IO communication
  return $ e ++ [" -- " ++ x] -- simulate change of common state via result component of ":>" pair

main :: IO ()
main = do
  s <- S.mapM_ print $ S.map show gen
  p <- S.mapM_ print $ proc gen
  putStr "s: " >> print s
  putStr "p: " >> print p
  print "end."


But this is certainly wrong, because "do e <- str" binds "e" to the return value of the previous stream, not to the stream's items. I need access to both: the stream's items and the state, which (I suppose) can be the second component of the Stream's ":>" pair, i.e. the result returned with the "return" function.

I also tried implementing "proc" as an "a -> m b" function and applying it with "S.mapM", but in that case I process the stream's items without access to the previous "return" value, the state.
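The distinction described above, between a stream's items and its final return value, can be sketched as follows. This is only an illustration under assumptions: the names `src`, `onItems`, and `onResult` are hypothetical and do not appear in the original post. `S.mapM` touches each item and passes the return value through unchanged, while `fmap` touches only the return value.

```haskell
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical source: two items, then a final result of type [String].
src :: Stream (Of Int) IO [String]
src = do
  S.yield 1000
  S.yield 2000
  return ["a", "b"]

-- Touches only the items; the final result passes through unchanged.
onItems :: Stream (Of Int) IO r -> Stream (Of Int) IO r
onItems = S.mapM (\e -> pure (e + 8))

-- Touches only the final result; the items pass through unchanged.
onResult :: Stream (Of Int) IO [String] -> Stream (Of Int) IO [String]
onResult = fmap (++ ["done"])

main :: IO ()
main = do
  -- S.toList collects the items and pairs them with the final result.
  items :> r <- S.toList (onResult (onItems src))
  print items
  print r
```

Neither function sees both at once, which is why a node needing items and shared state together calls for something more, such as state carried in the underlying monad.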

Intuitively, I feel that the solution may be to change the underlying monad "m" so that it is not just IO but includes "StateT", but I don't know:
  1. how to do it
  2. how to call it in the workflow drawn above
  3. if I yield items wrapped in the State monad, what on earth the result will be, especially with regard to performance.
I'll also need to iterate over some of the stream's items (they are lists), join the results, and keep the shared state throughout.

Can somebody help with it, please?

===
Best regards, Paul

Chris Pollard

May 30, 2017, 6:48:16 AM
to Haskell Pipes
Hi Paul,

You're right: typically when you need to access or modify some state of type "s" throughout a monadic action, you want your monad to be an instance of "MonadState s".


Streams are instances of "MonadState s" as long as the underlying monad is also an instance. Probably the simplest way to achieve what you need is, as you mentioned, to change your streams of type "S.Stream (S.Of Int) IO" to "S.Stream (S.Of Int) (StateT s IO)", where StateT comes from


With "StateT s IO" as your underlying monad, you can access the current state with "get" and change it with "modify".

You will end up with something of type "StateT s IO r" rather than "IO r"; you can convert this to an IO action using runStateT, evalStateT, or execStateT, whichever is appropriate. I think you'll find StateT to be pretty light-weight in terms of performance.
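A minimal sketch of this approach, under stated assumptions: the `Stats` record, the `App` alias, and the names `stageA`, `stageB`, and `runPipeline` are hypothetical and chosen only for illustration, and `StateT` is taken here from `Control.Monad.State.Strict` in the mtl package. Each node processes items with `S.mapM`, updates the shared state with `modify'`, and may perform IO via `liftIO`.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.State.Strict (StateT, modify', runStateT)
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical shared state: how many items each node has seen.
data Stats = Stats { seenA :: !Int, seenB :: !Int } deriving (Show, Eq)

-- The underlying monad is StateT Stats IO instead of plain IO.
type App = StateT Stats IO

-- Source of items.
gen :: Stream (Of Int) App ()
gen = S.each [1000, 2000, 3000]

-- First node: transforms each item and updates the shared state.
stageA :: Stream (Of Int) App r -> Stream (Of Int) App r
stageA = S.mapM $ \e -> do
  modify' (\st -> st { seenA = seenA st + 1 })
  pure (e + 8)

-- Second node: performs IO per item and also updates the shared state.
stageB :: Stream (Of Int) App r -> Stream (Of String) App r
stageB = S.mapM $ \e -> do
  liftIO (putStrLn ("stageB got " ++ show e))
  modify' (\st -> st { seenB = seenB st + 1 })
  pure (show e)

-- Run the whole pipeline, turning the StateT action back into IO.
runPipeline :: IO ([String], Stats)
runPipeline = do
  (items :> (), stats) <- runStateT (S.toList (stageB (stageA gen))) (Stats 0 0)
  pure (items, stats)

main :: IO ()
main = do
  (items, stats) <- runPipeline
  print items
  print stats
```

Here `runStateT` returns both the collected items and the final state; `evalStateT` or `execStateT` would keep only one of the two.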

Cheers,

Chris

aqu...@gmail.com

May 30, 2017, 10:07:41 AM
to Haskell Pipes
Many thanks, Chris! I tried this and it works. By the way, Chris, as a newbie, could you explain whether this is the right group for the Streaming library? And how is Streaming related to Pipes? The author of Streaming (if I understood correctly) points to this group for questions and discussions, so it seems there is some relation between the two libraries. Is that true?

On Tuesday, May 30, 2017 at 13:48:16 UTC+3, Chris Pollard wrote:

David McBride

May 30, 2017, 10:18:10 AM
to haskel...@googlegroups.com
This list is technically for the pipes ecosystem. That said, there are
no other lists that I'm aware of for streaming or conduits. People on
this list are likely to answer questions on them; just be aware that
not everyone on this list has used every other library.

Gabriel Gonzalez

May 30, 2017, 10:32:27 AM
to haskel...@googlegroups.com
I'm fine with using this group for questions about the streaming library.

The streaming library originated as a more efficient and specialized implementation of the pipes-group library with better inferred types. If you read the pipes-group tutorial you will see the similarities between the two libraries.

aqu...@gmail.com

May 30, 2017, 11:49:00 AM
to Haskell Pipes
Wow!!! Gabriel! Thank you very much!
PS. Excuse my emotions please! :)

On Tuesday, May 30, 2017 at 17:32:27 UTC+3, Gabriel Gonzalez wrote: