Worked! Thank you both so much - very very helpful. This will help my workflow tremendously.
Here's what I ended up with. It might not be the prettiest, but it works, and it looks pretty to me (having written a lot of dynamic code lately).
// Field selector handed to FixedPathParquetTuple in typedParquetSource.
// NOTE(review): `name` is presumably the single Parquet column to read — confirm against the caller.
val fields: Fields = new Fields(name)
/**
 * Builds a `FixedPathParquetTuple` over `fields` and `inputFile`, mixed in with
 * `TypedSource[T]` so it can be passed to `TypedPipe.from`.
 *
 * @tparam T the element type produced by the source
 */
def typedParquetSource[T] = {
  new FixedPathParquetTuple(fields, inputFile) with TypedSource[T] {
    // TypedSource asks for a converter to any supertype U of T: take the
    // single-value converter for T and widen it to U.
    override def converter[U >: T] = {
      val singleValueConverter = TupleConverter.singleConverter[T]
      TupleConverter.asSuperConverter[T, U](singleValueConverter)
    }
  }
}
/**
 * Reads the Parquet source as a `TypedPipe` of `String`, passes it through
 * `processStringField`, and writes the result to a `TypedTsv` at `fileName`
 * via `writeExecution`.
 */
def convertAndProcessStringField() =
  processStringField(TypedPipe.from(typedParquetSource[String]))
    .writeExecution(TypedTsv(fileName))
/**
 * Reads the Parquet source as a `TypedPipe` of `Int`, passes it through
 * `processIntField`, and writes the result to a `TypedTsv` at `fileName`
 * via `writeExecution`.
 */
def convertAndProcessIntField() =
  processIntField(TypedPipe.from(typedParquetSource[Int]))
    .writeExecution(TypedTsv(fileName))
(cd.getType, schema.getType(cd.getPath:_*).getOriginalType) match {
case (PrimitiveTypeName.INT32, OriginalType.UINT_16) => //"INT32-U16"
convertAndProcessIntField()
case (PrimitiveTypeName.BINARY, OriginalType.UTF8) => //"BINARY-UTF8"
convertAndProcessStringField()
...etc...