IllegalArgumentException during simple copy to parquet

22 views
Skip to first unread message

Bruno Bonacci

unread,
May 19, 2016, 9:52:21 AM5/19/16
to PigPen Support

Hi,

I trying to write a simple script which encodes the data into Parquet files from TSV.

The code look like this.

 
(require '[pigpen.pig])
  (require '
[pigpen.core :as pig])
 
(require '[pigpen.parquet :as pqt])


  (spit "/tmp/test.tsv" "1\t2\t3\n4\t5\t6\n")

  (defn copy-to-parquet
    [input output]
    (->> (pig/load-tsv input)
         (pqt/store-parquet
          output
          (pqt/message "test"
                       (pqt/binary "v1")
                       (pqt/binary "v2")
                       (pqt/binary "v3")))))

  ;; this doesn'
t produce a output file
 
(copy-to-parquet "/tmp/test.tsv" "/tmp/test.pq")
 
 
(pigpen.pig/write-script
   
"my-script.pig"
   
(copy-to-parquet "/tmp/test.tsv" "/tmp/test.pq"))



So when i try to execute the generate script I get the following error:

Caused by: pigpen.PigPenException: java.lang.IllegalArgumentException: Key must be integer
Caused by: java.lang.IllegalArgumentException: Key must be integer
 at clojure
.lang.APersistentVector.invoke(APersistentVector.java:284)
 at clojure
.core$map$fn__4553.invoke(core.clj:2622)
 at clojure
.lang.LazySeq.sval(LazySeq.java:40)
 at clojure
.lang.LazySeq.seq(LazySeq.java:49)
 at clojure
.lang.RT.seq(RT.java:507)
 at clojure
.core$seq__4128.invoke(core.clj:137)
 at clojure
.core$apply.invoke(core.clj:630)
 at pigpen
.pig.runtime$eval9900$fn__9901$fn__9902.invoke(runtime.clj:266)
 at pigpen
.runtime$process__GT_bind$fn__6872$fn__6873.invoke(runtime.clj:152)
 at pigpen
.runtime$keyword_field_selector__GT_bind$fn__6858$fn__6860.invoke(runtime.clj:125)
 at pigpen
.runtime$map__GT_bind$fn__6838$fn__6839.invoke(runtime.clj:48)
 at pigpen
.runtime$process__GT_bind$fn__6872$fn__6873.invoke(runtime.clj:152)
 at pigpen
.pig.runtime$eval_udf.invoke(runtime.clj:153)
 at clojure
.lang.Var.invoke(Var.java:383)
 at pigpen
.PigPenFn.exec(PigPenFn.java:63)
 at pigpen
.PigPenFn.exec(PigPenFn.java:39)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:345)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextDataBag(POUserFunc.java:389)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:310)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:281)
 at org
.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
 at org
.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
 at org
.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
 at org
.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
 at org
.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
 at org
.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
 at org
.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
 at org
.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

This is how the generated script looks like:

REGISTER pigpen.jar;

load34545 = LOAD '/tmp/test.tsv'
    USING PigStorage('\n')
    AS (value:chararray);

DEFINE udf34551 pigpen.PigPenFn('(clojure.core/require (quote [pigpen.runtime]) (quote [pigpen.extensions.core]))','(clojure.core/comp (pigpen.runtime/process->bind (pigpen.runtime/pre-process :pig :native)) (pigpen.runtime/map->bind (clojure.core/fn [s] (if s (pigpen.extensions.core/structured-split s "\\t")))) (pigpen.runtime/keyword-field-selector->bind [:v1 :v2 :v3]) (pigpen.runtime/process->bind (pigpen.runtime/post-process :pig :native)))');

project34549_0 = FOREACH load34545 GENERATE
    udf34551(value) AS (value0);

project34549 = FOREACH project34549_0 GENERATE
    FLATTEN(value0) AS (v1:chararray, v2:chararray, v3:chararray);

STORE project34549 INTO '/tmp/test.pq'
    USING parquet.pig.ParquetStorer();



using Pig 0.13 (tried with 0.15 as well)

  :dependencies [[org.clojure/clojure "1.7.0"]
                 [com.netflix.pigpen/pigpen-pig "0.3.1"]
                 [com.netflix.pigpen/pigpen-parquet-pig "0.3.1"]]

  :profiles {:dev {:dependencies [[org.apache.pig/pig "0.13.0"]
                                  [org.apache.hadoop/hadoop-core "1.1.2"]]}})


Any idea where might be the issue?

Bruno

Matt Bossenbroek

unread,
May 19, 2016, 12:34:23 PM5/19/16
to PigPen Support, Bruno Bonacci
This one took me a bit to figure out because the error message was pretty unhelpful… 

The store-parquet command expects a map with keyword keys. The keywords should match the field names you want to store. The load-tsv command just returns a vector, hence the mismatch.


This updated version works:

