Akka-Stream not truncating and failing


Harit Himanshu

Oct 6, 2015, 3:28:37 PM
to Akka User List
Hello there,

I am new to Akka-Stream and working on a use case where I need to parse log files. These log files have lines separated by newlines.

I looked at GroupLogFile.scala, and my code now looks like:

// imports assumed for the 1.0-era akka-stream APIs used here; exact package
// names vary by version (Framing lived in akka.stream.io at the time, and
// scala-logging is assumed for Logger)
import java.io.File
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.io.Framing
import akka.stream.scaladsl.{ Flow, Source }
import akka.util.ByteString
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

class LogFile(file: File, implicit val system: ActorSystem) {
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] = Source.synchronousFile(file)

  // todo (harit): what should maximumFrameLength be?
  val flow: Flow[ByteString, String, Unit] = Flow[ByteString]
    .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 1500, allowTruncation = true))
    .map(_.utf8String)

  def process() = {
    logger.debug(s"processing $file")
    source.via(flow).runForeach(println)
  }
}

The problem arises when a log line is longer than 1500 characters (1500 bytes): the stream fails. I verified this by running it against a log file; it stopped on a line that had 1932 characters (1932 bytes).
I set allowTruncation = true, so I thought the line would be truncated after 1500 bytes and the stream would move on, but that's not the case.

Question
- How can I truncate a line at 1500 bytes, process (or print) it, and move on to the next line, without failing the stream?

Martynas Mickevičius

Oct 11, 2015, 12:36:54 PM
to akka...@googlegroups.com
Hi Harit,

according to the docs, allowTruncation controls whether the stream fails when the last frame before upstream completion does not end with a delimiter. It does not let frames exceed maximumFrameLength; a frame longer than that always fails the stream, which is what you are seeing.

The built-in delimiter framing stage does not handle all use cases. However, it is a good example of such functionality. I would recommend taking a look at its source code and adapting it to your needs.
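To make the distinction concrete, here is a minimal sketch (not from the original thread; written against the later akka.stream.scaladsl.Framing package, since the stage moved there after the 1.0-era akka.stream.io): allowTruncation = true only affects a final frame with no trailing delimiter, while a frame longer than maximumFrameLength always fails the stream.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Framing, Source }
import akka.util.ByteString

object FramingDemo extends App {
  implicit val system = ActorSystem("framing-demo")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val framing =
    Framing.delimiter(ByteString("\n"), maximumFrameLength = 10, allowTruncation = true)

  // Last element has no trailing "\n": with allowTruncation = true the
  // dangling "tail" is emitted instead of failing the stream.
  Source.single(ByteString("head\ntail"))
    .via(framing)
    .runForeach(bs => println(s"ok: ${bs.utf8String}"))

  // A frame longer than maximumFrameLength (10 bytes here) fails the
  // stream regardless of the allowTruncation setting.
  Source.single(ByteString("this line is far longer than ten bytes\n"))
    .via(framing)
    .runForeach(bs => println(bs.utf8String))
    .failed.foreach(t => println(s"failed: $t"))
}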


--
Martynas Mickevičius
Typesafe: Reactive Apps on the JVM

Harit Himanshu

Oct 11, 2015, 7:53:19 PM
to akka...@googlegroups.com
Thanks Martynas

I just wanted to check: what is the difference between

val source = Source.synchronousFile(file)

and

val source = scala.io.Source.fromFile(file).getLines() (as this also returns an Iterator)


as I can use it as  

source.map(_.utf8String).runForeach(println)

Are there any performance issues with the second approach (scala.io.Source.fromFile(file).getLines())?

Thanks





Konrad Malawski

Oct 11, 2015, 8:25:02 PM
to akka...@googlegroups.com, Harit Himanshu
Are there any performance issues with the second approach (scala.io.Source.fromFile(file).getLines())?

If I remember correctly, it is 5 to 10 times slower than SynchronousFileSource, *and* the Source.fromFile used in the above example leaks open FileInputStreams that are never closed.

SynchronousFileSource is fast, safe, and by default uses a dedicated thread pool for the blocking operations; use it instead of hand-rolling file reading.
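For illustration, a hedged sketch of the leak described above (not code from the thread; the file path is hypothetical), along with a non-leaking variant if you do want a plain iterator:

import java.io.File

object LeakDemo extends App {
  val file = new File("some.log") // hypothetical path

  // Leaky: the FileInputStream behind this scala.io.Source is never closed.
  // val lines = scala.io.Source.fromFile(file).getLines()

  // Non-leaky: keep a handle to the Source and close it when done.
  val src = scala.io.Source.fromFile(file)
  try src.getLines().foreach(println)
  finally src.close()
}

SynchronousFileSource, by contrast, manages the underlying file handle itself and releases it when the stream completes or fails.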


FYI, benchmarks (to be found in akka-bench-jmh-dev on branch release-2.3-dev):

[info] Benchmark                                         (bufSize)  Mode  Cnt     Score     Error  Units
[info] FileSourcesBenchmark.fileChannel                       2048  avgt   10   711.195 ±  36.094  ms/op  // this is SynchronousFileSource
[info] FileSourcesBenchmark.fileChannel_noReadAhead           2048  avgt   10  1660.726 ±  49.221  ms/op
[info] FileSourcesBenchmark.inputStream                       2048  avgt   10   587.248 ±   9.179  ms/op
[info] FileSourcesBenchmark.naive_ioSourceLinesIterator       2048  avgt   10  3794.313 ± 839.539  ms/op


-- konrad

Harit Himanshu

Oct 11, 2015, 9:22:30 PM
to Konrad Malawski, akka...@googlegroups.com
Thanks Konrad

This is insightful. In this case, though, I am still not able to read through the entire long line; I asked a question above where I am looking for guidance.

Any help/recommendation is very much appreciated

Thanks again

Harit Himanshu

Oct 12, 2015, 12:49:15 AM
to Konrad Malawski, akka...@googlegroups.com
During my search I stumbled upon this place, where Adam Warski created a new ParseLinesStage. It worked for me, but failed where it hits this check:

if (buffer.size > maximumLineBytes) {
  println("XXX FAIL")
  ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
    s"which is more than $maximumLineBytes without seeing a line terminator"))
}

So I removed this logic, and the code now looks like:

// imports assumed for the 1.0-era akka-stream APIs used below; exact
// package names vary by version (scala-logging assumed for Logger)
import java.io.File
import scala.annotation.tailrec
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.Source
import akka.stream.stage.{ Context, StatefulStage }
import akka.util.ByteString
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object LogFile {
  val maxBytesPerLine = 1500
  implicit val system = ActorSystem("system")

  def apply(file: File) = new LogFile(file)

  def main(args: Array[String]) {
    val file: File = new File("processor/src/main/resources/Demo_log_004.log")
    //    LogFile(file).processGraph()
    // the materializer is created inside LogFile, so none is needed here
    LogFile(file).process()
  }
}

class LogFile(file: File)(implicit val system: ActorSystem) {
  // todo (harit): apply more filters to make sure the file is right
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] = SynchronousFileSource(file, 1500)

  def process() = {
    source.transform(() => new ParseLinesStage("\n")).runForeach(println)
  }
}


class ParseLinesStage(separator: String) extends StatefulStage[ByteString, String] {
  private val separatorBytes = ByteString(separator)
  private val firstSeparatorByte = separatorBytes.head
  private var buffer = ByteString.empty
  private var nextPossibleMatch = 0

  def initial = new State {
    override def onPush(chunk: ByteString, ctx: Context[String]) = {
      buffer ++= chunk
      emit(doParse(Vector.empty).iterator, ctx)
    }

    @tailrec
    private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
      val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
      if (possibleMatchPos == -1) {
        // No matching character, we need to accumulate more bytes into the buffer
        nextPossibleMatch = buffer.size
        parsedLinesSoFar
      } else {
        if (possibleMatchPos + separatorBytes.size > buffer.size) {
          // We have found a possible match (we found the first character of the terminator
          // sequence) but we don't have yet enough bytes. We remember the position to
          // retry from next time.
          nextPossibleMatch = possibleMatchPos
          parsedLinesSoFar
        } else {
          if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
            == separatorBytes) {
            // Found a match
            val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
            buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
            nextPossibleMatch -= possibleMatchPos + separatorBytes.size
            doParse(parsedLinesSoFar :+ parsedLine)
          } else {
            nextPossibleMatch += 1
            doParse(parsedLinesSoFar)
          }
        }
      }
    }
  }
}

A couple of open questions:

a) I use SynchronousFileSource(file, 1500). Does changing chunkSize make a huge difference in performance? As I understand it (and I could be totally wrong), this is how much is read from the file on each read, right? What is the motivation for a defaultChunkSize of 8192?

b) As I mentioned, I removed the check that the original code had, and I am not sure whether that has any performance implications. My apologies, I am new and have questions that I cannot find answers to.
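Regarding b), a hedged sketch of an alternative to removing the check (TruncatingLinesStage is a hypothetical name, and a single-byte separator is assumed for brevity): keep the limit, but emit the first maxLineBytes bytes as a truncated line and skip ahead to the next separator instead of failing, which is what the original question asked for.

import scala.annotation.tailrec
import akka.stream.stage.{ Context, StatefulStage }
import akka.util.ByteString

// Truncates oversized lines instead of failing the stream.
class TruncatingLinesStage(separator: Byte, maxLineBytes: Int)
    extends StatefulStage[ByteString, String] {

  private var buffer = ByteString.empty
  private var dropping = false // true while skipping the tail of an oversized line

  def initial = new State {
    override def onPush(chunk: ByteString, ctx: Context[String]) = {
      buffer ++= chunk
      emit(doParse(Vector.empty).iterator, ctx)
    }

    @tailrec
    private def doParse(lines: Vector[String]): Vector[String] = {
      val sepPos = buffer.indexOf(separator)
      if (sepPos == -1) {
        if (dropping) {
          buffer = ByteString.empty // still inside the oversized tail, discard it
          lines
        } else if (buffer.size > maxLineBytes) {
          // Emit a truncated line; note that cutting raw bytes can split a
          // multi-byte UTF-8 character (acceptable for a sketch).
          val truncated = buffer.take(maxLineBytes).utf8String
          buffer = ByteString.empty
          dropping = true
          lines :+ truncated
        } else lines // no separator yet, wait for more bytes
      } else if (dropping) {
        buffer = buffer.drop(sepPos + 1) // the oversized line finally ended
        dropping = false
        doParse(lines)
      } else {
        val lineBytes = buffer.take(sepPos)
        buffer = buffer.drop(sepPos + 1)
        val line = lineBytes.take(maxLineBytes).utf8String // truncate if needed
        doParse(lines :+ line)
      }
    }
  }
}

It would plug in the same way as ParseLinesStage: source.transform(() => new TruncatingLinesStage('\n'.toByte, 1500)).runForeach(println).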

Thank you
+ Harit Himanshu

Martynas Mickevičius

Nov 5, 2015, 10:21:47 AM
to akka...@googlegroups.com
a) I use SynchronousFileSource(file, 1500). Does changing chunkSize make a huge difference in performance? As I understand it (and I could be totally wrong), this is how much is read from the file on each read, right? What is the motivation for a defaultChunkSize of 8192?

chunkSize is the size of the ByteStrings emitted by the source. You can find the source code of the actor that backs it here.
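For illustration (a hedged sketch using the constructors mentioned in this thread; the 1.0-era package and the file path are assumptions):

import java.io.File
import akka.stream.io.SynchronousFileSource

val file = new File("some.log") // hypothetical path

// Each element emitted downstream is a ByteString of at most chunkSize bytes.
val coarse = SynchronousFileSource(file)        // defaultChunkSize of 8192
val fine   = SynchronousFileSource(file, 1500)  // smaller chunks, more elements

// Larger chunks mean fewer elements flowing through the stream (less
// per-element overhead), at the cost of more memory held per element.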