How to turn a scalaz Process into a TraversableOnce? (scalaz stream - Spark integration)

Florian Verhein

Feb 10, 2015, 2:06:35 AM
to sca...@googlegroups.com

Suppose I have a Process[Task, String] p (for example, from io.linesR).

Rather than e.g. p.pipe(...).to(io...) and running it with .run.run (so that the Task drives it and pushes the output somewhere), I would like it to be driven externally - exposing p as a TraversableOnce[String]. So there's no need for Task here either.

I'm sure this is a case of me not being able to find the answer in the API / not completely understanding something... but I'm stuck. Any ideas? 

The motivation for this is creating a Spark RDD via flatMap* using a scalaz Process. That is, streaming data directly into an RDD after some stream processing done with scalaz-stream (e.g. with the data being read from a file).

* in RDD: flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
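
Roughly the shape I'm after - a sketch only, where sc and files are assumed to be in scope, and the ??? conversion is exactly the piece I'm missing:

  // hypothetical sketch; the ??? conversion is the missing piece
  def processFile(path: String): TraversableOnce[String] = {
    val p: Process[Task, String] = io.linesR(path).pipe(??? /* my stream processing */)
    ???  // <- how do I expose p as a TraversableOnce[String]?
  }

  val rdd: RDD[String] = sc.parallelize(files).flatMap(processFile)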

Pavel Chlupacek

Feb 10, 2015, 2:16:34 AM
to sca...@googlegroups.com
Florian, 

In the scalaz-stream sense we think of Spark as a `sink` where the data has to be pushed.

So what you need is to turn the Spark effect into an effectful function and then use:

  val source: Process[Task, String] = ???
  val sparkSink: Sink[Task, String] = ???  // your spark sink here; a Sink[Task, String] is really a Process[Task, String => Task[Unit]]

  source.to(sparkSink).run.run
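
For example, a minimal sketch of such a sink, assuming an effectful push function (pushToSpark is just a placeholder for however you hand data to Spark):

  // sketch only: a Sink is a stream of effectful functions, so lifting one
  // such function with Process.constant gives you a Sink
  def pushToSpark(s: String): Task[Unit] =
    Task.delay { /* hand s over to Spark here */ }

  val sparkSink: Sink[Task, String] = Process.constant(pushToSpark _)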
  
Hope this helps
 

Pavel Chlupáček


Pascal Voitot Dev

Feb 10, 2015, 2:34:41 AM
to scalaz
Have you seen my 3 articles from last year?
http://mandubian.com/2014/03/08/zpark-ml-nio-1/

That was just a sandbox, so I don't know if it can help.
The main problem with scalaz-stream and Spark is that scalaz-stream structures aren't serializable, which limits a lot of what you can do.

Pascal

Florian Verhein

Feb 10, 2015, 5:05:39 PM
to sca...@googlegroups.com

Thanks for the idea and the quick feedback Pavel. 

If I understand it correctly (??), this would result in a Process running on a single node, with the result streaming into Spark (presumably using Spark Streaming)?
(I guess there would be some blocking there too, to avoid the Process getting ahead.)

If that's about right, then it won't solve my problem - which I probably should have explained better:
I need to be able to run the Process (many of them, one for each file, say - so completely independent of each other) inside the cluster (i.e. in parallel across multiple workers), streaming the results into an RDD.
Running the Process outside the cluster doesn't scale, so it's unsuitable for what I'm doing.

Florian Verhein

Feb 10, 2015, 5:19:07 PM
to sca...@googlegroups.com
Thanks Pascal, interesting work. 

Good point about serialization. I think I can get around that by creating and running the Process on the worker, rather than on the driver (which would cause a closure over it and hence serialisation).

Spark Streaming isn't a good solution for what I'm doing unfortunately, since it assumes real time, and I need to process much faster than real time because I already have the data. The application is machine learning on complex timeseries/streams at scale.
This also means I need to parallelise the input processing, so streaming into Spark from outside the cluster would be a bottleneck (see my response to Pavel).

More concrete example: 
val files = Seq(file1, file2, ....)
val rdd = sc.parallelize(files).flatMap(the_thing_I_want_to_do)

rdd should then contain the contents of all the files, processed using my scalaz-stream Process.

Does that make sense?

Pascal Voitot Dev

Feb 10, 2015, 5:39:14 PM
to scalaz
On Tue, Feb 10, 2015 at 11:19 PM, Florian Verhein <flo...@arkig.com> wrote:

So you would process data with different scalaz-stream processes to files, and then ask Spark to inject all those files in parallel into its cluster, right?
If yes, it makes sense, yes!

Pascal

Florian Verhein

Feb 10, 2015, 6:01:26 PM
to sca...@googlegroups.com

Sort of... but without intermediate files between them (so 'from files', rather than 'to files' - but maybe that was just a typo?)
... and all without any queuing/buffering between scalaz-stream and Spark - just directly into an RDD.

That's the theory anyway :) but to make it work I need TraversableOnce :/

I obviously can't use runLog in the solution.

Any ideas?

Pascal Voitot Dev

Feb 10, 2015, 6:22:53 PM
to scalaz
To do that, as you said, you would have to run scalaz-stream on each worker, which is not so trivial...

Florian Verhein

Feb 10, 2015, 8:20:53 PM
to sca...@googlegroups.com

Why not, Pascal?

(I see the main challenge as the TraversableOnce one, so I'm keen to hear your thoughts if you think there are other issues.)

Florian Verhein

Feb 10, 2015, 11:05:16 PM
to sca...@googlegroups.com
So I tried it and it turns out that part is easy. 

Pascal Voitot Dev

Feb 11, 2015, 2:44:15 AM
to scalaz
Does it work in Spark on a multi-node cluster?
Normally, Spark serializes code to send it to other nodes, but AFAIK scalaz-stream isn't serializable (unless that has changed), so it can't do that.

Pascal

Florian Verhein

Feb 11, 2015, 5:13:34 AM
to sca...@googlegroups.com
Yeah, I ran it on an EC2 cluster via the submit script on the master node (Spark standalone scheduling). Pretty sure I saw it run on two workers (I used two files). I can check again though.

The code inside flatMap is executed on the workers, so the Process only ever gets instantiated there. So I think there should be no need to serialise an instantiated Process. 
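
In sketch form (with placeholder names for my transform and the conversion), the pattern is roughly:

  // sketch: the Process is built inside the closure, so only the path String
  // is shipped to the workers - never a Process or Task value
  val rdd = sc.parallelize(files).flatMap { path =>
    val p = io.linesR(path).pipe(myTransform)  // instantiated on the worker
    toTraversableOnce(p)                       // the conversion this thread is about
  }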

Can you remember what you tried? Are you sure you didn't instantiate the process outside, causing a closure over that instance?

Pascal Voitot Dev

Feb 11, 2015, 5:33:18 AM
to scalaz
On Wed, Feb 11, 2015 at 11:13 AM, Florian Verhein <flo...@arkig.com> wrote:

When I tried a year ago, my problem was most likely that I had instantiated the process on the master, and Spark was trying to serialize it to the workers, which isn't possible. But as you create it on the worker, it's just the JAR that is used there. Good to know :D I just discovered something that seems trivial :)
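
For the record, a sketch of the pitfall I hit back then (names are made up):

  // the Process is created on the driver and captured by the closure, so Spark
  // tries to serialize it when shipping the closure and the job fails with a
  // "Task not serializable" error
  val p: Process[Task, String] = io.linesR("driver-local-file.txt")
  val rdd = sc.parallelize(files).flatMap(_ => p.runLog.run)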

Florian Verhein

Feb 11, 2015, 6:41:45 AM
to sca...@googlegroups.com
Cool :-) Accidental serialization has bitten me many times in Scalding ;-) 

Back to the TraversableOnce problem. I'd thought about implementing an iterator where next() somehow steps through the Process and grabs its emitted values... but I'm not sure how to go about doing this... or if there's a better solution...
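
One direction I'm considering, rather than stepping the Process directly: run it into a blocking queue from a forked Task and wrap the queue in an Iterator. A rough sketch (names and capacity are arbitrary, error handling ignored):

  import java.util.concurrent.LinkedBlockingQueue
  import scalaz.concurrent.Task
  import scalaz.stream.{Process, Sink}

  // rough sketch: drive the Process asynchronously into a bounded queue and
  // expose the queue as an Iterator; None signals termination, errors are swallowed
  def toIterator[A](p: Process[Task, A], capacity: Int = 1024): Iterator[A] = {
    val q = new LinkedBlockingQueue[Option[A]](capacity)
    val sink: Sink[Task, A] = Process.constant((a: A) => Task.delay(q.put(Some(a))))
    Task.fork(p.to(sink).run).runAsync(_ => q.put(None))  // forked so back-pressure can't block this thread
    Iterator.continually(q.take()).takeWhile(_.isDefined).map(_.get)
  }

No idea if that's idiomatic though, and a failed Task would just end the iterator silently.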


Florian Verhein

Feb 12, 2015, 11:10:23 PM
to sca...@googlegroups.com

So here's an attempt at solving this:


However, it does not handle contexts at all (e.g. Task). 
Any tips about how to do this properly?
I suspect I'll also need a process that reads from an input stream without using Task (since I'm trying to drive it externally, the Task shouldn't be the thing driving it).
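
One rough option might be to build a pure process from the lines - this sketch goes through scala.io.Source rather than scalaz-stream's io helpers, and it ignores resource handling (the file handle is never closed), so it's illustrative only:

  import scalaz.stream.Process

  // sketch: a Task-free source of lines, driven entirely by whoever consumes it
  def pureLines(path: String) =
    Process.emitAll(scala.io.Source.fromFile(path).getLines().toStream)

It could presumably be lifted back into an effectful Process[Task, String] (toSource, if I remember the API right) wherever one is still required.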