(defn copy-to-parquet
[input output]
(->> (pig/load-tsv input)
       (pig/map (fn [[v1 v2 v3]] {:v1 v1, :v2 v2, :v3 v3}))

(pqt/store-parquet
output
(pqt/message "test"
(pqt/binary "v1")
(pqt/binary "v2")
(pqt/binary "v3")))))


I’ll add a better error message for that case as well.

-Matt

--
You received this message because you are subscribed to the Google Groups "PigPen Support" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pigpen-suppor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Bruno Bonacci

unread,
May 19, 2016, 2:41:28 PM5/19/16
to Matt Bossenbroek, PigPen Support
Thanks, that one wasn't really obvious.
It should be stated more explicitly in the doc maybe with a simple example.

one more question,
Is there a way to execute the script without having to generate it?

I mean something like packing a uberjar and the use hadoop -jar to submit the job,
similarly to Cascalog.

Bruno 

Matt Bossenbroek

unread,
May 19, 2016, 2:59:59 PM5/19/16
to Bruno Bonacci, PigPen Support
It should be stated more explicitly in the doc maybe with a simple example.

The docs do state that [1], is it unclear? I can add an example to clarify further.

"The relation prior to this command must be a map with keywords matching the parquet columns to be stored."



something like packing a uberjar and the use hadoop -jar to submit the job

Not that I’m aware of for pig. You could generate a cascading flow though [2], which would give you that style of entrypoint.

-Matt

Bruno Bonacci

unread,
May 19, 2016, 6:15:03 PM5/19/16
to PigPen Support

Sorry, my bad, I must have missed it.


Now the next issue is the following:
once the store function writes the output files the output it looks like a typical Pig/Hadoop output

/tmp/test.pq
/tmp/test.pq/._SUCCESS.crc
/tmp/test.pq/.part-m-00000.parquet.crc
/tmp/test.pq/_SUCCESS
/tmp/test.pq/part-m-00000.parquet

but If i try to load the records back 

     (pig/dump
(pqt/load-parquet
"/tmp/test.pq"

(pqt/message "test"
(pqt/binary "v1")
(pqt/binary "v2")
(pqt/binary "v3"))))


Please note that the input specified is folder produced by the store.
Like for other Pig's load/store function I would have expected the following symmetry.

 store "/tmp/data1"
 load  "/tmp/data1

however I get the following error:

InvalidInputException Input path does not exist: file:/tmp/test.pq/_SUCCESS  org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus (FileInputFormat.java:235)


If I specify the actual part-file "/tmp/test.pq/part-m-00000.parquet"  it works on the REPL.
But if i try to generate a script a get the following error:

(pigpen.pig/write-script
 "my-script.pig"
 (pig/dump (load-from-parquet "/tmp/test.pq/part-m-00000.parquet")))


AssertionError Assert failed: Query was not a pigpen query
(->> query meta keys (some #{:pig :baked}))  pigpen.oven/bake (oven.clj:366)

Do you have any idea what's wrong this time?

thanks
Bruno

Bruno Bonacci

unread,
May 23, 2016, 11:17:33 AM5/23/16
to PigPen Support
Any help/update on this?

If is not clear enough I can provide a  test script to reproduce the problem

Bruno

Matt Bossenbroek

unread,
May 23, 2016, 12:24:50 PM5/23/16
to PigPen Support, Bruno Bonacci
Sorry - this fell off my radar on Friday. Thanks for bumping the thread.

There is an asymmetry with the parquet loader because I’m actually using the hadoop code to read & write the file. Unfortunately when the hadoop code is reading its own output, it doesn’t know to ignore those extra files (no idea how this actually works on the cluster). IIRC, there was some complication when trying to get it to ignore those files, but I can’t remember what it was at the moment. I can dig into that further if you’d like.


Your immediate issue is that you’re passing the result of pig/dump (local/repl mode) to the generate script command. Drop that part & it should work fine.

(pigpen.pig/write-script
 "my-script.pig"
 (load-from-parquet "/tmp/test.pq/part-m-00000.parquet"))



-Matt

Bruno Bonacci

unread,
May 23, 2016, 12:40:26 PM5/23/16
to Matt Bossenbroek, PigPen Support
Hi Matt,

thanks for getting the time to reply.

So the pig/dump works only in the REPL?
I thought that the `pig/dump` was getting compiled to the DUMP instruction in Pig, something like:

A = LOAD 'myfile.txt' USING PigStorage('\t');

DUMP A;

Which is a valid Pig script.
I see if I can condense this thread into a couple of example to add to the doc, and I'll send a PR.

Thanks again
Bruno

Matt Bossenbroek

unread,
May 23, 2016, 1:45:35 PM5/23/16
to Bruno Bonacci, PigPen Support
The pig/dump is similar to the one in the script, but it’s meant more as an interactive command. Because it returns data (as opposed to a pigpen query), there’s no real way to convert it into a script command.

It would be possible to add this functionality, in the form of a pigpen.pig/dump-script command, but I’d like to understand more of the use case first. It seems limited because there is the REPL version.


Thanks in advance for the doc updates!

-Matt

Reply all
Reply to author
Forward
0 new messages