Issue with multiple inputs


elba...@gmail.com

Jul 10, 2013, 1:17:22 PM
to lemur...@googlegroups.com
I am trying to run a multi-step job using lemur + Clojure.

I have an issue passing multiple inputs as arguments to my lemur jobdef.

As the first step of my job, I am trying to run an EMR streaming job:

lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"

With a single input file, my code looks like this:

(import com.amazonaws.services.elasticmapreduce.util.StepFactory)
(import com.amazonaws.services.elasticmapreduce.model.StepConfig)
(import com.amazonaws.services.elasticmapreduce.util.StreamingStep)


(defn create-step-parsing [eopts]
(def step (new StreamingStep))
(.withInputs step (into-array [(str (:input-folder eopts) "/inputs/*")]))
...

This works fine, but when I try to pass a list of input files, I get an error:

lemur run ${CONF_DIR}/run-pipeline.clj --master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj --input_folder "${input_folder1}" --input_folder "${input_folder2}" --input_folder "${input_folder3}" --input_folder "${input_folder}" --output-folder "${output_folder}" --reduce-tasks "${REDUCE_TASKS}" --map-tasks "${MAP_TASKS}"

(defn create-normalizer-step [eopts]
(def step (new StreamingStep))
(.withInputs step (to-array (:input-folder eopts)))

Here is the error I am getting:

15:44:05 Exception in thread "main" java.lang.ClassCastException
15:44:05 at java.lang.Class.cast(Class.java:2990)
15:44:05 at clojure.lang.Reflector.boxArg(Reflector.java:429)
15:44:05 at clojure.lang.Reflector.boxArgs(Reflector.java:462)
15:44:05 at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:57)
15:44:05 at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:30)
15:44:05 at run_pipeline17$create_normalizer_step.invoke(run-pipeline.clj:18)
15:44:05 at run_pipeline17$run_pipeline.invoke(run-pipeline.clj:209)
15:44:05 at lemur.core$fire_BANG_.doInvoke(core.clj:711)
15:44:05 at clojure.lang.RestFn.invoke(RestFn.java:423)
15:44:05 at run_pipeline17$eval178.invoke(run-pipeline.clj:222)
15:44:05 at clojure.lang.Compiler.eval(Compiler.java:6465)
15:44:05 at clojure.lang.Compiler.load(Compiler.java:6902)
15:44:05 at clojure.lang.Compiler.loadFile(Compiler.java:6863)
15:44:05 at clojure.lang.RT$3.invoke(RT.java:305)
15:44:05 at lemur.core$execute_jobdef.invoke(core.clj:742)
15:44:05 at lemur.core$_main$fn__1388.invoke(core.clj:929)
15:44:05 at lemur.core$_main.doInvoke(core.clj:924)
15:44:05 at clojure.lang.RestFn.applyTo(RestFn.java:137)
15:44:05 at lemur.core.main(Unknown Source)

The code I added is from line 17 to line 19.

Thanks

Marc Limotte

Jul 10, 2013, 2:43:49 PM
to lemur...@googlegroups.com
Hi.

I think your first problem is using to-array in create-normalizer-step. to-array returns an Object[]. You want a String[], so use into-array instead, and be explicit about the type: (into-array String (:input-folder eopts)).
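
To make the array-type difference concrete, here is a minimal sketch that runs in a plain Clojure REPL with no AWS classes. The folder paths are made up for illustration.

```clojure
;; Illustrative paths only; any sequence of strings behaves the same way.
(def folders ["s3://example-bucket/in1" "s3://example-bucket/in2"])

;; to-array always builds an Object[], even if every element is a String.
(def as-objects (to-array folders))

;; into-array with an explicit element type builds a String[], which is
;; what a Java String... (varargs) parameter expects.
(def as-strings (into-array String folders))

(println (.getName (class as-objects))) ; prints [Ljava.lang.Object;
(println (.getName (class as-strings))) ; prints [Ljava.lang.String;
```

An Object[] cannot be cast to String[], which is consistent with the ClassCastException raised from Reflector.boxArg in the stack trace.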

Another issue is that you're referencing :input-folder (with a hyphen), but your command line specifies --input_folder (with an underscore).  But more importantly, your command line lists --input_folder multiple times.  lemur does not support that style of command line argument.  Instead, a common solution is to use a comma-separated list.  E.g. --input-folders "/tmp/dir1,/tmp/dir2".
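
As a rough sketch of the comma-separated approach, the value could be split into individual paths like this. The helper name parse-input-folders is hypothetical and not part of lemur; trimming and dropping empty entries are extra assumptions to tolerate stray spaces or a trailing comma.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper (not part of lemur): turn "a,b,c" into a vector
;; of trimmed, non-empty path strings.
(defn parse-input-folders [s]
  (->> (str/split s #",")
       (map str/trim)
       (remove str/blank?)
       vec))

(println (parse-input-folders "/tmp/dir1, /tmp/dir2"))
;; prints [/tmp/dir1 /tmp/dir2]
```

The resulting vector can then be passed to (into-array String ...) before calling a Java method that expects a String[].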

Also, I'd recommend putting most of those command line args in the defcluster in your jobdef (--master-instance-type ${MASTER_INSTANCE_TYPE} --slave-instance-type ${SLAVE_INSTANCE_TYPE} --num-instances ${NUM_INSTANCES} --ami-version ${AMI_VERSION} --hadoop-version ${HADOOP_VERSION} --bucket ${BUCKET} --jar-src-path ${CONF_DIR}/run-pipeline.clj).  You can have your defcluster reference environment variables.  E.g.

(defcluster pipeline-cluster
  :master-instance-type "${ENV.MASTER_INSTANCE_TYPE}"
  …)

One final suggestion: avoid using def inside your function. It creates a global var, which seems unnecessary here.  You might also want to use the convenience function lfn.  Here's a suggested (but untested) revised function:

(require '[clojure.string :as s])
(lfn create-normalizer-step [input-folders]
  (doto (StreamingStep.)
    (.withInputs (into-array String (s/split input-folders #",")))))
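
The point about def being global can be seen in a small standalone sketch. The function names here are hypothetical, and StringBuilder stands in for StreamingStep so that the snippet runs without the AWS SDK.

```clojure
;; def inside a function body still creates a namespace-level var,
;; and every call rebinds it.
(defn with-def []
  (def leaked (StringBuilder. "a"))
  (.toString leaked))

;; let keeps the binding local to the function body.
(defn with-let []
  (let [local (StringBuilder. "a")]
    (.toString local)))

(with-def)
(with-let)

;; leaked is now visible to the whole namespace; local is not.
(println (bound? #'leaked)) ; prints true
```

Using doto, as in the revised function, avoids naming the intermediate object at all.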

marc







elba...@gmail.com

Jul 18, 2013, 5:51:45 PM
to lemur...@googlegroups.com, elba...@gmail.com
Thanks, let me try that. I will post an update.