def process(path: String, targetName: String): String \/ File = {
val HEADER = "HEADER(.*)".r
val TRAILER = "TRAILER\\|(\\d+)".r
val lineOrTrailer: Process1[String, String] = {
def go(lines: Int): Process1[String, String] =
receive1[String, String] {
case TRAILER(count) => if (count.toInt == lines) halt else Halt(new Exception(s"Expected $count lines, but got $lines"))
case HEADER(h) => Halt(new Exception(s"Didn't expected a HEADER here: $h"))
case s => emit(s) fby go(lines + 1)
}
go(0)
}
val linesStructure =
discardRegex("HEADER.*") fby
discardLine fby
lineOrTrailer
val read = io.linesR(path) |> linesStructure
val targetPath = path.replace(".DAT", "")+targetName
val task = ((read |> process1.intersperse("\n") |> process1.utf8Encode) to io.fileChunkW(targetPath)).run
task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}
But it performs horribly slowly compared to the equivalent imperative code:
def process2(path: String, targetName: String): String \/ File = {
val targetPath = path.replace(".DAT", "")+targetName
val writer = new FileWriter(targetPath)
val source = scala.io.Source.fromFile(new File(path))
var count = 0
var skipNextLine = false
try {
source.getLines().foreach { line =>
if (line.startsWith("HEADER")) skipNextLine = true
else if (skipNextLine) { skipNextLine = false }
else if (line.startsWith("TRAILER")) {
val expected = line.drop(8).headOption.map(_.toInt).getOrElse(0)
if (expected != count) throw new Exception(s"expected $expected, got $count")
}
else {
count = count + 1
writer.write(line)
}
}
} catch {
case t: Throwable => t.getMessage.left
} finally {
source.close
writer.close
}
new File(targetPath).right
}
7 minutes against 8 seconds for a 700M file!
Is there anything obvious which I should try here? Is this inherent to scalaz-stream and the fact that we are using a "trampolining" structure (Halt, Emit, Await)?
Thanks,
Eric.