Devon,
emitAll actually has Nothing as its environment because it just strictly (or eventually lazily) emits a list of values known up front, so it does not matter in which environment you run it. Nothing makes it possible to use emitAll anywhere in your code, for example in flatMap, without any typecasting or conversions.
Usually you start with some source, which is where you define your `F` environment; for scalaz-stream that is most often `Task`, an asynchronous computation. You then carry that environment through your process and finally `run` it.
Most often you use emitAll as a helper in flatMap or in other combinators.
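To make the flatMap case concrete, here is a minimal sketch. The names `lines`, `tokens` and `result` are made up for illustration, and it assumes a scalaz-stream version in which `Task#run` and `Process#runLog` are available, as in the rest of this post:

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// A hypothetical source in the Task environment: two lines of text.
val lines: Process[Task, String] =
  Process.eval(Task.now("a b")) ++ Process.eval(Task.now("c d e"))

// emitAll has no environment of its own, so it can be returned from
// flatMap on a Task process without any casts or conversions.
val tokens: Process[Task, String] =
  lines.flatMap(line => Process.emitAll(line.split(" ").toList))

// Nothing is evaluated until the process is run: runLog collects the
// emitted values into a Task, and the final run executes that Task.
val result = tokens.runLog.run
```

The environment is fixed once, by the source, and everything downstream simply follows it.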
Now to your previous question. Assuming I have a source of strings in the Task environment, this keeps only the strings whose size is even, buffers them 10 elements at a time, transforms each one into a (size, string) pair and then emits the results to a mutable buffer:
val source : Process[Task,String] = ???
// |> is alias for pipe
val transformed: Process[Task,(Int,String)] =
(source |> filter(_.size % 2 == 0) |> buffer(10)).map(s=>(s.size,s))
//please note that the implementation of the buffer must be thread safe if you want to read or mutate it from a thread other than the one where the source process runs…
val finalBuffer : collection.mutable.Buffer[(Int,String)] = ???
//now actually glue source and sink together and run it
(transformed to (io.fillBuffer(finalBuffer))).run.run
Please note the concurrency issue with the buffer. I would rather create a sink or channel that wraps the DSL you mentioned for processing, and then use the process to drive it, instead of using some middle-man buffer:
val postProcessing : Sink[Task,Vector[(Int,String)]] = repeatEval(Task.now { (buffer : Vector[(Int,String)]) => Task.delay { /* your post-processing code here */ } })
val transformedGrouped10 = transformed.chunk(10)
val postProcess : Process[Task,Unit] =
transformedGrouped10 to postProcessing
postProcess.run.run
I hope this gives you a few more ideas …
Pavel.