scalaz-stream: help understanding error

Devon Miller

Oct 14, 2013, 8:09:29 PM10/14/13
to sca...@googlegroups.com

I am trying to get some scalaz-stream code running, but some simple tests are giving me errors that I do not understand. I have tried a number of combinations of specifying the type parameters directly, but have failed to produce something that works.

scala> val values = Seq("value1", "value2", "value3")
values: Seq[String] = List(value1, value2, value3)

scala> val target = collection.mutable.ListBuffer[String]()
target: scala.collection.mutable.ListBuffer[String] = ListBuffer()

scala> Process.emitAll(values).to(io.fillBuffer(target)).run.run
<console>:58: error: could not find implicit value for parameter F: scalaz.Monad[[x]Any]
              Process.emitAll(values).to(io.fillBuffer(target)).run.run
                                                                ^

Thoughts?

Pavel Chlupacek

Oct 15, 2013, 12:57:59 AM10/15/13
to sca...@googlegroups.com
Hi, 

even though I am not following exactly what you want to achieve, from the last sample I think you just want to emit some strings and collect them into a collection.

you can do that, for example, with

Process.emitAll(values).toSource.runLog.run

or 

Process(values:_*).toList
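(The first returns the values as an IndexedSeq once the Task runs — Vector(value1, value2, value3) here — while the second gives you a plain List(value1, value2, value3) directly.)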

Is this what you are looking for? If not, can you please describe in more detail what you want to achieve? The examples in the scalaz-stream git repo are also a good starting point.

The `to` combinator you are using in fact expects another Process as its parameter, one that is called a Sink; it does not work with collections directly.
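For illustration (this is roughly the scalaz-stream definition, modulo variance annotations), a Sink is just a stream of effectful functions, and io.fillBuffer gives you one that appends to a mutable Buffer:

type Sink[F[_], O] = Process[F, O => F[Unit]]

// io.fillBuffer(buf) is, roughly, a constant stream of the function
//   (o: O) => Task.delay { buf += o; () }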


P.






Paul Chiusano

Oct 15, 2013, 1:00:20 AM10/15/13
to sca...@googlegroups.com
This is just a type inference failure, unfortunately. It's picking `Any` as the `F` type (which isn't even well-kinded). Just annotate the `Process.emitAll(values)` and it will work:

val a: Process[Task,A] = Process.emitAll(values)
a.to(io.fillBuffer(target))



Pavel Chlupacek

Oct 15, 2013, 1:04:34 AM10/15/13
to sca...@googlegroups.com
ah, I completely overlooked the io.fillBuffer in the code :-(

sorry for that, Devon.

P.

Jason Zaugg

Oct 15, 2013, 1:36:47 AM10/15/13
to sca...@googlegroups.com
Any and Nothing are kind-polymorphic in Scala.
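For instance (a quick sketch from memory — the compiler special-cases both types during kind checking):

def f[F[_]] = ()
f[Nothing]  // compiles, even though Nothing is not declared with kind * -> *
f[Any]      // likewise for Any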

Miles Sabin

Oct 15, 2013, 5:06:35 AM10/15/13
to sca...@googlegroups.com
On Tue, Oct 15, 2013 at 6:36 AM, Jason Zaugg <jza...@gmail.com> wrote:
> Any and Nothing are kind-polymorphic in Scala.

Is there anything useful that can be done with that? (I haven't been
able to come up with anything).

Cheers,


Miles

--
Miles Sabin
tel: +44 7813 944 528
skype: milessabin
gtalk: mi...@milessabin.com
g+: http://www.milessabin.com
http://twitter.com/milessabin

Devon Miller

Oct 15, 2013, 8:26:54 AM10/15/13
to sca...@googlegroups.com
I tried that in my REPL but it did not work.

scala> val a: Process[Task, A] = Process.emitAll(values)
<console>:68: error: not found: type A
       val a: Process[Task, A] = Process.emitAll(values)
                            ^
I had also tried Process[Task, String] at one point, but that had not worked either.

To Pavel's question: I am building up to the solution I want, but I typically start tiny and grow the solution out incrementally. I have a data-record-processing dataflow that I wish to execute.

Pavel Chlupacek

Oct 15, 2013, 8:39:29 AM10/15/13
to sca...@googlegroups.com
would this help you? 

scala> import scalaz.stream.Process._
import scalaz.stream.Process._

scala> import scalaz.stream.Process
import scalaz.stream.Process

scala> import scalaz.stream.processes._
import scalaz.stream.processes._


scala> val source = Process("one","two","three").toSource
source: scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@13a70b29,<function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$))

scala> source.map(_.size).runLog.run
res5: scala.collection.immutable.IndexedSeq[Int] = Vector(3, 3, 5)

???



Devon Miller

Oct 15, 2013, 4:47:43 PM10/15/13
to sca...@googlegroups.com
Not as much. I am trying to shove records in (in this case represented by strings, but they would normally be DataRecord objects) and get the surviving records out into a buffer at the other end, to undergo some additional domain-specific processing. Along the way I need to filter, convert, and log some of the fields, which is what scalaz-stream seems to promise to make easier for me. I'll keep playing around with those type parameters. The first step was to see if I could simply flow some values into a buffer.

Paul Chiusano

Oct 15, 2013, 5:05:58 PM10/15/13
to sca...@googlegroups.com
scala> val a: Process[Task, A] = Process.emitAll(values)
<console>:68: error: not found: type A
       val a: Process[Task, A] = Process.emitAll(values)
                            ^
> I had also tried Process[Task,String] at one point as well but that had not worked.

I was just using `A` to mean an arbitrary type. `A` is not magical here; it should match the type of the values you are emitting. This example works for me:

scala> import scalaz.stream._
import scalaz.stream._

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> val buf = new collection.mutable.ListBuffer[Int]
buf: scala.collection.mutable.ListBuffer[Int] = ListBuffer()

scala> val in: Process[Task,Int] = Process.emitAll(List(1,2,3,4))
in: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3, 4),Halt(scalaz.stream.Process$End$))

scala> in.to(io.fillBuffer(buf)).run.run

scala> buf
res1: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3, 4)


Devon Miller

Oct 15, 2013, 7:13:07 PM10/15/13
to sca...@googlegroups.com
That seems to work.

It looks like just doing Process.emitAll(aListOfStrings) will not work, though, without assigning it to a val with the type declared explicitly.

Looking at the Process companion object, the emitAll[O](seq: Seq[O])... definition returns a Process[Nothing, O] directly, so I guess the assignment is necessary to make the types work out.

It feels like emitAll's return type should be something more clever than just Process[Nothing, O], though.

Tracing through the definition, I can do the following directly and it seems to take the type arguments ok:

scala> Process.emitSeq[Task, String](values)
res5: scalaz.stream.Process[scalaz.concurrent.Task,String] = Emit(List(value1, value2, value3),Halt(scalaz.stream.Process$End$))

I'll have to rely on other people's judgement to determine whether the Nothing type should be something different. I tried adding the Task type, as .to[Task](io.fillBuffer(target)), but that did not work, for another type-related reason.

scala> Process.emitAll[ String](values).to[Task](io.fillBuffer(target))
<console>:14: error: type arguments [scalaz.concurrent.Task] do not conform to method to's type parameter bounds [F2[x] >: F[x]]
              Process.emitAll[ String](values).to[Task](io.fillBuffer(target))
                       
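Combining the two, pinning the environment on the source side does seem to compile for me:

Process.emitSeq[Task, String](values).to(io.fillBuffer(target)).run.run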


Pavel Chlupacek

Oct 15, 2013, 11:38:17 PM10/15/13
to sca...@googlegroups.com
Devon, 

emitAll actually has Nothing as its environment because it just strictly (or eventually lazily) emits an upfront-known list of values, so it is quite irrelevant in which environment you run it. Nothing makes it possible to use it everywhere in code, for example in flatMaps, without any typecasting or conversions.

Usually you start with some source, where you define your `F` environment; for scalaz-stream that is most often `Task`, as an asynchronous computation. Then you carry that environment through your process, and finally you `run` it.

Most often you use emitAll as a helper in flatMap or in other combinators, as in the sketch below.
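For example (a minimal sketch: since Process is covariant in F, the Process[Nothing, Int] that emitAll returns widens to Process[Task, Int] inside the flatMap without any annotation):

val nums: Process[Task, Int] = Process.emitSeq[Task, Int](List(1, 2, 3))
val doubled: Process[Task, Int] = nums.flatMap { n => Process.emitAll(List(n, n)) }
// doubled.runLog.run == Vector(1, 1, 2, 2, 3, 3)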

Now to your previous question. Assuming I have a source of strings in the Task environment, this will filter the strings whose size is even, buffer 10 at a time, transform each to a (size, string) pair, and then emit the results to a mutable buffer:

val source : Process[Task,String] = ???

// |> is an alias for pipe
val transformed: Process[Task, (Int, String)] =
  (source |> filter(_.size % 2 == 0) |> buffer(10)).map(s => (s.size, s))

// please note that the implementation of the buffer must be thread-safe if you want to
// read and mutate it from a thread other than the one where the source process runs…
val finalBuffer: collection.mutable.Buffer[(Int, String)] = ???

// now actually glue the source and sink together and run it
(transformed to io.fillBuffer(finalBuffer)).run.run
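If you want a self-contained toy version of this to paste into the REPL (hypothetical values, and skipping the buffer(10) step for brevity):

val toySource: Process[Task, String] = Process("aa", "bbb", "cccc").toSource
val toyBuffer = new collection.mutable.ListBuffer[(Int, String)]
((toySource |> filter(_.size % 2 == 0)).map(s => (s.size, s)) to io.fillBuffer(toyBuffer)).run.run
// toyBuffer: ListBuffer((2,aa), (4,cccc))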


Please note the concurrency issue with the buffer. I would rather just create a sink or channel that wraps the DSL you mentioned for the processing, and then use the process to drive it, instead of going through some middle-man buffer:


val postProcessing: Sink[Task, Vector[(Int, String)]] =
  repeatEval(Task.now { buffer: Vector[(Int, String)] =>
    Task.now { /* your post-processing code here */ }
  })

val transformedGrouped10 = transformed.chunk(10) 

val postProcess: Process[Task, Unit] =
  transformedGrouped10 to postProcessing

postProcess.run.run
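(chunk(10) is what turns the Process[Task, (Int, String)] into a Process[Task, Vector[(Int, String)]], grouping up to 10 records per Vector, so the sink sees batches rather than single records.)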

I hope this gives you a few more ideas…

Pavel. 





Devon Miller

Oct 16, 2013, 10:02:54 PM10/16/13
to sca...@googlegroups.com
That helps. Thanks.

Viewing F as an environment, like a Task, is a useful comment for understanding and becoming proficient with scalaz-stream. If that comment could make it into the source docs, that would be great.

The comment about concurrency and buffering was also very good.

In Spring Integration, you have to ensure you have the proper configuration to handle async-sync, sync-async, and async-async interactions between your POJOs and the channels between them. This amounts to configuring the queue and buffering elements as well as the pollers.

So the postProcessing comment will help me think through this design aspect.

If enterprise integration patterns are a target for scalaz-stream, some more blogs/documentation on how to create EIP configurations would probably help speed adoption. Record-oriented processing is probably another target, and perhaps there is a third major scenario for scalaz-stream as well.

Pavel Chlupáček

Oct 17, 2013, 1:11:59 AM10/17/13
to sca...@googlegroups.com, sca...@googlegroups.com
Cool, and thanks for the comments. We are certainly looking at enhancing the documentation for scalaz-stream, and for people who can contribute.

We are using streams in our project not only for record processing but also for all concurrency control, effectively replacing actors, which cannot compose and are not typesafe. You start to think about your programs in a much different way, but so far it seems to be paying off: much less code, far fewer unit tests, and much better code to reason about.

Again thanks for all your comments

P.

Sent from my iPad