How to Properly Manage Akka's Memory Usage with a 40-Million-Line Task?


Allen Nie
Jan 9, 2015, 10:53:33 AM
to akka...@googlegroups.com

Hi,

    I am trying to process a CSV file with 40 million lines of data in it. It's a 5GB file. I'm trying to use Akka to parallelize the task. However, it seems like I can't stop the rapid memory growth: it expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes. This is the code in my main() method:

// Read the training file line by line and dispatch each line,
// round-robin, to one of 20 pre-created row actors.
val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
val sc = new Scanner(inputStream, "UTF-8")

var counter = 0

while (sc.hasNextLine) {
  rowActors(counter % 20) ! Row(sc.nextLine())
  counter += 1
}

sc.close()
inputStream.close()

    Someone pointed out that I was essentially creating 40 million Row objects, which will naturally take up a lot of space. My row actor is not doing much: it just transforms each line into an array of integers (if you are familiar with the concept of vectorizing, that's what I'm doing). Then the transformed array gets printed out. Done. I originally thought there was a memory leak, but maybe I'm just not managing memory right. Can I get any wise suggestions from the Akka experts here?
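    For context, the row actor is roughly shaped like this (a simplified sketch, not the exact code; the dictionaries lookup and the Print message match the snippet I post further down in this thread):

import akka.actor.{Actor, ActorRef}

case class Row(line: String)
case class Print(vector: List[String])

// Simplified sketch of the row actor: it receives a Row, vectorizes the line
// against per-column dictionaries, and forwards the result to a printer actor.
class RowActor(dictionaries: Array[(String, List[String])], printer: ActorRef) extends Actor {
  def receive = {
    case Row(line) =>
      val segs = line.split(",")
      // map each string segment to its index ("factor level") in the matching dictionary
      val vector = (1 until segs.length).map(i => dictionaries(i - 1)._2.indexOf(segs(i)).toString)
      printer ! Print(vector.toList)
  }
}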

    


Soumya Simanta
Jan 9, 2015, 12:43:49 PM
to akka...@googlegroups.com
I would recommend using the Akka Streams API for this.
Here is a sample. I was able to process a 1GB file with around 1.5 million records in 20MB of memory. The file-read rate and the console-write rate are different, but the streams API handles that. This is not the fastest approach, but at least you won't run out of memory.



import java.io.FileInputStream
import java.util.Scanner

import akka.actor.ActorSystem
import akka.stream.{FlowMaterializer, MaterializerSettings}
import akka.stream.scaladsl.Source

import scala.util.Try


object StreamingFileReader extends App {


  val inputStream = new FileInputStream("/path/to/file")
  val sc = new Scanner(inputStream, "UTF-8")

  implicit val system = ActorSystem("Sys")
  val settings = MaterializerSettings(system)
  implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))

  val fileSource = Source(() => Iterator.continually(sc.nextLine()))

  import system.dispatcher    

  fileSource.map { line =>
    line // do nothing with the line itself; the foreach below prints it
  }.foreach(println).onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }
}

Allen Nie
Jan 9, 2015, 3:52:40 PM
to akka...@googlegroups.com
Thank you Soumya, 

       I think Akka Streams is the way to go. However, I would appreciate a performance boost as well - I still have 40 million lines to go through! But thanks anyway!

Viktor Klang
Jan 9, 2015, 4:03:13 PM
to Akka User List
Hi Allen,

What's the bottleneck?
Have you tried enabling the experimental optimizations?


Allen Nie
Jan 9, 2015, 4:53:29 PM
to akka...@googlegroups.com
Hey Viktor,

    I'm trying to use Akka to parallelize this process. There shouldn't be any bottleneck, and I don't understand why I got a memory overflow with my first version (the actor version). The main task is to read in a line, break it up, turn each segment (a string) into an integer, and then print it out to a CSV file (a vectorization process).

def processLine(line: String): Unit = {
  val vector: ListBuffer[String] = ListBuffer()
  val segs = line.split(",")

  println(segs(0))

  (1 to segs.length - 1).map { i =>
    val factorArray = dictionaries(i - 1)
    vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
  }

  timer ! OneDone

  printer ! Print(vector.toList)
}

    When I do this in pure Akka (with actors), since I create 40 million Row(line: String) objects, I get a memory overflow issue. If I use Akka Streams, there is no memory overflow issue, but the performance is about the same as the non-parallelized version (even slower).

    It's my first time using Akka Streams, so I'm unfamiliar with the optimizations you were talking about.

Sincerely,
Allen

Soumya Simanta
Jan 9, 2015, 5:39:15 PM
to akka...@googlegroups.com
Allen,

What are your constraints? Does the output CSV have to maintain the order of the input file? Do you have an upper bound?

I don't think you are CPU-bound, so you need to look at ways of reading/writing faster. Maybe async IO using NIO can help.
You can split the input and process it in parallel if you don't mind multiple output files, e.g. something like the sketch below.
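A rough sketch (the file paths, the worker count, and the vectorize function are placeholders, not your real code): each future streams through the file lazily, keeps only its share of the lines, and writes its own output file.

import java.io.PrintWriter
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.io.Source

object ParallelChunks extends App {
  val workers = 4

  // Placeholder for the per-line vectorization step.
  def vectorize(line: String): String = line

  // Each worker reads the file but only keeps every `workers`-th line and writes
  // the result to its own output file. This trades extra IO (the file is read
  // `workers` times) for a very simple way to split the work.
  val jobs = (0 until workers).map { w =>
    Future {
      val out = new PrintWriter(s"output-part-$w.csv")
      try {
        Source.fromFile("train.csv", "UTF-8").getLines().zipWithIndex
          .collect { case (line, idx) if idx % workers == w => vectorize(line) }
          .foreach(v => out.println(v))
      } finally out.close()
    }
  }

  Await.result(Future.sequence(jobs), Duration.Inf)
}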

I'm not aware of any way of doing file IO faster using Akka. Maybe the Akka folks can provide better guidance there. 

BTW how much memory are you giving your akka-streams program? 

-Soumya

Viktor Klang
Jan 10, 2015, 4:02:43 AM
to Akka User List
Hi Allen,

I'd suspect the reason it works well with Akka Streams is that they have back-pressure while your actor solution does not: you send 40 million messages as fast as you can, but the actors processing them might not be able to keep up.
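A crude way to get a similar effect with plain actors is to cap how many lines are in flight at once, e.g. with an ack-based (work-pulling) loop. Sketch only; all the names here are made up:

import akka.actor.{Actor, ActorRef}

case class Line(text: String)
case object Ack

// The worker acknowledges every line it has finished, so the producer knows
// when it is safe to send more.
class LineWorker extends Actor {
  def receive = {
    case Line(text) =>
      // ... process the line ...
      sender() ! Ack
  }
}

// The producer keeps at most `maxInFlight` unacknowledged lines outstanding:
// it sends a new line only when an Ack comes back.
class Reader(worker: ActorRef, lines: Iterator[String], maxInFlight: Int) extends Actor {
  lines.take(maxInFlight).foreach(l => worker ! Line(l))

  def receive = {
    case Ack if lines.hasNext => worker ! Line(lines.next())
    case Ack                  => // input drained; could stop the system here
  }
}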

Endre Varga
Jan 10, 2015, 5:38:42 AM
to akka...@googlegroups.com
Hi,

On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie <aimi...@gmail.com> wrote:
Hey Viktor,

    I'm trying to use Akka to parallelize this process. There shouldn't be any bottleneck, and I don't understand why I got a memory overflow with my first version (the actor version). The main task is to read in a line, break it up, turn each segment (a string) into an integer, and then print it out to a CSV file (a vectorization process).

def processLine(line: String): Unit = {
  val vector: ListBuffer[String] = ListBuffer()
  val segs = line.split(",")

  println(segs(0))

  (1 to segs.length - 1).map { i =>
    val factorArray = dictionaries(i - 1)
    vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
  }

  timer ! OneDone

  printer ! Print(vector.toList)
}

    When I do this in pure Akka (with actors), since I create 40 million Row(line: String) objects, I get a memory overflow issue.

No surprise there: you just slurp up all the rows faster than the actors can process them, so most of them sit in a mailbox. In fact, if your actors do something trivially simple, the overhead of asynchronously passing elements to them might be larger than what you gain. In these cases it is recommended to pass batches of Rows instead of one row per message (a rough sketch follows below). Remember, parallelisation only pays off when its overhead is smaller than the task it parallelizes.
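Something along these lines, on top of your original reading loop (illustrative only; the batch size is a number to tune, and note that batching alone still adds no back-pressure):

// Send batches of lines instead of one Row message per line, so the per-message
// overhead is paid once per batch rather than 40 million times.
case class RowBatch(lines: Vector[String])

val batchSize = 1000
var counter = 0
val buffer = Vector.newBuilder[String]

while (sc.hasNextLine) {
  buffer += sc.nextLine()
  counter += 1
  if (counter % batchSize == 0) {
    rowActors((counter / batchSize) % 20) ! RowBatch(buffer.result())
    buffer.clear()
  }
}
// flush the last partial batch, if any
if (counter % batchSize != 0) rowActors(0) ! RowBatch(buffer.result())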

 
If I use Akka Streams, there is no memory overflow issue, but the performance is about the same as the non-parallelized version (even slower).

No surprise there either: you did nothing to parallelize or pipeline any computation in the stream, so you get the overhead of asynchronous processing and none of its benefits (but at least you get back-pressure).

You have a few approaches to get the benefits of multi-core processing with streams:
 - if you have multiple processing steps for a row you can pipeline them, see the intro part of this doc page: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
 - you can use mapAsync to get a similar effect with a single computation step (a rough sketch follows after this list), see here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html#Illustrating_ordering_and_parallelism
 - you can explicitly add fan-out elements to parallelise among multiple explicit workers, see here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
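A minimal sketch of the mapAsync variant (the vectorize function, the parallelism value, and the printer actor are placeholders; check the integrations page linked above for the exact signature in your akka-streams version):

import scala.concurrent.Future
import system.dispatcher // execution context for the Futures

// Run up to `parallelism` per-line transformations concurrently while keeping
// results in input order; mapAsync also propagates back-pressure upstream,
// so memory stays bounded.
val parallelism = 8
def vectorize(line: String): List[String] = ??? // placeholder for the per-line work

fileSource
  .mapAsync(parallelism)(line => Future(vectorize(line)))
  .foreach(vector => printer ! Print(vector))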

Overall, for this kind of task I recommend using Streams, but you need to read the documentation first to understand how it works.

-Endre

Allen Nie
Jan 10, 2015, 7:42:57 PM
to akka...@googlegroups.com
Hi Endre,

    That's a very valid suggestion. I'm quite new to Akka (I've finished about 35% of its docs), and I'm still trying to understand how to properly parallelize tasks. You and Viktor mentioned back-pressure. Can you go a bit deeper into that? For example, what is back-pressure, and how do I build it into my actor solutions? (Info links would be all I need.) I asked a similar question on StackOverflow, but no one could point me in the right direction.

    Thank you for linking Akka-stream's docs.

Allen

Soumya Simanta
Jan 10, 2015, 8:03:18 PM
to akka...@googlegroups.com
Allen, 

Here is one definition of back-pressure: http://www.reactivemanifesto.org/glossary#Back-Pressure
For example, in your initial question about memory errors, the back-pressure mechanism of akka-streams is what lets you process your data even with a limited memory budget.

I have personally found this presentation (https://www.youtube.com/watch?v=khmVMvlP_QA) by Roland Kuhn very helpful in understanding the motivations and core concepts behind akka-streams, which is an implementation of Reactive Streams (http://www.reactive-streams.org/).


-Soumya



Allen Nie
Jan 10, 2015, 11:05:18 PM
to akka...@googlegroups.com
Thanks Soumya. That's very helpful!

Drewhk, 

    Sorry to bother you one more time. After reading the docs, I changed my code to this in order to make fuller use of parallelism in Akka Streams:

fileSource
  .map { line =>
    line.split(",")
  }
  .map { segs =>
    segs(0) +: (1 to segs.length - 1).map { i =>
      stableDictionaries(i - 1).indexOf(segs(i)).toString
    }
  }
  .foreach { segs =>
    println(segs(0))
    timer ! OneDone // not sure if this works
    printer ! Print(segs)
  }
  .onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }

    As you can see, one of the heavier pieces of work actually happens per element of the input array. Is there a way for me to parallelize that part as well, given that it happens inside a map?

Allen

Akka Team
Jan 16, 2015, 4:15:42 AM
to Akka User List
Hello there,


    As you can see, one of the heavier pieces of work actually happens per element of the input array. Is there a way for me to parallelize that part as well, given that it happens inside a map?