Cascalog 2.0 Feature Walkthrough, Part 1


Sam Ritchie

Aug 4, 2013, 4:56:40 PM
to cascalog-user, cascading-user
Hey everyone,

I'm very excited to announce that I've merged the Cascalog 2 branch! Special thanks to ipostelnik, quantisan and Soren Macbeth for their help on this release.

As we push toward the first release candidate, I wanted to give you all a glimpse of some of the upcoming features. There's honestly too much to go over in one email, so treat this as the first in a series. In this email, we'll talk about:

* def*ops are now just normal functions
* Anonymous function support
* Anonymous aggregators
* First-class higher-order functions
* "Prepared" functions with HadoopFlowProcess access

If you want to follow along, go ahead and clone the repo, cd into the "cascalog-core" subdirectory and run "lein repl". To try this code out in other projects, run "lein sub install" in the root directory. This will install the following artifact locally:

[cascalog/cascalog-core "2.0.0-SNAPSHOT"]

Okay, here goes:

Easy def*op Testing!

Functions defined with any of the "def*op" macros are now just normal functions, making it MUCH easier to write tests against operations, or to use them outside of Cascalog:

(defmapop square [x] (* x x))
(square 10)
;;=> 100

(Incidentally, I've deprecated all of the "def*op" macros in favor of "def*fn". "defmapop" becomes "defmapfn", and so on and so forth. All of the old macros will continue working, but you'll get a deprecation notice when the old forms are evaluated.)
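
Because the operation is an ordinary function, a plain clojure.test check is enough to cover it. The following is a minimal sketch, assuming cascalog-core 2.0.0-SNAPSHOT is on the classpath and that "defmapfn" is exposed from cascalog.api like the other macros; the namespace and test names are made up for illustration:

```clojure
(ns walkthrough.square-test   ; hypothetical namespace
  (:require [clojure.test :refer [deftest is run-tests]]
            [cascalog.api :refer [defmapfn]]))

;; Same operation as above, written with the non-deprecated macro.
(defmapfn square [x] (* x x))

;; No query, tap, or Hadoop job is involved: the function is called directly.
(deftest square-test
  (is (= 100 (square 10)))
  (is (= [1 4 9] (map square [1 2 3]))))

(run-tests)
```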

Functions, not just Vars

First, some setup:

(use 'cascalog.api)

(def src [1 2 3 4 5])


(defn square [x] (* x x))

It used to be the case that functions needed to be passed in to Cascalog queries as vars:


(defn my-query [op]
  (??<- [!x !y]
        (src !x)
        (op !x :> !y)))

(my-query #'square)
;;=> ([1 1] [2 4] [3 9] [4 16] [5 25])

Now, the bare function works great:

(my-query square)

Anonymous Functions

Cascalog 2.0 supports anonymous functions, thanks to Nathan's strong work on a serializable function. Here's an example of an old-style query for squaring numbers:

(defn square [x]
    (* x x))

(??<- [!x !squared]
      (src !x)
      (square !x :> !squared))
;;=> ([1 1] [2 4] [3 9] [4 16] [5 25])

It's now possible to in-line this using cascalog.api/mapfn:

(??<- [!x !squared]
      (src !x)
      ((mapfn [x] (* x x)) !x :> !squared))

Boom. "mapfn", "filterfn" and "mapcatfn" are the anonymous alternatives to, respectively, "defmapop", "deffilterop" and "defmapcatop".
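
The other two anonymous forms follow the same pattern as "mapfn". Here is a rough sketch of how they might be used, assuming the same "src" as above; the var names "evens" and "expanded" are only there for illustration:

```clojure
(use 'cascalog.api)

(def src [1 2 3 4 5])

;; filterfn: keep only the tuples for which the body returns a truthy value.
;; A filter has no output variables, so there is no :> clause.
(def evens
  (??<- [!x]
        (src !x)
        ((filterfn [x] (even? x)) !x)))

;; mapcatfn: the body returns a sequence, and each element becomes its own
;; output tuple, so every !x is paired with two different !y values here.
(def expanded
  (??<- [!x !y]
        (src !x)
        ((mapcatfn [x] [x (* 10 x)]) !x :> !y)))
```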

Anonymous Aggregators

You can also in-line aggregators:

(def pairs
  [[1 1] [1 2] [1 3] [2 4] [2 5]])

(let [sum (aggregatefn
           ([] 0)
           ([acc y] (+ acc y))
           ([x] [x]))]
  (??<- [?x ?sum]
        (sum ?y :> ?sum)
        (pairs ?x ?y)))
;;=> ([1 6] [2 9])

"sum" here is created using "aggregatefn", the in-line alternative to "defaggregateop". "bufferfn" and "bufferiterfn" mirror "defbufferop" and "defbufferiterop", respectively.

You can also turn any normal two-argument function into a parallel aggregator with "parallelagg". The definition of a map-side optimized "sum" operation is now as easy as:

(??<- [?x ?sum]
      ((parallelagg +) ?y :> ?sum)
      (pairs ?x ?y))
;;=> ([1 6] [2 9])
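
The same trick should work for other combining functions, as long as partial results can be merged in any order. As an untested sketch, assuming "parallelagg" is available from cascalog.api as in the example above, a per-key maximum could look like this (the name "max-per-key" is made up):

```clojure
(use 'cascalog.api)

(def pairs
  [[1 1] [1 2] [1 3] [2 4] [2 5]])

;; max is associative and commutative, so partial results computed on the
;; map side can be combined in any order, which is what a parallel
;; aggregator needs.
(def max-per-key
  (??<- [?x ?max]
        (pairs ?x ?y)
        ((parallelagg max) ?y :> ?max)))
```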

Easy Higher Order Functions

One result of the new anonymous function syntax is that higher-order function definitions become easy. The old syntax required you to use an extra vector around the name, like this:

(defmapop [times [x]]
  [y]
  (* x y))

And then in-line the higher-order parameter:

(??<- [!x !y]
      (src !x)
      (times [4] !x :> !y))
;;=> ([1 4] [2 8] [3 12] [4 16] [5 20])

In this new, beautiful world, you can accomplish the same goal by writing a normal Clojure function that returns an anonymous Cascalog function (described above):

(defn times [x]
  (mapfn [y] (* x y)))

(let [times-four (times 4)]
  (??<- [!x !y]
        (src !x)
        (times-four !x :> !y)))
;;=> ([1 4] [2 8] [3 12] [4 16] [5 20])

So GOOD! Now you can pass these bad boys around as first class objects, just like any other clojure function.
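
The pattern is not limited to "mapfn". A small sketch of a parametrized filter built the same way, assuming "filterfn" closes over its argument just like "mapfn" does; "greater-than" and "big-ones" are hypothetical names:

```clojure
(use 'cascalog.api)

(def src [1 2 3 4 5])

;; An ordinary Clojure function that closes over n and returns an
;; anonymous Cascalog filter.
(defn greater-than [n]
  (filterfn [x] (> x n)))

;; Bind the configured filter to a local and use it like any other operation.
(def big-ones
  (let [big? (greater-than 3)]
    (??<- [!x]
          (src !x)
          (big? !x))))
```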

Prepared Functions

Cascalog 2.0's "prepfn" and "defprepfn" make it easy to get access to the FlowProcess and ConcreteCall instances provided by Cascading. This lets you increment counters and get access to the jobconf within your operations. Here's an example of how to use defprepfn:

(import 'cascading.flow.hadoop.HadoopFlowProcess)
(import 'cascading.operation.ConcreteCall)

;; Doubles y and returns it alongside the job's input path.
(defprepfn times-with-path
  [^HadoopFlowProcess a ^ConcreteCall b]
  (mapfn [y] [(* 2 y) (-> (.getConfigCopy a)
                          (.get "mapred.input.dir"))]))

"defprepfn" is essentially a higher-order function of two parameters that Cascading calls after the Hadoop job begins. Cascading will pass in the HadoopFlowProcess and ConcreteCall instances, and use the return value as the operation:

(??<- [!x !y !conf]
      (src !x)
      (times-with-path !x :> !y !conf))
;;=> ([1 2 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [2 4 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [3 6 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [4 8 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [5 10 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"])

You can also use the higher-order function trick described above to parametrize prepared functions created with "prepfn":

(defn times-with-path [x]
  (prepfn [^HadoopFlowProcess a ^ConcreteCall b]
          (mapfn [y] [(* x y) (-> (.getConfigCopy a)
                                  (.get "mapred.input.dir"))])))

(??<- [!x !y !conf]
      (src !x)
      ((times-with-path 2) !x :> !y !conf))

Finally, if you need to perform some sort of cleanup (say, closing a connection to an external database, or incrementing some final counter), just return a map from the body of your prepfn. The value under :operate will be used as your operation, and the function stored under :cleanup will be called once at the end of the function's run:

(defn times-with-path [x]
  (prepfn [^HadoopFlowProcess a ^ConcreteCall b]
          {:operate (mapfn [y] [(* x y) (-> (.getConfigCopy a)
                                            (.get "mapred.input.dir"))])
           :cleanup (mapfn [] (println "FINISHED!"))}))
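
The counter use case mentioned at the top of this section could look roughly like the following. This is an untested sketch: it assumes the three-argument "increment" method (group, counter, amount) on Cascading's FlowProcess, and the function name, group name and counter name are all made up for the example:

```clojure
(use 'cascalog.api)
(import 'cascading.flow.hadoop.HadoopFlowProcess)
(import 'cascading.operation.ConcreteCall)

(def src [1 2 3 4 5])

;; Multiplies by x and bumps a counter once per input tuple.
;; "walkthrough" and "tuples-seen" are hypothetical group/counter names.
(defn counting-times [x]
  (prepfn [^HadoopFlowProcess a ^ConcreteCall b]
          (mapfn [y]
                 (.increment a "walkthrough" "tuples-seen" 1)
                 (* x y))))

(def doubled
  (??<- [!x !y]
        (src !x)
        ((counting-times 2) !x :> !y)))
```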

Conclusion

That's it for now, and we haven't even covered the new, standalone Cascading DSL! I'd love to hear feedback from you all on this new work. Are the APIs clear? Can we change anything to make this new work easier to use? Please reply and let me know what you think.

Thanks,
--
Sam Ritchie, Twitter Inc
703.662.1337
@sritchie



Paco Nathan

Aug 4, 2013, 5:00:48 PM
to cascal...@googlegroups.com
Many many thanks, Sam, for all your work!

This is wonderful. So much progress and velocity on Cascalog, that it's become difficult to keep up :) That's a great problem to have


--
You received this message because you are subscribed to the Google Groups "cascalog-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascalog-use...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Bruce Durling

Aug 4, 2013, 5:20:02 PM
to cascal...@googlegroups.com
Sam,

This is great news and I'm really excited to see the next parts of
your overview!

cheers,
Bruce
--
@otfrom | CTO & co-founder @MastodonC | mastodonc.com
See recent coverage of us in the Economist http://econ.st/WeTd2i and
the Financial Times http://on.ft.com/T154BA

Robin Kraft

Aug 4, 2013, 9:42:48 PM
to cascal...@googlegroups.com
+1 this is great stuff!

Kang Tu

Aug 10, 2013, 7:00:51 PM
to cascal...@googlegroups.com, cascading-user
Wonderful!!! Thanks for the great work! Especially for replacing def*op with defn, since that makes debugging much easier. Anonymous functions are also very cool, saving xxxx LOC.

Sam, when will the 2.0 docs be available?

Sam Ritchie

Aug 11, 2013, 6:03:17 PM
to cascal...@googlegroups.com, cascading-user
Hey Kang,

You can generate it now by running "lein doc" in the "cascalog-core" directory. I'm working on figuring out how to generate the documentation for all modules in the root project using "lein sub"... if you can get it to work, I'd love a pull request. Otherwise I'll keep trying to figure it out.


Kang Tu

Aug 11, 2013, 11:37:01 PM
to cascal...@googlegroups.com, cascading-user
Hi Sam,

I just got the docs generating in each subproject directory with "lein sub doc" (https://github.com/tninja/cascalog).

Is this what you meant?

Kang



Sam Ritchie

Aug 11, 2013, 11:52:55 PM
to cascal...@googlegroups.com, cascading-user
Nice! Can you send a pull request?


