scalaz-stream: how to go from Process[Task, Byte] to InputStream?

etorreborre

Nov 7, 2013, 1:17:24 AM
to sca...@googlegroups.com
Hi,

I'm trying to transform a Process[Task, Byte] into an InputStream (required by the AWS API), like this:

import java.io.InputStream
import scalaz.stream._
import scalaz.concurrent._

case class InputStreams2(var process: Process[Task, Byte]) extends InputStream {
  private val EOF = -1

  def read() = {
    // pull one byte, then drop it so the next read starts after it
    val v = process.take(1).map(_.toInt).runLastOr(EOF).run
    process = process.drop(1)
    v
  }
}

I'm keeping a variable here because I don't know how to do better. This works, but it is horribly slow (it takes forever with 10,000 elements).

I'm clearly not doing the right thing because this version is almost instantaneous:

case class InputStreams3(iterator: Iterator[Byte]) extends InputStream {
  private val EOF = -1

  def read() = {
    if (iterator.hasNext) iterator.next.toInt
    else EOF
  }
}

Thanks,

Eric.


etorreborre

Nov 7, 2013, 1:19:29 AM
to sca...@googlegroups.com
By the way, my 2 tests are:

    "iterate with scalaz-stream" >> {
      val size = 10000
      val inputStreams = InputStreams2(Process.range(1, size + 1).flatMap(_ => Process("hello".getBytes("UTF-8"):_*)))
      characters(inputStreams) ==== (5 * size)
    }
    "iterate with an iterator" >> {
      val size = 10000
      val inputStreams = InputStreams3(Iterator.range(1, size + 1).flatMap(_ => "hello".getBytes("UTF-8")))
      characters(inputStreams) ==== (5 * size)
    }

etorreborre

Nov 7, 2013, 1:25:57 AM
to sca...@googlegroups.com
Apparently my flatMap is O(n^2) with Streams. Not surprising that it's slow :-)

Pavel Chlupacek

Nov 7, 2013, 5:16:34 AM
to sca...@googlegroups.com
Yeah, I think this is the reason :-)

We have an open issue about working efficiently with an immutable view over an array of bytes in streams, but it's not done yet (https://github.com/scalaz/scalaz-stream/issues/25).
That may eventually help when dealing with chunks of bytes.
 
However, I think I would express the code differently (no vars :-]):

val source: Process0[Int] = Process.range(1, size + 1)

val process: Process0[Byte] =
  source.flatMap(_ => emitAll("hello".getBytes("UTF-8")))

characters(process.toList) ==== (5 * size)

// If the process is running in a NonDet environment, you should probably do this instead:

val source: Process[Task, Int] = Process.range(1, size + 1).toSource

val process: Process[Task, Byte] =
  source.flatMap(_ => emitAll("hello".getBytes("UTF-8")))

characters(process.runLog.run) ==== (5 * size)


Cheers :-]

P. 




etorreborre

Nov 7, 2013, 6:47:44 AM
to sca...@googlegroups.com
Hi Pavel,

Thanks for your help. If I understand correctly, the code you posted is a variation on mine but not intended to solve the problem yet, right?

I'll have to resort to an ugly iterator for now then.

> We have open issue on effective working with immutable view on array of Bytes in streams

Isn't it an even more general problem with "flatMapping" a Process[X, A] with a function A => Seq[B]? Maybe we need a special combinator for this, with some internal state to track which Seq[B] we are currently emitting?
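
Just to sketch the shape of what I mean (a hypothetical `flatMapSeq`, glossing over the internal state that real chunking would need):

import scalaz.stream.Process

// hypothetical combinator: emit each Seq[B] as one block
// instead of one flatMap step per element
def flatMapSeq[F[_], A, B](p: Process[F, A])(f: A => Seq[B]): Process[F, B] =
  p.flatMap(a => Process.emitAll(f(a)))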

E.

Pavel Chlupacek

Nov 7, 2013, 8:48:54 AM
to sca...@googlegroups.com
Well, I am not really sure what the problem domain is.

I just rewrote the code, and essentially, if you still insist on iterating over individual bytes, I don't think we can do any better.

I was merely mentioning this in case you need to work with chunks of bytes that you have to split, concatenate, etc., and you need chunks of bytes as a result.

The code I posted will be a bit slower than the iterator, but generally it should not be *that* slow.

I may run a few tests to see where the problem is. I'll try to do something tonight and send you some results.

Also, you really don't need that `var` now. The code I posted is the correct pattern for using streams and iterating over bytes.

P. 

Paul Chiusano

Nov 7, 2013, 9:43:54 AM
to sca...@googlegroups.com
In general I'm not totally sure of the best way to handle this. You are doing a sort of inversion of control, converting between the streams/stream-transducers world, where control belongs to the stream, and the normal java.io world, where the user of the InputStream has control. Generally, the more idiomatic thing is to move your logic into the stream world if possible (i.e., consume the stream with another stream transducer, sink, or channel from scalaz-stream). If that's not possible (it sounds like it's not in your case), here are some ideas:

* Send your Process[Task,Byte] to a BlockingQueue[Array[Byte]]. (As Pavel mentioned, using a Process[Task,Seq[Byte]] or Process[Task,Array[Byte]] would be more efficient, and we will be implementing a solution for this sort of use case.)

* Similarly, use PipedInput/OutputStream (see the sketch after this list)

* Use `toTask`. (See below). It is kind of a hack, and it would be nice to have a more principled 'Step' type that could be used for these situations.
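
For the piped-streams idea, here's a rough sketch, assuming a chunked source Process[Task, Array[Byte]]; `toInputStream` is just an illustrative name, not a library function:

import java.io.{InputStream, PipedInputStream, PipedOutputStream}
import scalaz.concurrent.Task
import scalaz.stream._

def toInputStream(chunks: Process[Task, Array[Byte]]): InputStream = {
  val out = new PipedOutputStream
  val in  = new PipedInputStream(out)
  // write the chunks into the pipe on another thread; the consumer
  // (e.g. the AWS client) reads from `in` concurrently, and closing
  // `out` signals EOF to the reader
  val writer: Task[Unit] =
    chunks
      .flatMap(chunk => Process.eval(Task.delay(out.write(chunk))))
      .onComplete(Process.eval_(Task.delay(out.close())))
      .run
  Task.fork(writer).runAsync(_ => ())
  in
}

A BlockingQueue-based version would have the same shape, with the reader side draining the queue instead of the pipe.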

To expand on the `toTask` option:

val p: Process[Task,Byte] = ...
val t: Task[Byte] = p.toTask

You can call `t.attemptRun: Throwable \/ Byte` repeatedly. If you get back `Process.End`, that indicates normal termination. It's a hack because it's not resource safe - if you stop examining the `Task` before it completes, finalizers for the stream are not guaranteed to be run. I've created an issue to track this.

The `toTask` function uses a mutable variable internally, but it's actually pattern matching on the `Process`, so it's not O(n^2) like the repeated `take` and `drop`.
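
A rough sketch of what wrapping this in an InputStream could look like (illustrative only; `ProcessInputStream` is not a library class):

import java.io.InputStream
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._

class ProcessInputStream(p: Process[Task, Byte]) extends InputStream {
  // running the same task repeatedly yields successive bytes
  private val step: Task[Byte] = p.toTask
  def read(): Int = step.attemptRun match {
    case \/-(b)           => b & 0xff   // next byte, as 0..255
    case -\/(Process.End) => -1         // normal termination
    case -\/(error)       => throw error
  }
}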

etorreborre

Nov 7, 2013, 6:00:58 PM
to sca...@googlegroups.com
Thanks Paul for the `toTask` trick. That did it.

In my case I don't think I have much of a choice, because I'm feeding a Process to the AWS/S3 API, which basically expects an InputStream in order to make HTTP requests.
I'll watch issue 46 because I think that's what I need in that case.

E.

Pavel Chlupáček

Nov 8, 2013, 1:09:11 AM
to sca...@googlegroups.com
Eric, can you please link to the part of the API so I can take a look? Thanks. P.

Sent from my iPad

etorreborre

Nov 8, 2013, 9:02:16 AM
to sca...@googlegroups.com
See "putObject" there.

Florian Verhein

Feb 15, 2015, 9:15:08 PM
to sca...@googlegroups.com

Hi Eric, 

Sorry for awakening an old thread, but is there any chance you can post your solution to this?

I have a similar problem - the API expects a TraversableOnce and I'd rather not use blocking queues. I'm trying to stream the output of a Process directly into a Spark RDD.

Cheers,
   Florian
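
(For what it's worth, one possible shape, reusing the `toTask` trick from earlier in the thread; `toIterator` is a made-up name and, like `toTask` itself, it is not resource safe if the iterator is abandoned:)

import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._

// hypothetical helper: expose a Process as an Iterator (and hence a
// TraversableOnce), pulling elements one by one via toTask
def toIterator[A](p: Process[Task, A]): Iterator[A] = new Iterator[A] {
  private val step: Task[A] = p.toTask
  private var cached: Option[A] = None
  private var done = false

  def hasNext: Boolean = {
    if (cached.isEmpty && !done) step.attemptRun match {
      case \/-(a)           => cached = Some(a)
      case -\/(Process.End) => done = true
      case -\/(error)       => throw error
    }
    cached.isDefined
  }

  def next(): A = {
    if (!hasNext) throw new NoSuchElementException("end of process")
    val a = cached.get
    cached = None
    a
  }
}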

Florian Verhein

Feb 15, 2015, 10:03:47 PM
to sca...@googlegroups.com
Actually, I think I got it. 

Posted it here in case someone else runs into this problem / has any better ideas: 