Cascalog + Avro + Snappy Woes

Seth Madison

May 7, 2013, 10:56:38 AM
to cascal...@googlegroups.com
I'm trying to get Cascalog working with a Snappy-compressed Avro input file, and I'm having quite a tough time. The job runs fine, but none of my map tasks produce any output, even when I remove all filter/aggregation predicates and reduce the query to a simple pass-through. The counters show that the tasks are reading records, but they never write anything. I based my test on mykidong's Avro/Cascalog example.

I'm trying to figure out where I'm losing the data (Cascading, Cascalog, Avro, Snappy, etc.). Here is my Avro tap:

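    (defn hfs-avro
      "Returns an Hfs tap that reads or writes Avro data.
       in-out-path: input or output path.
       schema-path: Avro schema path on the classpath."
      [in-out-path schema-path & opts]
      ;; Assumes: tap aliases cascalog.tap, and the ns imports
      ;; org.apache.avro.Schema$Parser and cascading.avro.AvroScheme.
      ;; get-url is a helper (not shown) returning a java.net.URL for schema-path.
      (let [url         (get-url schema-path)
            schema      (with-open [in (.openStream url)] ; close the stream after parsing
                          (.parse (Schema$Parser.) in))
            avro-scheme (AvroScheme. schema)]
        (apply tap/hfs-tap avro-scheme in-out-path opts)))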

Here is my config function along with a stupid pass-through flow:

    (defn job-conf [compress?]
      ;; Hadoop 1 (mapred.*) property names for map-output and job-output compression.
      {"mapred.compress.map.output"          compress?
       "mapred.map.output.compression.codec" "org.apache.hadoop.io.compress.SnappyCodec"
       "mapred.output.compress"              compress?
       "mapred.output.compression.codec"     "org.apache.hadoop.io.compress.SnappyCodec"})

    (defmain AvroTest [path-to-avro-data output-path]
      (conf/set-job-conf! (job-conf true)) ; Snappy compression enabled
      (let [avro-src (util/hfs-avro path-to-avro-data "/avro/myschema.avsc")]
        ;; pass-through: emit only ?session for every input record
        (?<- (hfs-textline output-path) [?session]
             (avro-src :> ?user ?session ?group))))
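
One way to narrow down where the records disappear is to run the same pass-through twice, once with compression on and once with it off. As far as I know, an Avro container file records its own block codec in the file header, so the `mapred.*` settings only govern map output and job output, not how the Snappy-compressed Avro input is decoded. The sketch below is a hypothetical helper (`compression-conf` and `snappy-codec` are illustrative names, not part of Cascalog) that builds those settings as a plain map with string values and leaves the codec keys out when compression is off.

```clojure
;; Hypothetical helper: Hadoop 1 (mapred.*) compression settings as a plain map.
;; Values are strings because Hadoop's Configuration stores strings; whether
;; Cascalog would coerce booleans on its own is not assumed here.
(def snappy-codec "org.apache.hadoop.io.compress.SnappyCodec")

(defn compression-conf
  "Returns job-conf entries for map-output and job-output compression.
   When compress? is false, the codec keys are omitted entirely."
  [compress?]
  (cond-> {"mapred.compress.map.output" (str (boolean compress?))
           "mapred.output.compress"     (str (boolean compress?))}
    compress? (assoc "mapred.map.output.compression.codec" snappy-codec
                     "mapred.output.compression.codec"     snappy-codec)))

(compression-conf false)
;; => {"mapred.compress.map.output" "false", "mapred.output.compress" "false"}
```

In the job above this would be `(conf/set-job-conf! (compression-conf false))`. If the uncompressed run writes records and the compressed run does not, output-side Snappy is the likely suspect; if neither writes anything, the loss is more likely upstream, in the Avro scheme or the query itself.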

One potential issue is that my Avro fields are nested, but I think cascading.avro (I'm using 2.1.1) should handle that. I printed out the graph of the flow and everything looks normal. Here is a rough approximation of what I see:

    [HEAD] -> Hfs[AvroScheme] -> Each('...')[Identity[decl:...]] -> Each('...')[ClojureFilter[decl:...]] -> Each('...')[Identity[decl:...]] -> Hfs[TextLine[['line']]]
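
To check the nested-field concern, it can help to list the schema's top-level fields and see which of them are themselves records, and whether there are exactly three to line up with `?user ?session ?group`. The sketch below works on the schema as plain Clojure data with string keys (roughly what a JSON reader returns for an `.avsc` file); `top-level-fields` and `example-schema` are hypothetical, since the real `/avro/myschema.avsc` isn't shown.

```clojure
;; Hypothetical helper: report each top-level field of an Avro record schema
;; (as Clojure data with string keys) and whether its type is a nested record.
(defn top-level-fields
  "Returns [name nested?] pairs for each top-level field of a record schema.
   A field counts as nested when its type is a record, directly or as a
   branch of a union such as [\"null\" {...}]."
  [schema]
  (letfn [(record? [t]
            (cond (map? t)        (= "record" (get t "type"))
                  (sequential? t) (boolean (some record? t)) ; union branches
                  :else           false))]
    (mapv (fn [f] [(get f "name") (record? (get f "type"))])
          (get schema "fields"))))

;; Illustrative schema only, not the poster's actual schema.
(def example-schema
  {"type" "record" "name" "Event"
   "fields" [{"name" "user"    "type" {"type" "record" "name" "User"
                                       "fields" [{"name" "id" "type" "string"}]}}
             {"name" "session" "type" "string"}
             {"name" "group"   "type" ["null" "string"]}]})

(top-level-fields example-schema)
;; => [["user" true] ["session" false] ["group" false]]
```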

Any pointers would be super helpful!

Dan Young

Dec 13, 2013, 6:18:21 PM
to cascal...@googlegroups.com
Seth,

Did you ever get this working? I'm interested in this as well, since some of our source data is now in Avro and I'm trying to convert some Cascading jobs to Cascalog. I haven't seen any success stories so far. If you did get it working, would you mind sharing what you did?

Regards,

Dano

Seth Madison

Dec 15, 2013, 7:40:52 PM
to cascal...@googlegroups.com
I never did get it working, sorry.


--
You received this message because you are subscribed to a topic in the Google Groups "cascalog-user" group.