queries with dynamic predicates?


toy

Mar 9, 2013, 7:46:12 PM3/9/13
to cascalog-user
I'm trying to create a function that lets me pass in a string of
predicates to be evaluated, so that ad hoc queries can be sent to
Cascalog from a browser. The predicates would be fairly simple, such as
(= "john" ?name) or (> ?age 21). I can't get it to work, and I'm not
sure whether this is even possible in Cascalog. Here is the code I have:

;; fields is a vector of field names
(defmapcatop user_data_parser
  [& {:keys [line fields]}]
  (let [json (cheshire.core/parse-string line)]
    (vector (map json fields))))

(defn fields_with_?s
  [fields]
  (vec (map #(str "?" %) fields)))

;; take a string of predicates in S-expression format and use it as a filter
(defn filter_predicates
  [predicates]
  (eval (read-string predicates)))

(defn dynamic_test
  [columns predicates]
  (let [fields (clojure.string/split columns #"\s+")
        tap    (lfs-textline "/mnt/hadoop/data/users.json")
        users  (<- (fields_with_?s fields)
                   (tap ?line)
                   (user_data_parser :line ?line :fields fields
                                     :>> (fields_with_?s fields))
                   (filter_predicates predicates))]
    (?- (stdout) users)))

(dynamic_test "name id" "(> ?id 42)")




I can pass in any columns I want and have them returned, but I can't
get filter_predicates to work. Can anyone help me with this?

The error I get is:

cascading.pipe.OperatorException: [e7c3ab18-c32c-41fa-927...] [sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)] operator Each failed executing operation
    at cascading.pipe.Each$EachHandler.operate(Each.java:486)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascalog.ops.KryoInsert.operate(KryoInsert.java:34)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each.applyFilter(Each.java:375)
    at cascading.pipe.Each.access$300(Each.java:53)
    at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascalog.ClojureMapcat.operate(ClojureMapcat.java:40)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascalog.ops.KryoInsert.operate(KryoInsert.java:34)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each.applyFilter(Each.java:375)
    at cascading.pipe.Each.access$300(Each.java:53)
    at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
    at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
    at cascading.flow.stack.FlowMapperStack.map(FlowMapperStack.java:220)
    at cascading.flow.FlowMapper.map(FlowMapper.java:75)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:176)
Caused by: java.lang.RuntimeException: java.lang.Exception: Unable to resolve symbol: ?id in this context (NO_SOURCE_FILE:0)
    at cascalog.ClojureCascadingBase.applyFunction(ClojureCascadingBase.java:70)
    at cascalog.ClojureFilter.isRemove(ClojureFilter.java:32)
    at cascading.pipe.Each.applyFilter(Each.java:372)
    at cascading.pipe.Each.access$300(Each.java:53)
    at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    ... 67 more
Caused by: java.lang.Exception: Unable to resolve symbol: ?id in this context (NO_SOURCE_FILE:0)
    at clojure.lang.Compiler.analyze(Compiler.java:5205)
    at clojure.lang.Compiler.analyze(Compiler.java:5151)
    at clojure.lang.Compiler$HostExpr$Parser.parse(Compiler.java:830)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:5369)
    at clojure.lang.Compiler.analyze(Compiler.java:5190)
    at clojure.lang.Compiler.analyze(Compiler.java:5151)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:5364)
    at clojure.lang.Compiler.analyze(Compiler.java:5190)
    at clojure.lang.Compiler.analyze(Compiler.java:5151)
    at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:4670)
    at clojure.lang.Compiler$FnMethod.parse(Compiler.java:4328)
    at clojure.lang.Compiler$FnExpr.parse(Compiler.java:3173)
    at clojure.lang.Compiler.analyzeSeq(Compiler.java:5367)
    at clojure.lang.Compiler.analyze(Compiler.java:5190)
    at clojure.lang.Compiler.eval(Compiler.java:5421)
    at clojure.lang.Compiler.eval(Compiler.java:5391)
    at clojure.core$eval.invoke(core.clj:2382)
    at interests.users$filter_predicates.invoke(users.clj:13)
    at clojure.lang.Var.invoke(Var.java:365)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.Var.applyTo(Var.java:482)
    at cascalog.ClojureCascadingBase.applyFunction(ClojureCascadingBase.java:67)
    ... 72 more
Caused by: java.lang.Exception: Unable to resolve symbol: ?id in this context
    at clojure.lang.Compiler.resolveIn(Compiler.java:5677)
    at clojure.lang.Compiler.resolve(Compiler.java:5621)
    at clojure.lang.Compiler.analyzeSymbol(Compiler.java:5584)
    at clojure.lang.Compiler.analyze(Compiler.java:5172)
    ... 93 more

RESULTS
-----------------------
-----------------------
cascading.flow.FlowException: step failed: (1/1) ...22fx0lj6fw_cq95q9_jr0000gn/T/temp79905635924576651181362876218433041000"]"], with job id: job_local_0003, please see cluster logs for failure messages (NO_SOURCE_FILE:0)


Paul Lam

Mar 10, 2013, 4:39:58 AM3/10/13
to cascal...@googlegroups.com
eval won't work here because the query itself is generated by macros. A couple of solutions: 1) make dynamic_test a macro; or 2) refactor filter_predicates so you pass the predicate function and the field name in as separate arguments from dynamic_test, without using eval. Something like this:

(defn dynamic-test [vars pred field]
  (...)
  (pred field))

(dynamic-test my-vars (partial <= 42) "?id")

I'm not sure about #2, though, because of the field argument. Give it a go and let us know how it goes.
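Fleshing that out a bit — a minimal, untested sketch of option 2, reusing the tap and parser from the original post and hard-coding the columns to sidestep the dynamic-fields question for now:

```clojure
;; Untested sketch: pass the filter in as a plain Clojure function
;; instead of a string, so no eval is needed inside the query.
(defn dynamic-test
  [pred]
  (let [tap (lfs-textline "/mnt/hadoop/data/users.json")]
    (?- (stdout)
        (<- [?name ?id]
            (tap ?line)
            (user_data_parser :line ?line :fields ["name" "id"] :>> [?name ?id])
            (pred ?id)))))

;; e.g. keep rows where id >= 42; (partial <= 42) is a one-argument
;; function equivalent to #(<= 42 %)
(dynamic-test (partial <= 42))
```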

toy

Mar 10, 2013, 3:57:24 PM3/10/13
to cascalog-user
Hi Paul, thank you for the tips. Surprisingly, I got this working, even
though I had never worked with macros before:

(defmacro dynamic_test
  [columns predicates]
  `(let [fields# (clojure.string/split ~columns #"\s+")
         tap#    (lfs-textline "/mnt/hadoop/data/users.json")
         users#  (<- (fields_with_?s fields#)
                     (tap# ?line)
                     (user_data_parser :line ?line :fields fields#
                                       :>> (fields_with_?s fields#))
                     ~(read-string predicates))]
     (?- (stdout) users#)))


I did play around with the 2nd method a little, but was unable to get
that to work.
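As far as I can tell, this works because ~(read-string predicates) runs while the macro is expanding, so the literal predicate form is spliced into the <- body before Cascalog's own macros process it. A simplified, Cascalog-free illustration at the REPL:

```clojure
;; read-string runs at macro-expansion time, so the argument must be a
;; literal string; the resulting form is spliced into the expansion.
(defmacro splice-form [s]
  `(quote ~(read-string s)))

(splice-form "(> ?id 42)")
;; => (> ?id 42) — the form arrives as code at expansion time,
;; which is why <- can see ?id as a query variable
```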

toy

Mar 10, 2013, 4:29:13 PM3/10/13
to cascalog-user
One other issue I get now: when I try to use defmain to finish
packaging this up, with:

(defmain DynamicTest [columns predicates]
  (dynamic_test columns predicates))

I get the error:
java.lang.ClassCastException: clojure.lang.Symbol cannot be cast to java.lang.String

which happens on this line:
(dynamic_test columns predicates))

How can I make it so I can call this from defmain? I also tried to put
a function in between, but got the same error with:

(defn test_generator
  [columns predicates]
  (dynamic_test columns predicates))

(defmain TestGenerator [columns predicates]
  (test_generator columns predicates))

toy

Mar 11, 2013, 4:12:17 PM3/11/13
to cascalog-user
Not exactly sure why this works, but I fixed it by changing:
~(read-string predicates)
to:
(read-string ~predicates)

toy

Mar 17, 2013, 5:23:05 PM3/17/13
to cascalog-user
Upon further testing: although this compiles without errors, the
results are wrong with
(read-string ~predicates)

I still get correct results if I use the other way, but then I can't
use defmain:
~(read-string predicates)

I'd appreciate it if anyone knows how to fix this. I'm trying to run
Cascalog queries where the predicates are passed in as strings from the
command line. My query works, but when I wrap it in a defmain so it can
be called from the command line, I get a compile error:
java.lang.ClassCastException: clojure.lang.Symbol cannot be cast to java.lang.String

I'm pretty sure I'm doing the metaprogramming incorrectly, but I'm not
sure how to fix it; any help would be appreciated.
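In case it helps anyone diagnose this, here is a minimal contrast of the two placements (my understanding, which may be off):

```clojure
;; Version A: read at macro-expansion time. Requires a literal string,
;; which is why it breaks under defmain — the macro receives the
;; symbol `predicates`, not a string, so read-string gets a Symbol.
(defmacro read-at-expansion [s]
  `(quote ~(read-string s)))

;; Version B: read at runtime. Compiles with any string-valued
;; expression, but only returns the predicate form as data when the
;; query runs — nothing evaluates it as a filter, hence the wrong
;; results.
(defmacro read-at-runtime [s]
  `(read-string ~s))

(read-at-expansion "(> ?id 42)") ;; form spliced in as code at expansion
(read-at-runtime "(> ?id 42)")   ;; same form, but produced as data at runtime
```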

Sam Ritchie

Mar 18, 2013, 2:05:08 PM3/18/13
to cascal...@googlegroups.com
Rather than use a macro, you should use the "construct" form, which lets you build Cascalog queries out of plain data structures:

(construct ["?word" "?count"]
           [[src "?text"]
            [split "?text" :> "?word"]
            [c/count "?count"]])

That should get you out of macro land and make life easier.
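Applied to the query in this thread, that might look something like the following (an untested sketch — it assumes construct will accept the form returned by read-string as a predicate vector, which is worth verifying):

```clojure
;; Untested sketch: build the query as data with construct, so no
;; macro is needed and defmain can pass in plain runtime strings.
(defn dynamic-query
  [columns predicates]
  (let [fields (fields_with_?s (clojure.string/split columns #"\s+"))
        tap    (lfs-textline "/mnt/hadoop/data/users.json")]
    (?- (stdout)
        (construct fields
                   [[tap "?line"]
                    [user_data_parser :line "?line" :fields fields :>> fields]
                    (vec (read-string predicates))]))))

(dynamic-query "name id" "(> ?id 42)")
```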




--
Sam Ritchie, Twitter Inc
703.662.1337
@sritchie
