Cascalog 1.8.5-SNAPSHOT w/ Kryo serialization

Sam Ritchie

Dec 7, 2011, 2:25:33 PM
to cascal...@googlegroups.com
Hey all,

I've pushed Cascalog 1.8.5-SNAPSHOT to Clojars (http://clojars.org/cascalog). Cascalog 1.8.5 adds support for Kryo (http://code.google.com/p/kryo/) serialization; practically, this allows you to use Clojure primitives and collections (in addition to some common Java primitives and collections) as fields in your tuples. See the CHANGELOG (https://github.com/nathanmarz/cascalog/blob/develop/CHANGELOG.md) for a list of all supported types. As a bonus, most other Java objects and collections will work with Kryo!

The ability to pass complex data structures around opens up all sorts of possibilities. Here's an example of a query that passes Clojure maps around and modifies them directly within Cascalog:

;; src of 2-tuples of the form
;; [identifier, timeseries-map]
(def src
  [["ascender" {:start-period 100
                :units :days
                :vals (range 100)}]
   ["random" {:start-period 100
              :units :days
              :vals (take 50 (repeatedly rand))}]])

;; Note that we're assoc-ing onto a map within a cascalog query!
(?<- (stdout)
     [?identifier ?new-timeseries]
     (src ?identifier ?timeseries)
     (assoc ?timeseries :field "value!" :> ?new-timeseries)
     (:distinct false))

I'm looking forward to hearing everyone's feedback on this new functionality. Please let me know if you run into any issues.

Cheers,
--
Sam Ritchie, Twitter Inc
@sritchie09

(Too brief? Here's why! http://emailcharter.org)

Sam Ritchie

Dec 7, 2011, 2:27:59 PM
to cascalog-user
By the way, for those of you using org.apache.hadoop.io.serializer.JavaSerialization: you can take it out of your job-conf.clj or with-job-conf maps. Kryo serialization should be able to handle any Java object you throw at it MUCH more efficiently than JavaSerialization.

Cheers,
Sam


Alex Miller

Dec 7, 2011, 2:46:31 PM
to cascalog-user
I'd be a little careful about that characterization. In my admittedly skimpy testing, I've found that Kryo output is significantly smaller (usually half the size or less of equivalent Java serialization, though it depends on what you're serializing), and Java is worst with small amounts of data, where class headers cause a lot of early bloat.

However, Kryo serialization speed varies and can be slower for many kinds of Clojure data (in the 1-2x range).

As with anything, if you really want to know, run some tests on *your*
data. :)

Sam Ritchie

Dec 7, 2011, 2:56:23 PM
to cascal...@googlegroups.com
Well said. I've set up Cascalog so that if you include entries in the "io.serializations" field of job-conf.clj (or the map provided to with-job-conf), they'll take precedence over Kryo. If included, JavaSerialization will win out for any object that implements Serializable.
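
For anyone who does keep JavaSerialization in the mix, here's a minimal sketch of what that looks like (the serialization class name is Hadoop's own; `src` is a hypothetical generator standing in for your data, so treat this as illustrative rather than tested):

```clojure
(use 'cascalog.api)

;; Entries in "io.serializations" take precedence over Kryo, so
;; JavaSerialization will win out for any Serializable object:
(with-job-conf
  {"io.serializations"
   "org.apache.hadoop.io.serializer.JavaSerialization"}
  (?<- (stdout)
       [?x]
       (src ?x)             ;; `src`: placeholder generator
       (:distinct false)))
```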
--
Sam Ritchie, Twitter Inc

Sam Ritchie

Dec 7, 2011, 5:08:11 PM
to cascal...@googlegroups.com
Here are a few examples of how this opens up the door to use a far greater number of clojure.core functions in your cascalog queries: https://gist.github.com/1444898
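
As a rough sketch of the idea (assuming the `src` generator of 2-tuples from the announcement above; untested, so treat it as illustrative):

```clojure
;; Plain clojure.core vars like `get` and `count` can now appear
;; directly as predicates, since Kryo can serialize the
;; intermediate Clojure collections they consume and produce:
(?<- (stdout)
     [?identifier ?n]
     (src ?identifier ?timeseries)
     (get ?timeseries :vals :> ?vals)
     (count ?vals :> ?n)
     (:distinct false))
```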

François Le Lay

Dec 7, 2011, 5:56:43 PM
to cascalog-user
Wow, really cool!
As a Clojure/Cascalog n00b I was just about to ask "how can I use a
hash-map as a generator and destructure it easily for querying?".
So now I don't even have to ask ;)
Thank you!

François

PS : anyone using cascalog and reading this group out there in Paris,
France?



tomoj

Dec 10, 2011, 1:09:23 PM
to cascal...@googlegroups.com
Is Cascalog meant now to require Clojure 1.3? Carbonite imports BigInt which causes problems in 1.2. Here is the patch I used so I could test Kryo serialization: https://gist.github.com/931f7ffaccdffbf97d8d

I'm getting an EOFException; here's an example: https://gist.github.com/614e469ce5161fdf40c0 . It looks just like the error I got when my Comparator implementation was broken, but I note cascading.kryo doesn't have one... must one be careful not to have Cascalog sort by a Kryo-serialized field?

Sam Ritchie

Dec 10, 2011, 7:40:28 PM
to cascal...@googlegroups.com
Cascalog should definitely remain compatible with 1.2. Interesting about BigInt; I hadn't gotten any exceptions running my tests in Cascalog with 1.2. Did you end up patching my fork of Carbonite? Can you send me the error you were getting when you tried to run Cascalog 1.8.5-SNAPSHOT with 1.2?

I think the EOFException is a problem with Cascalog's StdoutTap and MemorySourceTap; I'll open up an issue and get this fixed right up. (Your example works fine with a textline or sequencefile tap, but fails with those two.)

Thanks for this, I knew we'd have a few kinks to iron out before this worked :)

Sam


tomoj

Dec 10, 2011, 10:47:50 PM
to cascalog-user
Yeah, I patched your fork. Here is the problem with Clojure 1.2.1:
https://gist.github.com/7505aa3e77cd7f28d782

Another issue: after upgrading to the snapshot, code that worked
before is now breaking with a somewhat similar error:
https://gist.github.com/9fecf1d84ebf5143c3af . This despite the fact
that the query is wrapped in (with-job-conf {"io.serializations"
"org.apache.hadoop.io.serializer.WritableSerialization,my,custom,serializations"}).
Should this disable Kryo serialization entirely?

I get the same error here when sinking to an hfs-seqfile as with
stdout, and there are no memory source taps involved.

I haven't been able to reproduce yet with an analogous simple query on
primitive types, which makes me think there is something wrong with my
custom serialization, but the code works fine in 1.8.4. Any ideas?
I'll keep trying to extract a simple reproduction or maybe try to
bisect the Cascalog history...

François Le Lay

Dec 11, 2011, 5:19:33 AM
to cascalog-user
Hi,

I thought promotion from Long to BigInt was automatic, but it doesn't seem to be the case. Do you guys think this is normal?

(* 9000 9000 9004 9001 1405 )
=> 9223326680220000000

(class (* 9000 9000 9004 9001 1405 ))
=> java.lang.Long

(* 9000 9000 9004 9002 1405 )
=> ArithmeticException integer overflow
clojure.lang.Numbers.throwIntOverflow (Numbers.java:1374)

Also, I am getting an error when I mix datatypes on one field, which
raises the Comparator issue previously mentioned.
git://gist.github.com/1459791.git


Cheers,
François


François Le Lay

Dec 11, 2011, 7:39:38 AM
to cascalog-user
OK, so there is definitely a difference in BigInt handling between Clojure 1.2 and 1.3. I haven't checked the language changelog; is this behavior expected or not? Looks quite buggish to me...

In 1.2:

REPL started; server listening on localhost port 12871
user=> (class (* 9000 9000 9000 9000 9000 9000 9000 9000))
java.math.BigInteger
user=> (* 9000 9000 9000 9000 9000 9000 9000 9000)
43046721000000000000000000000000
user=> 43046721000000000000000000000000
43046721000000000000000000000000
user=> (class 43046721000000000000000000000000)
java.math.BigInteger

In 1.3:

REPL started; server listening on localhost port 7690
user=> (class (* 9000 9000 9000 9000 9000 9000 9000 9000))


ArithmeticException integer overflow
clojure.lang.Numbers.throwIntOverflow (Numbers.java:1374)

user=> (* 9000 9000 9000 9000 9000 9000 9000 9000)


ArithmeticException integer overflow
clojure.lang.Numbers.throwIntOverflow (Numbers.java:1374)

user=> 43046721000000000000000000000000
43046721000000000000000000000000N
user=> (class 43046721000000000000000000000000)
clojure.lang.BigInt

François

François Le Lay

Dec 11, 2011, 10:21:10 AM
to cascalog-user
Sorry for the noise, I figured things out. This is expected behavior in 1.3, based on BigInt contagion:
http://dev.clojure.org/display/doc/Documentation+for+1.3+Numerics
If we want auto-promotion instead of an overflow exception, we need to use *' instead of *.
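
A quick REPL illustration of the difference in 1.3 (the big value matches the 9000^8 example above):

```clojure
;; In Clojure 1.3, `*` throws ArithmeticException on Long overflow,
;; while `*'` auto-promotes to clojure.lang.BigInt:
(* 9000 9000 9000 9000)
;; => 6561000000000000 (still fits in a Long)

(*' 9000 9000 9000 9000 9000 9000 9000 9000)
;; => 43046721000000000000000000000000N

(class (*' 9000 9000 9000 9000 9000 9000 9000 9000))
;; => clojure.lang.BigInt
```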

François

Andrew Xue

Dec 15, 2011, 6:17:15 PM
to cascalog-user
Hi -- I am seeing a "clojure.lang.PersistentArrayMap cannot be cast to java.lang.Comparable" issue.

Here is the test case:

(def test-2-data
  [["a" 1]
   ["b" 2]
   ["c" 3]])

(?<- (stdout)
     [?map]
     (test-2-data ?alpha ?num)
     (zipmap [:string :numeral] [?alpha ?num] :> ?map))

stacktrace:

cascading.CascadingException: unable to compare Tuples, likely a CoGroup is being attempted on fields of different types or custom comparators are incorrectly set on Fields
  at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:81)
  at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:33)
  at cascading.tuple.hadoop.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:74)
  ...
Caused by: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to java.lang.Comparable
  at clojure.lang.Util.compare(Util.java:104)
  at cascalog.hadoop.ClojureKryoSerialization$1.compare(ClojureKryoSerialization.java:36)
  at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:77)



Sam Ritchie

Dec 15, 2011, 6:31:19 PM
to cascal...@googlegroups.com
That's the correct behavior: Cascalog queries try to distinct everything by default, and Hadoop has no way of knowing how to sort those maps. (sort [{:key "val"} {:key "val"}]) fails at the REPL, too.

You can get this particular test to pass by including (:distinct false) as a predicate:

(?<- (stdout)
     [?map]
     (test-2-data ?alpha ?num)
     (zipmap [:string :numeral] [?alpha ?num] :> ?map)
     (:distinct false))

Also, note that Cascalog can't tell that you're trying to nest those dynamic vars inside a vector -- you have to do the nesting explicitly. With the current code you'll get this:

{:numeral "?num", :string "?alpha"}
{:numeral "?num", :string "?alpha"}
{:numeral "?num", :string "?alpha"}

With a slight change, this will work as expected:

(defn nest [& xs] [(vec xs)])

(?<- (stdout)
     [?map]
     (test-2-data ?alpha ?num)
     (nest ?alpha ?num :> ?vals)
     (zipmap [:string :numeral] ?vals :> ?map)
     (:distinct false))

;; produces
;; {:numeral 1, :string "a"}
;; {:numeral 2, :string "b"}
;; {:numeral 3, :string "c"}
