Kite dataset writer - renaming of tmp files not happening


Damian Smith

Jun 30, 2016, 3:36:35 PM
to CDK Development

I am attempting to use the Kite DatasetWriter by following the KafkaSink pattern detailed in the link below. The purpose is to have the executors write to disk rather than having everything written from the edge node.

http://allegro.tech/2015/08/spark-kafka-integration.html


My issue is that when I call the writer to write, it creates a temporary file (with a "." prefix and a .tmp extension), but this file is never renamed to drop the "." prefix and the .tmp extension, so it never becomes visible in the Hive table. If I rename the file manually after the Spark application runs, the row becomes visible in the Hive table. If I make the RDD contain only one record and call close() immediately after the write, the record is visible.


But when I call write and only call close later, the rename never happens.


This is what the code looks like. The first part is the Kite DatasetWriter wrapper, modeled after the KafkaSink pattern referenced above:

import org.apache.avro.generic.GenericRecord
import org.kitesdk.data.{DatasetWriter, Datasets, View}

class KiteDataSetWriter(createWriter: () => DatasetWriter[GenericRecord]) extends Serializable {

  // Created lazily, so each JVM that deserializes this object gets its own writer
  lazy val writer = createWriter()

  def send(record: GenericRecord): Unit = {
    writer.write(record)
  }

  def close(): Unit = {
    writer.close()
  }
}

object KiteDataSetWriter {

  def apply(dataSetUri: String): KiteDataSetWriter = {
    val f = () => {
      // e.g. "dataset:hive:damian/claimsubmitted8"
      val dataset = Datasets.load[View[GenericRecord]](dataSetUri)
      dataset.newWriter().asInstanceOf[DatasetWriter[GenericRecord]]
    }
    new KiteDataSetWriter(f)
  }
}


In the main body of the Spark application, I want each executor to have its own writer, so I am instantiating the writer as a broadcast variable.

val kiteWriter = sc.broadcast(KiteDataSetWriter(dataSetUri))

Then we call the send method for each record in the RDD before calling close at the end.

try {
  genRecRDD.foreach(record => kiteWriter.value.send(record))
} finally {
  kiteWriter.value.close()
}

How can I change this to ensure that records show up in the Hive table?

Micah Whitacre

Jul 1, 2016, 8:55:05 AM
to CDK Development
Just as an FYI, there is an example here [1] of integrating Spark with Kite. It uses the input format as the termination point of part of your processing graph, which it seems like you are trying to avoid, but it is an option.
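
For reference, the output-format route would look roughly like the sketch below in Spark. This is my rough sketch, not the code from [1]; it reuses the genRecRDD and dataset URI from your post and assumes Kite's DatasetKeyOutputFormat:

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat

// Point the Kite output format at the target dataset
val job = Job.getInstance(sc.hadoopConfiguration)
DatasetKeyOutputFormat.configure(job).writeTo("dataset:hive:damian/claimsubmitted8")

// The output format writes the key of each pair; the value is unused
genRecRDD
  .map(record => (record, null.asInstanceOf[Void]))
  .saveAsNewAPIHadoopDataset(job.getConfiguration)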

As far as renaming goes, it happens when the writer gets closed [2]. Do you get a DatasetOperationException, as the code shows, when the rename fails? If not, then I wonder whether the writers are actually being closed. Perhaps, because of how you are serializing the writers as broadcast variables, you are creating new writers, or the code is being initialized separately on each executor, so that each writer instance on the executors gets a distinct Appender [3] and those never get closed.
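
One way to make sure each executor's writer gets closed (so the rename happens) is to drop the broadcast and open/close a writer per partition instead, something along these lines (just a sketch reusing the Datasets.load call from your post, not tested):

import org.apache.avro.generic.GenericRecord
import org.kitesdk.data.{DatasetWriter, Datasets, View}

genRecRDD.foreachPartition { records =>
  // Open the writer on the executor that owns this partition
  val view = Datasets.load[View[GenericRecord]]("dataset:hive:damian/claimsubmitted8")
  val writer = view.newWriter().asInstanceOf[DatasetWriter[GenericRecord]]
  try {
    records.foreach(record => writer.write(record))
  } finally {
    // Closing on the same JVM that opened the appender triggers the .tmp rename
    writer.close()
  }
}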

Damian Smith

Jul 1, 2016, 10:34:35 AM
to Micah Whitacre, CDK Development
Yeah, I think you're right. If I make my RDD contain one record and then call close in the same function as the write, immediately afterwards, the record shows up in the Hive table; the rename happens. It does seem to be a problem of the writers never closing.

I don't actually get an exception with the code as it is, but when I added a close to a shutdown hook, I got a dataset IO exception.


Sent from my iPhone