parallel aggregator

Marc Limotte

Dec 27, 2010, 6:01:51 PM
to cascal...@googlegroups.com
Hi Nathan,

Keeping up the pace of questions, here's another one...

I'm using some parallel aggregators in my code.  I believe these are implemented similarly to Cascading's AggregateBy, in that they trade memory for better performance.  I'm wondering how to manage (tweak) that memory usage.  Cascading's AggregateBy has a threshold parameter that controls the size of its LRU cache.  Is there something similar in Cascalog?

I'm also wondering how I can tell when that internal cache is filling up.  It seems a lot like the io.sort buffer: if we make it large enough, we can avoid a spill, which should improve performance.  For one flow, I have the Java heap max at 2g and io.sort.mb at 800, and I'm not sure how much memory Cascalog uses or wants in order to do its map-side parallel aggregations.

thanks,
Marc


nathanmarz

Dec 27, 2010, 6:24:09 PM
to cascalog-user
I pushed a small change to github that makes those caches
configurable. You can configure the size with
cascalog.combiner.aggregator.size in the JobConf. It defaults to
10000.

As for knowing when it evicts something from the cache, that's not
something that I want to log each time it happens, since evictions
happen a lot. I'm thinking that the right way to go is to have it log
eviction stats every 10 seconds. Opened up an issue to that
effect here: https://github.com/nathanmarz/cascalog/issues/issue/7

-Nathan
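
A minimal sketch of how a setting like this resolves, for anyone trying it out. The property name and the default of 10000 come from the message above; the class, the helper `cacheEntries`, and the use of a plain `Map` in place of a real Hadoop JobConf are assumptions made so the example runs without Hadoop on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for reading the setting; a real job would use a Hadoop JobConf.
class CombinerCacheSetting {
    // Property name and default as quoted in the thread.
    static final String KEY = "cascalog.combiner.aggregator.size";
    static final int DEFAULT_ENTRIES = 10000;

    // Hypothetical helper: resolve the cache size (in entries) from
    // string-valued job properties, falling back to the default.
    static int cacheEntries(Map<String, String> props) {
        String raw = props.get(KEY);
        return (raw == null) ? DEFAULT_ENTRIES : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(cacheEntries(props)); // nothing set: default applies

        props.put(KEY, "50000");                 // 50000 is an arbitrary example value
        System.out.println(cacheEntries(props));
    }
}
```

With an actual JobConf the same idea would presumably be `conf.setInt("cascalog.combiner.aggregator.size", 50000)`, since JobConf inherits `setInt` and `getInt` from Hadoop's Configuration class.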

Marc Limotte

Dec 27, 2010, 6:40:12 PM
to cascal...@googlegroups.com
Thanks, Nathan.

I can try some different values to see what works... comparing map input records to map output records.  To make sure I understand how it works and put me in the right ballpark:

Is it an LRU cache (like Cascading's map side combiner)?  And the size is specified in entries (or is that kb)?

What's the impact of the eviction?  Does it just push the data out to the io.sort buffer, meaning that we're getting less aggregation in this phase than we would with more memory?  Or does it do its own merge step at the end of the map?

Marc

nathanmarz

Dec 27, 2010, 6:49:41 PM
to cascalog-user
Yes, it's an LRU cache. The size is specified in entries.

Eviction just means the record is pushed to the reducer where all
remaining intermediate values will be combined into the final result.
The larger the cache, the less eviction needs to be done map-side.

I'd be surprised if you're able to get much of a performance increase
by increasing the cache size. The overwhelmingly common cases in jobs
are either lots of grouping keys or a small number of grouping keys
(e.g. hundreds). If you think the cardinality of your grouping keys is
in the low millions, then you might be able to see some benefit.
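
To make the eviction behaviour described above concrete, here is a toy sketch of map-side partial aggregation with an LRU cache sized in entries. It is not Cascalog's actual implementation: the class name `LruPartialSum`, the sum aggregator, and the use of `LinkedHashMap` are all assumptions chosen for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a toy map-side partial sum backed by an LRU cache.
class LruPartialSum {
    // Records sent downstream: evicted partial sums plus the final flush.
    private final List<Map.Entry<String, Long>> emitted = new ArrayList<>();
    private final LinkedHashMap<String, Long> cache;

    LruPartialSum(final int maxEntries) {
        // accessOrder = true keeps entries ordered least recently used first.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() <= maxEntries) {
                    return false;
                }
                // Eviction: the partial sum leaves the cache and goes downstream.
                emitted.add(new AbstractMap.SimpleEntry<String, Long>(
                        eldest.getKey(), eldest.getValue()));
                return true;
            }
        };
    }

    // Fold one input record into the cached partial sum for its key.
    void add(String key, long value) {
        Long current = cache.get(key);
        cache.put(key, current == null ? value : current + value);
    }

    // End of the map task: everything still cached is emitted as well.
    List<Map.Entry<String, Long>> flush() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            emitted.add(new AbstractMap.SimpleEntry<String, Long>(e.getKey(), e.getValue()));
        }
        cache.clear();
        return emitted;
    }

    // Reducer side: combine all partial sums that share a key.
    static Map<String, Long> reduce(List<Map.Entry<String, Long>> records) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            totals.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] input = {"a", "b", "a", "c", "b", "a"};
        LruPartialSum mapper = new LruPartialSum(2);
        for (String k : input) {
            mapper.add(k, 1L);
        }
        List<Map.Entry<String, Long>> out = mapper.flush();
        System.out.println("map output records: " + out.size());
        System.out.println("totals: " + reduce(out));
    }
}
```

In this toy run, six input records become five map output records with a cache of 2 entries and three with a cache of 3 entries, while the reduced totals are the same either way. That is the sense in which a larger cache only reduces the number of records pushed to the reducer, not the result.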

Marc Limotte

Dec 27, 2010, 7:21:51 PM
to cascal...@googlegroups.com
The number of keys averages around 8 million.  It's skewed, though, so the top 5% of keys account for 95% of the input records.  So, I think tuning the cache size might be of moderate help.  How can I think about the memory requirements?  If my output records from the map are around 300 bytes, can I extrapolate and say the LRU cache will use about 10000 entries * 300 bytes = 3 MB?

Marc

nathanmarz

Dec 28, 2010, 12:56:35 AM
to cascalog-user
It's pretty tricky to estimate memory usage with Java. The map entries
themselves probably take up a lot more mem than you think. Your best
bet is to experiment with different values and monitor the process to
see how much memory it's using.

-Nathan
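
A rough sketch of the "experiment and monitor" approach, assuming nothing about Cascalog's internals. It computes the payload-only estimate from the question (10000 entries of about 300 bytes) and then compares it with heap growth observed through `Runtime`. The class and method names are hypothetical, and `byte[300]` values are only a stand-in for real tuples, which are larger object graphs.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: payload arithmetic versus observed JVM heap usage.
class CacheMemoryEstimate {
    // Payload-only estimate: entries * record size, ignoring all JVM overhead.
    static long payloadBytes(long entries, long bytesPerRecord) {
        return entries * bytesPerRecord;
    }

    // Heap currently in use as reported by the JVM (approximate and noisy).
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    public static void main(String[] args) {
        int entries = 10000;
        int recordBytes = 300;
        System.out.println("payload-only estimate: "
                + payloadBytes(entries, recordBytes) + " bytes");

        long before = usedHeapBytes();
        Map<String, byte[]> cache = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            // Each entry also costs a map node, a key String, and array headers.
            cache.put("key-" + i, new byte[recordBytes]);
        }
        long after = usedHeapBytes();

        // GC activity can distort this number, so treat it as a rough signal.
        System.out.println("observed heap growth: " + (after - before)
                + " bytes for " + cache.size() + " entries");
    }
}
```

The payload-only figure works out to 3,000,000 bytes, which matches the 3 MB estimate in the question; the observed figure varies from run to run, which is why watching the live process is the more dependable guide.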

Marc Limotte

Dec 28, 2010, 10:59:38 AM
to cascal...@googlegroups.com
I'll need to compile a SNAPSHOT for this feature, right?  Or is it in the repo?  Assuming I need to compile, I'm getting this compile error:

mlimottemx:cascalog mlimotte$ lein compile
Compiling cascalog.api
Exception in thread "main" java.lang.ClassNotFoundException: cascalog.SimplePrintDirectedGraph (graph.clj:16)
    at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:2911)
    at clojure.lang.Compiler.compile1(Compiler.java:5933)
    at clojure.lang.Compiler.compile1(Compiler.java:5923)
    at clojure.lang.Compiler.compile(Compiler.java:5992)
    at clojure.lang.RT.compile(RT.java:368)
    at clojure.lang.RT.load(RT.java:407)
    at clojure.lang.RT.load(RT.java:381)
    at clojure.core$load$fn__4511.invoke(core.clj:4905)
    at clojure.core$load.doInvoke(core.clj:4904)
    at clojure.lang.RestFn.invoke(RestFn.java:409)
    at clojure.core$load_one.invoke(core.clj:4729)
    at clojure.core$load_lib.doInvoke(core.clj:4766)
    at clojure.lang.RestFn.applyTo(RestFn.java:143)
    at clojure.core$apply.invoke(core.clj:542)
    at clojure.core$load_libs.doInvoke(core.clj:4804)
    at clojure.lang.RestFn.applyTo(RestFn.java:138)
    at clojure.core$apply.invoke(core.clj:544)
    at clojure.core$use.doInvoke(core.clj:4880)
    at clojure.lang.RestFn.invoke(RestFn.java:409)
    at cascalog.api$loading__4410__auto__.invoke(api.clj:16)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:2906)
    at clojure.lang.Compiler.compile1(Compiler.java:5933)
    at clojure.lang.Compiler.compile1(Compiler.java:5923)
    at clojure.lang.Compiler.compile(Compiler.java:5992)
    at clojure.lang.RT.compile(RT.java:368)
    at clojure.lang.RT.load(RT.java:407)
    at clojure.lang.RT.load(RT.java:381)
    at clojure.core$load$fn__4511.invoke(core.clj:4905)
    at clojure.core$load.doInvoke(core.clj:4904)
    at clojure.lang.RestFn.invoke(RestFn.java:409)
    at clojure.core$load_one.invoke(core.clj:4729)
    at clojure.core$compile$fn__4516.invoke(core.clj:4916)
    at clojure.core$compile.invoke(core.clj:4915)
    at user$eval7.invoke(NO_SOURCE_FILE:1)
    at clojure.lang.Compiler.eval(Compiler.java:5424)
    at clojure.lang.Compiler.eval(Compiler.java:5415)
    at clojure.lang.Compiler.eval(Compiler.java:5391)
    at clojure.core$eval.invoke(core.clj:2382)
    at clojure.main$eval_opt.invoke(main.clj:235)
    at clojure.main$initialize.invoke(main.clj:254)
    at clojure.main$null_opt.invoke(main.clj:279)
    at clojure.main$main.doInvoke(main.clj:354)
    at clojure.lang.RestFn.invoke(RestFn.java:422)
    at clojure.lang.Var.invoke(Var.java:369)
    at clojure.lang.AFn.applyToHelper(AFn.java:165)
    at clojure.lang.Var.applyTo(Var.java:482)
    at clojure.main.main(main.java:37)
Caused by: java.lang.ClassNotFoundException: cascalog.SimplePrintDirectedGraph
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at clojure.lang.DynamicClassLoader.findClass(DynamicClassLoader.java:58)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:169)
    at cascalog.graph$loading__4410__auto__.invoke(graph.clj:16)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:2906)
    ... 48 more

nathanmarz

Dec 28, 2010, 5:18:04 PM
to cascalog-user
Yes, you need to compile. Looks like you don't have the latest
Leiningen. You should upgrade to 1.4.

-Nathan