
scalaz-stream: can a Process be made Traversable?


etorreborre

Dec 4, 2013, 1:14:47 AM
to sca...@googlegroups.com
I tried to write a Traversable instance for Process[F, A], but I can't find a way to handle Await.recv.

Is it even possible? I was expecting to be able to do it because I was viewing a Process more or less as a stream of elements, but I might have the wrong idea here.
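
The rough shape of what I tried looks like this (just a sketch assuming the 0.x constructors, with ??? marking where I'm stuck):

import scalaz.Applicative
import scalaz.stream.Process
import scalaz.stream.Process.{Await, Emit, Halt}

def traverseP[F[_], G[_], A, B](p: Process[F, A])(f: A => G[B])(implicit G: Applicative[G]): G[Process[F, B]] =
  p match {
    case Halt(e)    => G.point(Halt(e): Process[F, B])
    case Emit(h, t) =>
      // traverse the emitted head, then recurse on the tail
      val gh = h.toList.foldRight(G.point(List.empty[B]))((a, acc) => G.apply2(f(a), acc)(_ :: _))
      G.apply2(gh, traverseP(t)(f))((hs, ts) => Emit(hs, ts): Process[F, B])
    case Await(_, recv, _, _) =>
      ??? // stuck: recv needs an A that only running the F effect can produce
  }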

E.

Pavel Chlupacek

Dec 4, 2013, 1:39:53 AM
to sca...@googlegroups.com
I think the problem with await is that it may be asynchronous, and I'm not really sure how that can be handled by Traversable.

Traversable, for me, does not have a concept of `asynchronous` notification of the next step, so I doubt this will be possible.

However, I believe this really depends on the type of `F` where the process runs. I would say, for example, that a Process0 can easily be converted to a Traversable (see the Process0 syntax, toList).
But for F == Task, for example, this will require your `driver` to actually run the `req` in Await, like Task.run. This will obviously block the running thread.

I think Traversable here is like a `driver` that runs the process. Look at the runXXX methods for implementations of these.
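
For instance (a minimal sketch, assuming the 0.x API):

import scalaz.concurrent.Task
import scalaz.stream.Process

val xs = Process(1, 2, 3).toList       // effect-free: List(1, 2, 3)
val eff = Process.eval(Task.delay(42)) // Process[Task, Int]
val ys = eff.runLog.run                // a driver: actually runs the Task, blocking the thread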


Pavel. 



etorreborre

Dec 5, 2013, 7:55:33 AM
to sca...@googlegroups.com
Hi Pavel,

Thanks for your hint, that really helped. I decided to write a short blog post about today's experiment: bit.ly/ItSadw .

I hope I haven't misrepresented the library and that I didn't miss something huge somewhere (which is fairly possible :-)).

E.

Pavel Chlupacek

Dec 5, 2013, 8:44:16 AM
to sca...@googlegroups.com
Cool Eric,

    I may come up later today or this week with a solution that doesn't need runState to be implemented.

   If I understand correctly, you need a very simple parser that will validate, line by line, the structure of a file possibly made of BootStrap, Headers, and then lines, terminating early and indicating where the problem was…

   If I am reading this correctly, I hope I can come up with a solution that uses the current combinators w/o runState.

Pavel.

Pascal Voitot Dev

Dec 5, 2013, 9:33:06 AM
to scalaz
Hi Eric,

I'm studying scalaz-stream too and read your article.

When you say "A process is Process[F[_], O] where O is the type of the output and the type of the input is... nowhere?"

It's just that F is something doing something somewhere somehow :)... So it can create some inputs, but not necessarily.

But you've got Process1[I, O], which explicitly represents a Process accepting I and outputting O.

What's cool is that you can do this with it:

Process[F, I] pipe Process1[I, O] => Process[F, O]
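
For example (a small sketch, assuming the 0.x API; "data.txt" is a made-up file):

import scalaz.concurrent.Task
import scalaz.stream._

val source: Process[Task, String]   = io.linesR("data.txt")
val upper: Process1[String, String] = process1.lift(_.toUpperCase)
val piped: Process[Task, String]    = source pipe upper // same as source |> upper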

An idea (which could be stupid) I had while reading your article: you could have fun with Channels, which are streams of effectful functions.

type Channel[F, O, A] = Process[F, O => F[A]]

If you pass the Process[F, O] through the channel, you apply the function on each output element.
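
For instance, with a simple effectful function (again a sketch, reusing the made-up source above):

val printer: Channel[Task, String, Unit] =
  Process.constant((s: String) => Task.delay(println(s)))

val printed: Process[Task, Unit] = source through printer // runs the effect on each element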

Process[F, O] through Process[F, O => F[State[E \/ S, Option[A]]]] =>
Process[F, State[E \/ S, Option[A]]]

So you've got a stream of states that you can certainly fold in some way with the Monad and Catchable to compute the final State...

Regards
Pascal

etorreborre

Dec 5, 2013, 3:20:46 PM
to sca...@googlegroups.com
Good point Pascal. I also thought of using Channels at some stage but didn't try to pursue the idea further. Let's see what Pavel comes up with.

Pascal Voitot Dev

Dec 5, 2013, 4:05:15 PM
to scalaz
On Thu, Dec 5, 2013 at 9:20 PM, etorreborre <etorr...@gmail.com> wrote:
Good point Pascal. I also thought of using Channels at some stage but didn't try to pursue the idea further. Let's see what Pavel comes up with.


I believe he will have something better and more idiomatic ;)

Pavel Chlupacek

Dec 5, 2013, 4:11:56 PM
to sca...@googlegroups.com
A possible solution is here:


P.

Pascal Voitot Dev

Dec 5, 2013, 4:17:46 PM
to scalaz
On Thu, Dec 5, 2013 at 10:11 PM, Pavel Chlupacek <chlu...@gmail.com> wrote:

It's too simple, it's not interesting :D

etorreborre

Dec 5, 2013, 4:23:03 PM
to sca...@googlegroups.com
Very, very interesting, thanks!

How do you think I could validate the number of lines against the count included in the TRAILER line?

Pavel Chlupacek

Dec 5, 2013, 4:42:37 PM
to sca...@googlegroups.com
Just updated the gist, that should do the trick :-)

P.

Paul Chiusano

Dec 5, 2013, 4:48:08 PM
to sca...@googlegroups.com
Just a general comment: when you're using scalaz-stream (or any stream-processing library), you don't usually need to use State. A stream transducer (like Process1) already gives you all the state-transition capabilities you need. Rather than:

sealed trait LineState
case object ExpectHeader extends LineState
case object ExpectHeaderColumns extends LineState
case class  ExpectLineOrTrailer(lineCount: Int = 0) extends LineState

which you pattern match on in your State action, which you then have to run, you can just make each of these a local definition inside your line parser, something like:

def lineParser: Process1[String,String] = {
  
  def expectHeader: Process1[String,String] = 
     await1[String].flatMap(h => expectHeaderColumns(parseHeader(h)))
  
  def expectHeaderColumns(hdr: Header): Process1[String,String] = 
     await1[String].flatMap(cols => ... expectLineOrTrailer(0))
  
  def expectLineOrTrailer(nRead: Int): Process1[String,String] = 
    ...

  expectHeader // initial state
}

Hopefully you get the idea. This is the 'explicit' recursion way of encoding the state machine. As Pavel demo'd, you can also sometimes assemble the same state machine using combinators, which can work out nicely.
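
In case it helps, here's a tiny, complete transducer in the same style (a sketch against the 0.x API) that numbers its input lines; the "state" is just the argument of go:

import scalaz.stream.Process._
import scalaz.stream.Process1

def numbered: Process1[String, String] = {
  def go(n: Int): Process1[String, String] =
    await1[String].flatMap(s => emit(s"$n: $s") fby go(n + 1))
  go(1) // initial state
}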

Paul :)

Pascal Voitot Dev

Dec 5, 2013, 4:59:42 PM
to scalaz
On Thu, Dec 5, 2013 at 10:48 PM, Paul Chiusano <paul.c...@gmail.com> wrote:
[...]

This is not far from simple streamed parsers!

etorreborre

Dec 5, 2013, 7:55:53 PM
to sca...@googlegroups.com
Hi Pavel,

Thanks for the update, but there seems to be a fundamental flaw somewhere, because the processing takes polynomial time. I'm currently trying to figure out what's wrong.

E.

etorreborre

Dec 5, 2013, 8:06:18 PM
to sca...@googlegroups.com
Update: if I don't use buffering, I'm only reading lines once.

etorreborre

Dec 5, 2013, 10:51:05 PM
to sca...@googlegroups.com
I hope I'm not annoying anyone with my silly use case... These are my current results.

The current code works ok:

  def process(path: String, targetName: String): String \/ File = {

    val HEADER  = "HEADER(.*)".r
    val TRAILER = "TRAILER\\|(\\d+)".r

    val lineOrTrailer: Process1[String, String]  = {
      def go(lines: Int): Process1[String, String] =
        receive1[String, String] {
          case TRAILER(count) => if (count.toInt == lines) halt else Halt(new Exception(s"Expected $count lines, but got $lines"))
          case HEADER(h)      => Halt(new Exception(s"Didn't expect a HEADER here: $h"))
          case s              => emit(s) fby go(lines + 1)
        }

      go(0)
    }

    // discardRegex and discardLine are custom helpers (not shown here)
    val linesStructure =
      discardRegex("HEADER.*") fby
      discardLine              fby
      lineOrTrailer

    val read       = io.linesR(path) |> linesStructure
    val targetPath = path.replace(".DAT", "")+targetName
    val task       = ((read |> process1.intersperse("\n") |> process1.utf8Encode) to io.fileChunkW(targetPath)).run

    task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
  }

But it performs horribly compared to the equivalent imperative code:

  def process2(path: String, targetName: String): String \/ File = {

    val targetPath = path.replace(".DAT", "")+targetName
    val writer = new FileWriter(targetPath)
    val source = scala.io.Source.fromFile(new File(path))
    var count = 0
    var skipNextLine = false
    try {
      source.getLines().foreach { line =>
        if (line.startsWith("HEADER")) skipNextLine = true
        else if (skipNextLine) { skipNextLine = false }
        else if (line.startsWith("TRAILER")) {
          // the expected line count follows the "TRAILER|" prefix (8 characters)
          val expected = scala.util.Try(line.drop(8).toInt).getOrElse(0)
          if (expected != count) throw new Exception(s"expected $expected, got $count")
        }
        else {
          count = count + 1
          writer.write(line)
        }
      }
      new File(targetPath).right
    } catch {
      case t: Throwable => t.getMessage.left
    } finally {
      source.close
      writer.close
    }
  }

7 minutes against 8 seconds for a 700M file! 

Is there anything obvious that I should try here? Is this inherent to scalaz-stream and the fact that we are using a "trampolining" structure (Halt, Emit, Await)?

Thanks,

Eric.

Paul Chiusano

Dec 5, 2013, 11:08:32 PM
to sca...@googlegroups.com
You might want to check out this SO question: http://stackoverflow.com/questions/18853079/performance-of-line-counting-with-scalaz-stream

This is a case where you're doing very little work per stream element, so the overhead of scalaz-stream itself can become an issue. I suspect doing some chunking would get the performance a lot closer to the imperative version.
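
Roughly (a sketch, with a hypothetical path value):

import scalaz.stream._

val chunked = io.linesR(path) |> process1.chunk(4096) // Process[Task, Vector[String]]
// downstream stages now pay the per-step overhead once per 4096 lines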

Also, there's some NIO support in the works or on the roadmap that would allow huge memory-mapped files to be treated as streams and processed extremely efficiently. At that point, I'm hoping we can start blowing hand-tuned, ordinary imperative Java code out of the water. :)

Paul

Pascal Voitot Dev

Dec 6, 2013, 2:55:04 AM
to scalaz

We have exactly the same kind of issues with Iteratees when you try to pipeline IO/Future effects with too small a unit of work. As you said, the overhead of the streaming mechanism on top of the effect itself becomes the bottleneck.
Chunking the file clearly helps.
With NIO, it would help even more!

Pascal

etorreborre

Dec 7, 2013, 8:15:02 PM
to sca...@googlegroups.com
Chunking indeed makes a difference, if you know where to chunk!

This is what I ended up with. First I need a new combinator:

  def linesRChunk(filename: String, chunkSize: Int = 10000): Process[Task, Vector[String]] =
    io.resource(Task.delay(scala.io.Source.fromFile(filename)))(src => Task.delay(src.close)) { src =>
      lazy val lines = src.getLines.sliding(chunkSize, chunkSize) // a stateful iterator, created once
      Task.delay {
        if (lines.hasNext) lines.next.toVector
        else throw End
      }
    }

I tried `linesR(path).chunk(chunkSize)` first, but I observed that the chunks were created with increasing sizes from 1 to chunkSize. I think this is the result of this issue with orElse.

Then my "lines parser" becomes:

    val HEADER  = "HEADER(.*)".r
    val TRAILER = "TRAILER\\|(\\d+)".r

    def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
      def onHeader(rest: Vector[String]) = 
        (emit(rest) |> linesParser(ExpectLineOrTrailer(0))) fby 
        linesParser(ExpectLineOrTrailer(rest.size))

      def onTrailer(ls: Vector[String], count: Int, actual: Int) =
        if ((actual + ls.size) == count) emit(ls)
        else                             fail(new Exception(s"expected $count lines, got $actual"))

      def onLines(ls: Vector[String], actual: Int) =
        emit(ls) fby linesParser(ExpectLineOrTrailer(actual + ls.size))

      receive1[Vector[String], Vector[String]] { case lines =>
        (lines, state) match {
          case (Vector(),                  _)                      => halt
          case (HEADER(_) +: cols +: rest, ExpectHeader)           => onHeader(rest)
          case (_,                         ExpectHeader)           => fail(new Exception("expected a header"))
          case (ls :+ TRAILER(count),      ExpectLineOrTrailer(n)) => onTrailer(ls, count.toInt, n)
          case (ls,                        ExpectLineOrTrailer(n)) => onLines(ls, n)
        }
      }
    }

    val read = linesRChunk(path, bufferSize) |> linesParser(ExpectHeader).map(lines => lines.mkString("\n"))
    val task = ((read |> process1.intersperse("\n") |> process1.utf8Encode) to io.fileChunkW(targetPath)).run

I think that in terms of readability this is pretty good, and there's no ugly loop with variables. The performance is almost on par with the loop approach (13 seconds for a 700M file vs 8 seconds for the loop version), but I think most of the difference comes from using regular expressions in pattern matching instead of just testing with line.startsWith.
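
If I wanted to shave that off, something like this should work (a hypothetical helper, not what I actually ran):

// test the cheap prefix first, and only parse the count when it can match
def trailerCount(line: String): Option[Int] =
  if (line.startsWith("TRAILER|")) scala.util.Try(line.drop(8).toInt).toOption
  else None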

Now I can't wait to see what NIO brings to the table! Who's working on it?

E.

etorreborre

Dec 9, 2013, 7:37:38 AM
to sca...@googlegroups.com
The never-ending thread!

It turns out that the solution above looks nice but is utterly incorrect. Here is a better, working solution (and still OK performance-wise):

  def process(path: String, targetName: String, chunkSize: Int = 1): String \/ File = {

    val targetPath = path.replace(".DAT", "")+targetName

    val read = linesRChunk(path, chunkSize) |> validateLines.map(lines => lines.mkString("\n"))
    val task = ((read |> process1.intersperse("\n") |> process1.utf8Encode) to io.fileChunkW(targetPath)).run

    task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
  }

  /**
   * validate that the lines have the right sequence of HEADER/column names/lines/TRAILER
   * and the right number of lines
   */
  def validateLines: Process1[Vector[String], Vector[String]] = {
    val HEADER  = "HEADER(.*)".r
    val TRAILER = "TRAILER\\|(\\d+)".r

    def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
      receive1[Vector[String], Vector[String]] { case lines =>
        (lines.headOption, state) match {
          // the chunk starts with a HEADER: a new section must be opened
          case (Some(HEADER(_)), LineState(o, l)) =>
            if (o) fail("the section must not be reopened")
            else {
              (emit(lines.drop(2)) |> linesParser(LineState(openedSection = true, 0))) fby
              linesParser(LineState(openedSection = lines.count(isHeader) > lines.count(isTrailer), lines.drop(2).size))
            }

          // the chunk starts with a TRAILER: the current section must be closed, checking the count
          case (Some(TRAILER(count)), LineState(o, l)) =>
            if (!o) fail("closing section too soon")
            else if (l != count.toInt) fail(s"expected $count lines, got $l")
            else
              (emit(lines.drop(1)) |> linesParser(LineState(false, l))) fby
              linesParser(LineState(lines.drop(1).count(isHeader) > lines.drop(1).count(isTrailer), lines.drop(1).size))

          // ordinary lines: emit everything up to a possible TRAILER
          case (h,          LineState(o, l)) =>
            if (!o && h.isDefined) fail("section should be opened")
            else if (!h.isDefined) halt
            else {
              val (first, rest) = lines.span(line => !isTrailer(line))
              emit(first) fby
              (emit(rest) |> linesParser(LineState(o, l + first.size))) fby
              linesParser(LineState(o, l + lines.size))
            }
        }
      }
    }
    // initialise the parsing expecting a HEADER
    linesParser(LineState())
  }

  private def fail(message: String) = Halt(new Exception(message))
  private def isHeader(line: String) = line.startsWith("HEADER")
  private def isTrailer(line: String) = line.startsWith("TRAILER")

The previous solution was wrong when the file contained several HEADER/column names/lines/TRAILER sections, or an incorrect structure. The reason is that, depending on the chunk size, the content of a chunk can contain all sorts of variations:
  
 - a HEADER/column names and some lines
 - some lines, a TRAILER, and a new section starting
 - ...

The one above can probably be refactored into something more pleasant, but before I try to do this I need to handle one more case, and I need your help for it.

What if the TRAILER is missing from a file? How can I detect this situation? In this case no more lines will be fed to the parser, so I don't have the opportunity to halt with an Exception! I tried various combinations of `onComplete` and `orElse`, but couldn't find a way to make it work.

I really didn't suspect that the whole thing would be so hard to code, and I would be happy to nail it down once and for all. So what do you think I can do in this case?

Thanks,

Eric.

Pavel Chlupacek

Dec 9, 2013, 8:07:10 AM
to sca...@googlegroups.com
I'll look at that later.

I understand that with the originally suggested solution your issue was performance, right?

Pavel. 

etorreborre

Dec 9, 2013, 1:45:53 PM
to sca...@googlegroups.com
Yes indeed, this is why I needed to chunk lines with `linesRChunk`. Now I'm thinking of modifying this function to have it emit one last marker line when I reach the end of the file, in order to detect a missing trailer. But that looks ugly.
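
Something like this (the "<EOF>" sentinel value is made up):

val withSentinel: Process[Task, Vector[String]] =
  linesRChunk(path, chunkSize) ++ Process.emit(Vector("<EOF>"))
// the parser would then fail if it sees the sentinel while a section is still open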

Pavel Chlupacek

Dec 11, 2013, 8:26:19 AM
to sca...@googlegroups.com
Eric, sorry for not replying earlier.

From the amount of non-scalaz-stream code in your solution, I am sure this can be done better, but maybe the current performance limitations and your requirements do not allow a better solution. However, we are working hard to improve performance, so please follow the scalaz-stream GitHub for more updates…


Pavel.


etorreborre

Dec 12, 2013, 6:46:12 AM
to sca...@googlegroups.com
Thanks for taking the time to ponder this use-case. I have one more question though.

When I run the code I pasted on a big file that is missing the last trailer, I don't get an error. I tried to use `orElse`, thinking that when there are no more lines to consume I would get a failure message:

              emit(first) ++
              (emit(rest) |> linesParser(LineState(o, l + first.size))) ++
              linesParser(LineState(o, l + lines.size)).orElse(fail("a trailer is expected")) 


But this fails after the first chunk instead of failing after the last one. Do you have any idea how to fix this?

Thanks,

Eric.