I am attempting to use the Kite DatasetWriter by following the KafkaSink pattern described at the link below. The goal is to have the executors write to disk rather than having the edge node do the writing.
http://allegro.tech/2015/08/spark-kafka-integration.html
My issue is that when I call the writer to write, it creates a temporary file (with a "." prefix and a .tmp extension), but the file is never renamed to drop the "." prefix and the .tmp extension, so it never becomes visible in the Hive table. If I rename it manually after running the Spark application, the row becomes visible in the Hive table. If I make the RDD contain only one record and call close() immediately after the write, the record is visible.
But when I call write and only call close later, the rename never happens.
This is what the code looks like. The first part is the Kite dataset writer, modeled after the KafkaSink pattern referenced above:
    class KiteDataSetWriter(createWriter: () => DatasetWriter[GenericRecord]) extends Serializable {
      lazy val writer = createWriter()

      def send(record: org.apache.avro.generic.GenericRecord) = {
        writer.write(record.asInstanceOf[org.apache.avro.generic.GenericRecord])
      }

      def close() = {
        writer.close()
      }
    }

    object KiteDataSetWriter {
      def apply(dataSetUri: String): KiteDataSetWriter = {
        val f = () => {
          val dataset = Datasets.load[View[GenericRecord]]("dataset:hive:damian/claimsubmitted8")
          val writer = dataset.newWriter().asInstanceOf[DatasetWriter[GenericRecord]]
          writer
        }
        new KiteDataSetWriter(f)
      }
    }
In the main body of the Spark application, I want each executor to have its own writer, so I am instantiating the writer as a broadcast variable:
    val kiteWriter = sc.broadcast(KiteDataSetWriter(dataSetUri))
Then we call the send method for each record in the RDD before calling close at the end.
    try {
      genRecRDD.foreach(record => kiteWriter.value.send(record))
    } finally {
      kiteWriter.value.close()
    }
How can I change this to ensure that the records show up in the Hive table?
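One direction that might be worth trying is sketched below. It rests on an assumption that has not been verified here: that the close() in the finally block runs on the driver, so the writers that the executors opened inside foreach are never closed and their temporary files are never renamed. If that is the cause, opening and closing the writer inside the same task, once per partition, may help. The object PartitionedKiteWrite and the method writeAll are hypothetical names used only for illustration; the Kite calls are the same ones used in the code above.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.kitesdk.data.{DatasetWriter, Datasets, View}

// Hypothetical helper for illustration; not part of Kite or Spark.
object PartitionedKiteWrite {

  def writeAll(records: RDD[GenericRecord], dataSetUri: String): Unit = {
    records.foreachPartition { partition =>
      // This closure runs on an executor, once per partition.
      // Skip empty partitions so no writer is opened for them.
      if (partition.nonEmpty) {
        val dataset = Datasets.load[View[GenericRecord]](dataSetUri)
        val writer: DatasetWriter[GenericRecord] = dataset.newWriter()
        try {
          partition.foreach(record => writer.write(record))
        } finally {
          // Close in the same task (and JVM) that did the writing,
          // rather than from the driver after the job has finished.
          writer.close()
        }
      }
    }
  }
}
```

With this shape the call site would become PartitionedKiteWrite.writeAll(genRecRDD, dataSetUri), and the broadcast variable would no longer be needed. The trade-off is one writer, and therefore at least one output file, per non-empty partition.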