Hey everyone,
I'm very excited to announce that I've merged the Cascalog 2 branch!
Special thanks to ipostelnik, quantisan and Soren Macbeth for their help
on this release.
As we push toward the first release candidate, I wanted to give you all a
glimpse of some of the upcoming features. There's honestly too much to
go over in one email, so treat this as the first in a series. In this
email, we'll talk about:
* def*ops are now just normal functions
* Anonymous function support
* Anonymous aggregators
* first-class higher-order functions
* "prepared" functions with HadoopFlowProcess access
If you want to follow along, go ahead and clone the repo, cd into the
"cascalog-core" subdirectory and run "lein repl". To try this code out
in other projects, run "lein sub install" in the root directory. This
will install the following artifact locally:
[cascalog/cascalog-core "2.0.0-SNAPSHOT"]
Okay, here goes:
Easy def*op Testing!
Functions defined with any of the "def*op" macros are now just normal
functions, making it MUCH easier to write tests against operations, or to
use them outside of Cascalog:
(defmapop square [x] (* x x))
(square 10)
;;=> 100
(Incidentally, I've deprecated all of the "def*op" macros in
favor of "def*fn". "defmapop" becomes "defmapfn", and so on and so
forth. All of the old macros will continue working, but you'll get a
deprecation notice when the old forms are evaluated.)
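Because these ops are plain functions, they can go straight into an ordinary clojure.test suite. The following is a minimal sketch that assumes the behavior described above. A plain defn stands in for the defmapop/defmapfn definition so the snippet runs without Cascalog on the classpath, and the test name square-test is only illustrative:

```clojure
(require '[clojure.test :refer [deftest is run-tests]])

;; Stand-in for (defmapfn square [x] (* x x)). Per the text above, an op
;; defined that way can be called directly, so it is tested the same way.
(defn square [x] (* x x))

(deftest square-test
  (is (= 100 (square 10)))
  (is (= [1 4 9] (map square [1 2 3]))))

(run-tests)
```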
Functions, not just Vars
First, some setup:
(use 'cascalog.api)
(def src [1 2 3 4 5])
(defn square [x] (* x x))
It used to be the case that functions needed to be passed in to
Cascalog queries as vars:
(defn my-query [op]
  (??<- [!x !y]
    (src !x)
    (op !x :> !y)))
(my-query #'square)
;;=> ([1 1] [2 4] [3 9] [4 16] [5 25])
Now the bare function works just as well:
(my-query square)
;;=> ([1 1] [2 4] [3 9] [4 16] [5 25])
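In plain Clojure, calling a var invokes the function it holds (vars implement IFn), so the var and the bare function give the same result. The sketch below uses my-query-sketch, a hypothetical plain-Clojure stand-in for my-query (not a Cascalog function). It computes the same pairs with a for comprehension, which makes the expected output concrete:

```clojure
(def src [1 2 3 4 5])
(defn square [x] (* x x))

;; Hypothetical stand-in for my-query: pairs each source value with
;; (op value). It does not use Cascalog.
(defn my-query-sketch [op]
  (for [x src] [x (op x)]))

(my-query-sketch #'square) ;; => ([1 1] [2 4] [3 9] [4 16] [5 25])
(my-query-sketch square)   ;; => ([1 1] [2 4] [3 9] [4 16] [5 25])
```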
Anonymous Functions
Cascalog 2.0 supports anonymous functions, thanks to Nathan's strong work
on serializable functions. Here's an example of an old-style query for
squaring numbers:
(defn square [x]
  (* x x))
(??<- [!x !squared]
  (src !x)
  (square !x :> !squared))
;;=> ([1 1] [2 4] [3 9] [4 16] [5 25])
It's now possible to in-line this using cascalog.api/mapfn:
(??<- [!x !squared]
  (src !x)
  ((mapfn [x] (* x x)) !x :> !squared))
Boom. "mapfn", "filterfn" and "mapcatfn" are the anonymous alternatives
to, respectively, "defmapop", "deffilterop" and "defmapcatop".
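As a rough mental model only, the three anonymous op types line up with Clojure's own sequence functions. A mapfn behaves like the function passed to map (one output per input). A filterfn behaves like a predicate passed to filter (the input is kept or dropped). A mapcatfn behaves like the function passed to mapcat (zero or more outputs per input). The sketch below uses plain map, filter and mapcat over src, not Cascalog:

```clojure
(def src [1 2 3 4 5])

;; mapfn analogue: exactly one output tuple per input.
(def squared (map (fn [x] [x (* x x)]) src))

;; filterfn analogue: keep an input only when the predicate is truthy.
(def evens (filter (fn [x] (even? x)) src))

;; mapcatfn analogue: zero or more outputs per input (here, two each).
(def doubled (mapcat (fn [x] [x x]) src))

squared ;; => ([1 1] [2 4] [3 9] [4 16] [5 25])
evens   ;; => (2 4)
doubled ;; => (1 1 2 2 3 3 4 4 5 5)
```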
Anonymous Aggregators
You can also in-line aggregators:
(def pairs
  [[1 1] [1 2] [1 3] [2 4] [2 5]])
(let [sum (aggregatefn
            ([] 0)                ;; initial accumulator
            ([acc y] (+ acc y))   ;; fold in each ?y
            ([x] [x]))]           ;; emit the final tuple
  (??<- [?x ?sum]
    (sum ?y :> ?sum)
    (pairs ?x ?y)))
;;=> ([1 6] [2 9])
"sum" here is created using "aggregatefn", the in-line alternative to
"defaggregateop". "bufferfn" and "bufferiterfn" mirror "defbufferop" and
"defbufferiterop", respectively.
You can also turn any normal two-argument function into a parallel
aggregator with "parallelagg". The definition of a map-side optimized
"sum" operation is now as easy as:
(??<- [?x ?sum]
  ((parallelagg +) ?y :> ?sum)
  (pairs ?x ?y))
;;=> ([1 6] [2 9])
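The following plain-Clojure simulation makes both aggregator styles concrete. It does not use Cascalog, and run-aggregator and run-parallel-agg are hypothetical helpers. The first helper drives a three-arity function like the aggregatefn "sum" once per ?x group. The second helper folds each group in chunks and then combines the partial results. The chunking is an assumption about how map-side combining works in general, not a description of Cascalog's internals. It also shows why the function you hand to parallelagg should be associative:

```clojure
(def pairs [[1 1] [1 2] [1 3] [2 4] [2 5]])

;; Same three arities as the aggregatefn "sum":
;; () -> initial accumulator, (acc y) -> fold step, (acc) -> output tuple.
(def sum-agg
  (fn
    ([] 0)
    ([acc y] (+ acc y))
    ([acc] [acc])))

(defn run-aggregator
  "Groups tuples on their first field (like ?x) and folds each group's
  second field (like ?y) through agg's three arities."
  [agg tuples]
  (for [[k group] (sort-by key (group-by first tuples))]
    (into [k] (agg (reduce agg (agg) (map second group))))))

(defn run-parallel-agg
  "Folds each group in chunks of chunk-size with f (standing in for
  map-side partial results), then folds the partials together with f."
  [f chunk-size tuples]
  (for [[k group] (sort-by key (group-by first tuples))]
    [k (->> (map second group)
            (partition-all chunk-size)
            (map #(reduce f %))
            (reduce f))]))

(run-aggregator sum-agg pairs) ;; => ([1 6] [2 9])
(run-parallel-agg + 2 pairs)   ;; => ([1 6] [2 9])

;; Chunking only preserves the answer when f is associative:
(reduce - [10 3 2 1])                                 ;; => 4
(run-parallel-agg - 2 [[:k 10] [:k 3] [:k 2] [:k 1]]) ;; => ([:k 6])
```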
Easy Higher Order Functions
One result of the new anonymous function syntax is that higher-order
function definitions become easy. The old syntax required you to use an
extra vector around the name, like this:
(defmapop [times [x]] [y]
  (* x y))
And then in-line the higher-order parameter:
(??<- [!x !y]
  (src !x)
  (times [4] !x :> !y))
;;=> ([1 4] [2 8] [3 12] [4 16] [5 20])
In this new, beautiful world, you can accomplish the same goal by
writing a normal Clojure function that returns an anonymous Cascalog
function (described above):
(defn times [x]
  (mapfn [y] (* x y)))
(let [times-four (times 4)]
  (??<- [!x !y]
    (src !x)
    (times-four !x :> !y)))
;;=> ([1 4] [2 8] [3 12] [4 16] [5 20])
So GOOD! Now you can pass these bad boys around as first-class objects,
just like any other Clojure function.
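The closure mechanics are ordinary Clojure, so the following sketch swaps mapfn for fn to run without Cascalog. It shows the times factory producing independent ops, each capturing its own x, and those ops being stored in a vector and applied like any other value:

```clojure
;; Same shape as the times factory above, with fn standing in for mapfn:
;; each call closes over its own x.
(defn times [x]
  (fn [y] (* x y)))

(def times-four (times 4))

(map times-four [1 2 3 4 5]) ;; => (4 8 12 16 20)

;; Returned ops are ordinary values: store them, pass them, apply them.
(def multipliers [(times 2) (times 3)])
(map #(% 10) multipliers)    ;; => (20 30)
```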
Prepared Functions
Cascalog 2.0's "prepfn" and "defprepfn" make it easy to get access to
the FlowProcess and ConcreteCall instances provided by Cascading. This
lets you increment counters and access the jobconf from within your
operations. Here's an example of how to use defprepfn:
(import 'cascading.flow.hadoop.HadoopFlowProcess)
(import 'cascading.operation.ConcreteCall)
(defprepfn times-with-path
  [^HadoopFlowProcess a ^ConcreteCall b]
  ;; Double each input and attach the job's input directory.
  (mapfn [y] [(* 2 y) (-> (.getConfigCopy a)
                          (.get "mapred.input.dir"))]))
"defprepfn" is essentially a higher-order function of two parameters
that Cascading calls after the Hadoop job begins. Cascading will pass in
the HadoopFlowProcess and ConcreteCall instances, and use the return
value as the operation:
(??<- [!x !y !conf]
  (src !x)
  (times-with-path !x :> !y !conf))
;;=> ([1 2 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [2 4 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [3 6 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [4 8 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"]
;;    [5 10 "file:/211a1120-fb5e-4d10-aa9e-25227fd95935"])
You can also use the higher-order function trick described above to
parametrize prepared functions created with "prepfn":
(defn times-with-path [x]
  (prepfn [^HadoopFlowProcess a ^ConcreteCall b]
    (mapfn [y] [(* x y)
                (-> (.getConfigCopy a)
                    (.get "mapred.input.dir"))])))
(??<- [!x !y !conf]
  (src !x)
  ((times-with-path 2) !x :> !y !conf))
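The following plain-Clojure sketch shows the prepared-function lifecycle described above: the outer function is called once with two context arguments, and the operation it returns is applied to every input. All names here are hypothetical. A plain map stands in for the jobconf, nil stands in for the ConcreteCall, and run-prepared is a toy driver, not Cascalog's or Cascading's actual machinery. Unlike the example above, the config lookup is done once at prep time rather than per tuple, which is one design choice this structure allows:

```clojure
;; Hypothetical stand-in for the jobconf the real code reads via
;; (.getConfigCopy a).
(def fake-conf {"mapred.input.dir" "file:/tmp/input"})

(defn times-with-path-sketch
  "Plain-fn analogue of the parameterized prepfn above."
  [x]
  (fn [flow-process _call]
    ;; Runs once at prep time, so the lookup is not repeated per tuple.
    (let [input-dir (get flow-process "mapred.input.dir")]
      (fn [y] [(* x y) input-dir]))))

(defn run-prepared
  "Calls prep once with the two context arguments, then applies the
  returned operation to every input value."
  [prep flow-process call inputs]
  (let [op (prep flow-process call)]
    (mapv op inputs)))

(run-prepared (times-with-path-sketch 2) fake-conf nil [1 2 3])
;; => [[2 "file:/tmp/input"] [4 "file:/tmp/input"] [6 "file:/tmp/input"]]
```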
Finally, if you need to perform some sort of cleanup (say, closing a
connection to an external database, or incrementing some final counter),
just return a map from the body of your prepfn. The value under :operate
will be used as your operation, and the function stored under :cleanup
will be called once at the end of the function's run:
(defn times-with-path [x]
  (prepfn [^HadoopFlowProcess a ^ConcreteCall b]
    {:operate (mapfn [y]
                [(* x y) (-> (.getConfigCopy a)
                             (.get "mapred.input.dir"))])
     :cleanup (mapfn []
                (println "FINISHED!"))}))
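The following hypothetical plain-Clojure driver makes the :operate/:cleanup contract concrete. The names run-prepared-with-cleanup and times-with-cleanup are illustrative, not Cascalog API. The driver accepts either a bare operation or such a map, applies the operation to every input, and calls the cleanup function exactly once afterwards. An atom counts cleanup calls so the once-only behavior is visible. This sketches the behavior described above and is not Cascalog's actual implementation:

```clojure
(defn run-prepared-with-cleanup
  "Hypothetical driver: call prep once; if it returns a map, use :operate
  for every input and call :cleanup (if present) once at the end."
  [prep flow-process call inputs]
  (let [prepared (prep flow-process call)
        {:keys [operate cleanup]} (if (map? prepared)
                                    prepared
                                    {:operate prepared})
        results (mapv operate inputs)] ;; eager, so cleanup runs last
    (when cleanup (cleanup))
    results))

(def cleanup-calls (atom 0))

(defn times-with-cleanup [x]
  (fn [_flow-process _call]
    {:operate (fn [y] (* x y))
     :cleanup (fn [] (swap! cleanup-calls inc))}))

(run-prepared-with-cleanup (times-with-cleanup 3) nil nil [1 2 3])
;; => [3 6 9], and @cleanup-calls is now 1
```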
Conclusion
That's it for now, and we haven't even covered the new, standalone
Cascading DSL! I'd love to hear feedback from you all on this new work.
Are the APIs clear? Can we change anything to make this new work easier
to use? Please reply and let me know what you think.
Thanks,