I'm trying to get Cascalog working with a Snappy-compressed Avro input file, and I'm having quite a tough time. The job runs without errors, but none of my map tasks produce any output, even when I remove every filter/aggregation predicate and reduce the query to a simple pass-through. The counters show that the tasks are reading records, but they never write anything. I based my test on mykidong's Avro/Cascalog example.
I'm trying to figure out where the data is being lost (Cascading, Cascalog, Avro, Snappy, etc.). Here is my Avro tap:
```clojure
(defn hfs-avro
  "in-out-path: input or output path.
  schema-path: avro schema path from the classpath."
  [in-out-path schema-path & opts]
  (let [url         (get-url schema-path) ; get-url (not shown) resolves schema-path on the classpath
        schema      (with-open [in (.openStream url)] ; close the stream once the schema is parsed
                      (.parse (Schema$Parser.) in))
        avro-scheme (AvroScheme. schema)]
    (apply tap/hfs-tap avro-scheme in-out-path opts)))
```
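Since the tap builds its scheme from a parsed Avro schema, a quick REPL check of that schema's top-level fields can show whether `?user ?session ?group` correspond to three flat fields or whether some of them sit inside a nested record. The sketch below is a minimal, hedged example: it parses from a JSON string instead of the classpath resource so it runs on its own, and the namespace name and example schema are hypothetical.

```clojure
(ns avro-test.schema-check
  (:import [org.apache.avro Schema Schema$Field Schema$Parser]))

(defn top-level-fields
  "Parses an Avro schema from a JSON string and returns [field-name TYPE]
  pairs for its top-level fields. A nested record appears as one RECORD entry."
  [^String schema-json]
  (let [^Schema schema (.parse (Schema$Parser.) schema-json)]
    (mapv (fn [^Schema$Field f]
            ;; Enum.name() on Schema$Type gives e.g. "STRING" or "RECORD"
            [(.name f) (.name (.getType (.schema f)))])
          (.getFields schema))))

;; Hypothetical schema: session and group are nested inside a "meta" record.
(def example-schema
  "{\"type\": \"record\", \"name\": \"Event\", \"fields\": [
     {\"name\": \"user\", \"type\": \"string\"},
     {\"name\": \"meta\", \"type\": {\"type\": \"record\", \"name\": \"Meta\", \"fields\": [
       {\"name\": \"session\", \"type\": \"string\"},
       {\"name\": \"group\", \"type\": \"string\"}]}}]}")
```

With this example schema the query's three variables would not line up with two top-level fields. The check only reveals the schema's shape; how cascading.avro 2.1.1 exposes a nested record to Cascading depends on the scheme itself.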
Here is my config function along with a bare-bones pass-through flow:
```clojure
(defn job-conf [compress?]
  {"mapred.compress.map.output"          compress?
   "mapred.map.output.compression.codec" "org.apache.hadoop.io.compress.SnappyCodec"
   "mapred.output.compress"              compress?
   "mapred.output.compression.codec"     "org.apache.hadoop.io.compress.SnappyCodec"})

(defmain AvroTest [path-to-avro-data output-path]
  (conf/set-job-conf! (job-conf true)) ; snappy compression enabled.
  (let [avro-src (util/hfs-avro path-to-avro-data "/avro/myschema.avsc")]
    (?<- (hfs-textline output-path) [?session]
         (avro-src :> ?user ?session ?group))))
```
One potential issue is that my Avro fields are nested, but I believe cascading.avro (I'm using 2.1.1) should handle that. I printed the flow graph and everything looks normal. Here is a rough approximation of what I see:
```
[HEAD] -> Hfs[AvroScheme] -> Each('...')[Identity[decl:...]] -> Each('...')[ClojureFilter[decl:...]] -> Each('...')[Identity[decl:...]] -> Hfs[TextLine[['line']]]
```
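One candidate cause worth ruling out, assuming Cascalog's documented null semantics: output variables prefixed with `?` are non-nullable, and Cascalog drops any tuple in which one of them is null, even when no filter predicate is written (the `ClojureFilter` step in the graph may be that implicit check). Variables prefixed with `!` are nullable. The minimal sketch below uses hypothetical in-memory tuples and a hypothetical namespace in place of the Avro tap to show the difference.

```clojure
(ns avro-test.nullable
  (:use cascalog.api))

;; Hypothetical stand-in for the Avro records: the third field is nil in every row.
(def sample-tuples
  [["alice" "s1" nil]
   ["bob"   "s2" nil]])

;; ?-variables are non-nullable, so every tuple with a nil ?group is dropped.
(defn non-nullable-query []
  (??<- [?session]
        (sample-tuples :> ?user ?session ?group)))

;; !-variables are nullable, so the same rows pass through.
(defn nullable-query []
  (??<- [?session]
        (sample-tuples :> !user ?session !group)))
```

If swapping `?user`/`?group` for `!user`/`!group` in the real query makes output appear, null field values (for example from how the nested records are mapped) would be the place to look rather than Snappy.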
Any pointers would be super helpful!