[Scalding] read arbitrary parquet columns with TypedParquet


Tristan Reid

unread,
Oct 6, 2015, 6:01:00 PM10/6/15
to cascading-user
I'm trying to process parquet columns conditionally on their type.  I can get the type from the footer, but how do I read an arbitrary column with TypedParquet?  I'm using TypedParquet because I'm using ExecutionApp and don't have the implicit flowDef required for the read method of ParquetTupleSource. 

I'm at the point of trying to understand how the parquet.tuple.macros.Macros work (they convert case classes into schema and ReadSupport implementations), but before I get too deep I'm wondering whether this is a good way to pursue this, or whether I should instead be trying to work around the flowDef issue rather than using TypedParquet.

So in summary:
 - Should I use TypedParquet with ExecutionApp, or skip it and make up a flowDef for ParquetTupleSource.read?
 - If I do use TypedParquet:
   - do I need to dynamically create the case class (since the name & type will be dynamic)?
   - is there a way to achieve predicate pushdown without faking a FilterPredicate no-op?

Thanks in advance

Tristan 


Oscar Boykin

unread,
Oct 6, 2015, 8:24:59 PM10/6/15
to cascadi...@googlegroups.com
see this example in the tests:


I don't know what you mean about predicate pushdown by faking a FilterPredicate. To get predicate pushdown you need to create a real predicate, right?




--
Oscar Boykin :: @posco :: http://twitter.com/posco

Tristan Reid

unread,
Oct 6, 2015, 10:50:27 PM10/6/15
to cascading-user
Ah - so sorry, I misspoke. I'm looking for column projection pushdown. I misunderstood a thread discussing FilterPredicates as saying I needed them to read only those columns. I believe that to get the projection I only need to provide a case class with the name:type of the fields. So if I want to dynamically specify the projected field, do I need to dynamically create a case class, in the sense that parquet.tuple.macros.Macros does?


Something like this... all of this works if I statically provide the case class; it's the line indicated below that's giving me trouble. :(

        val readFooter = ParquetFileReader.readFooter(configuration, inputPath, ParquetMetadataConverter.NO_FILTER)
        val schema = readFooter.getFileMetaData.getSchema
        val execs = schema.getColumns.map { cd: ColumnDescriptor =>
          val name = cd.getPath.last
          def processInt() = {
            val col = intColumn(name)
            // THIS IS THE PART I'M TRYING TO FIGURE OUT: makeIntCaseClass and extractIntProperty
            val input = TypedParquet[makeIntCaseClass(name)](inputFile).map { extractIntProperty(_, name) }
            processIntField(input).writeExecution(TypedTsv(outputFile + "_" + name))
          }
          (cd.getType, schema.getType(cd.getPath: _*).getOriginalType) match {
            case (PrimitiveTypeName.INT32, OriginalType.INT_16) => // "INT32-16"
              processInt()

Ian O'Connell

unread,
Oct 6, 2015, 11:39:24 PM10/6/15
to cascadi...@googlegroups.com
If you have the type/name combos at compile time you could use the macro-based one, but I think you just want that parquet tuple source. You shouldn't be calling read on a source directly in any typed job. Do a TypedPipe.from(TupleSource); you'll need to provide some TupleConverter to extract a typed value from a cascading tuple... but you could use a List[Any] as your type, I imagine.
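(A dependency-free sketch of the converter idea above, for anyone skimming this thread: `Tuple` and `TupleConverter` here are stand-ins for the cascading/scalding types of the same names, and `from` mimics what `TypedPipe.from(source)` does conceptually; the real implementations live in those libraries, not in this snippet.)

```scala
object TypedFromUntyped {
  // Stand-in for cascading.tuple.Tuple: just an untyped row of values.
  type Tuple = Seq[Any]

  // Stand-in for scalding's TupleConverter: turns an untyped tuple into a T.
  trait TupleConverter[T] { def apply(t: Tuple): T }

  // A single-field converter, in the spirit of TupleConverter.singleConverter:
  // grab the first (only) entry of the tuple and cast it.
  def singleConverter[T]: TupleConverter[T] =
    new TupleConverter[T] { def apply(t: Tuple): T = t.head.asInstanceOf[T] }

  // The permissive fallback mentioned above: treat each row as a List[Any],
  // so no per-column type needs to be known at compile time.
  val anyConverter: TupleConverter[List[Any]] =
    new TupleConverter[List[Any]] { def apply(t: Tuple): List[Any] = t.toList }

  // Conceptual stand-in for TypedPipe.from(source): apply the converter per row.
  def from[T](rows: Seq[Tuple], conv: TupleConverter[T]): Seq[T] =
    rows.map(conv(_))
}
```

The point is that the source itself stays untyped; the converter is the only place where a type is imposed, which is why swapping it per column works.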

Tristan Reid

unread,
Oct 7, 2015, 8:42:37 PM10/7/15
to cascading-user
Worked! Thank you both so much - very, very helpful. This will help my workflow tremendously.

Here's what I ended up with. It might not be the prettiest, but it works, and it looks pretty to me (having written a lot of dynamic code lately):

    val fields: Fields = new Fields(name)
    def typedParquetSource[T] = {
      new FixedPathParquetTuple(fields, inputFile) with TypedSource[T] {
        override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])
      }
    }
    def convertAndProcessStringField() = {
      val input = TypedPipe.from(typedParquetSource[String])
      processStringField(input).writeExecution(TypedTsv(fileName))
    }
    def convertAndProcessIntField() = {
      val input = TypedPipe.from(typedParquetSource[Int])
      processIntField(input).writeExecution(TypedTsv(fileName))
    }
    (cd.getType, schema.getType(cd.getPath:_*).getOriginalType) match {
      case (PrimitiveTypeName.INT32, OriginalType.UINT_16) => //"INT32-U16"
          convertAndProcessIntField()
      case (PrimitiveTypeName.BINARY, OriginalType.UTF8) => //"BINARY-UTF8"
        convertAndProcessStringField()
      ...etc... 
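(For readers landing here later, the schema-driven dispatch at the end can be sketched without any Parquet/Scalding dependencies. The enums below only mirror the names of `org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName` and `OriginalType`; the real ones come from parquet-mr, and the handler strings stand in for the `convertAndProcess*` calls above.)

```scala
object ColumnDispatch {
  // Stand-ins for the parquet-mr schema enums used in the match above.
  sealed trait PrimitiveTypeName
  case object INT32  extends PrimitiveTypeName
  case object BINARY extends PrimitiveTypeName

  sealed trait OriginalType
  case object UINT_16 extends OriginalType
  case object UTF8    extends OriginalType

  // Decide which typed handler a column gets, mirroring the (primitive,
  // original) match in the posted solution.
  def handlerFor(p: PrimitiveTypeName, o: OriginalType): String = (p, o) match {
    case (INT32, UINT_16) => "int"     // -> convertAndProcessIntField()
    case (BINARY, UTF8)   => "string"  // -> convertAndProcessStringField()
    case other            => s"unhandled: $other"
  }
}
```

Keeping the dispatch in one total match like this makes it obvious which (primitive, original) pairs are still unhandled when a new column type shows up in the footer.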