Hi,
I am trying to process a CSV file with 40 million lines of data; it's about 5 GB in size. I'm trying to use Akka to parallelize the task. However, it seems like I can't stop the rapid memory growth: it expanded from 1 GB to almost 15 GB (the limit I set) in under 5 minutes. This is the code in my main() method:
val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
val sc = new Scanner(inputStream, "UTF-8")
var counter = 0
while (sc.hasNextLine) {
  rowActors(counter % 20) ! Row(sc.nextLine())
  counter += 1
}
sc.close()
inputStream.close()

Someone pointed out that I was essentially creating 40 million Row objects, which naturally takes up a lot of space. My row actor is not doing much: it simply transforms each line into an array of integers (if you are familiar with the concept of vectorizing, that's what I'm doing), and then the transformed array gets printed out. Done. I originally thought there was a memory leak, but maybe I'm just not managing memory right. Can I get any wise suggestions from the Akka experts here?
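For context on why the heap fills up: the while loop above mails Row messages as fast as the Scanner can read them, while the 20 actors drain their mailboxes far more slowly, so the unprocessed messages accumulate on the heap. A minimal sketch of bounding that with plain Scala Futures instead of actors (`vectorize` is a hypothetical stand-in for the real row transformation): read the file in batches and wait for each batch to drain before pulling more lines, so only one batch is in flight at a time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the per-row transformation.
def vectorize(line: String): Array[String] = line.split(",")

// Process lines in bounded batches: at most one batch of rows is
// in memory (and in flight) at any time. Returns the line count.
def processBounded(lines: Iterator[String], batchSize: Int = 10000): Long = {
  var processed = 0L
  lines.grouped(batchSize).foreach { batch =>
    val futures = batch.map(line => Future(vectorize(line)))
    // Block until this batch completes before reading more lines,
    // which caps memory at roughly one batch of rows.
    Await.result(Future.sequence(futures), 5.minutes)
    processed += batch.size
  }
  processed
}
```

The batch size is the memory/throughput knob; the same back-pressure idea is what Akka Streams gives you automatically.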
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Hey Viktor,

I'm trying to use Akka to parallelize this process. There shouldn't be any bottleneck, and I don't understand why I get memory overflow with my first version (the actor version). The main task is to read in a line, break it up, turn each segment (a string) into an integer, then print it out to a CSV file (the vectorization process):

def processLine(line: String): Unit = {
  val vector: ListBuffer[String] = ListBuffer()
  val segs = line.split(",")
  println(segs(0))
  (1 to segs.length - 1).foreach { i =>
    val factorArray = dictionaries(i - 1)
    vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
  }
  timer ! OneDone
  printer ! Print(vector.toList)
}

When I do this in pure Akka (with actors), I create 40 million Row(line: String) objects and get the memory overflow issue.
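One commonly suggested mitigation for the actor version is a bounded mailbox, so the producer can't get arbitrarily far ahead of the row actors: when the mailbox is full, enqueuing blocks up to the push timeout, after which the message goes to dead letters. A configuration sketch (the dispatcher name `bounded-mailbox` is an arbitrary choice; `akka.dispatch.BoundedMailbox` is Akka's built-in implementation):

```hocon
# application.conf sketch (mailbox name is an assumption)
bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}
```

The row actors would then be created with this mailbox, capping in-flight Row messages at 1000 per actor instead of 40 million overall.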
If I use Akka Streams, there is no memory overflow issue, but the performance is about the same as the non-parallelized version (or even slower).
    line.split(",")
  }
  .map { segs =>
    segs(0) +: (1 to segs.length - 1).map { i =>
      stableDictionaries(i - 1).indexOf(segs(i)).toString
    }
  }
  .foreach { segs =>
    println(segs(0))
    timer ! OneDone // not sure if this works
    printer ! Print(segs)
  }
  .onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }

As you can see, one of the heavier-duty steps actually happens on each element of the input array. Is there a way for me to parallelize that process as well, since it happens inside a map?
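One option, sketched here with plain Scala Futures so it stands alone (the dictionary contents below are made up for illustration; in Akka Streams the analogous tool is `mapAsync`, which runs a stage with bounded parallelism): fire off the per-column dictionary lookups concurrently and collect the results.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the per-column dictionaries: each column
// maps a string value to its index (its factor level).
val stableDictionaries: Vector[Vector[String]] =
  Vector(Vector("red", "green", "blue"), Vector("low", "high"))

// Vectorize one line, running each column's lookup as its own Future.
def vectorizeParallel(line: String): Vector[String] = {
  val segs = line.split(",")
  val lookups = (1 until segs.length).map { i =>
    Future(stableDictionaries(i - 1).indexOf(segs(i)).toString)
  }
  // Keep the id column, append the resolved factor levels.
  segs(0) +: Await.result(Future.sequence(lookups), 1.minute).toVector
}
```

Caveat: for cheap lookups like `indexOf`, per-cell futures add more scheduling overhead than they save; parallelizing at the whole-line level (one line per `mapAsync` element) is usually the better granularity.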