I am in the middle of porting a legacy application that uses H2 to Spark. The legacy application populated H2 via jOOQ's DSLContext, i.e. dslContext.execute("CREATE TABLE blah ...."), and it populated the actual tables using the CSVREAD function, i.e. dslContext.execute("INSERT INTO blah ... FROM CSVREAD ..."). In the new application I can't use the CSVREAD function to populate tables because I'm now reading Datasets off HDFS. So I'm trying to populate the tables using JDBC batch statements instead, like so (Scala pseudocode):
import java.sql.DriverManager

// I am creating a new DB instance per key; this is desired
Class.forName("org.h2.Driver")
val conn = DriverManager.getConnection("jdbc:h2:mem:" + key)

// Then I'm populating the db like so; BATCH_SIZE is currently 1000
rows.grouped(BATCH_SIZE).foreach { batch =>
  val stmt = conn.createStatement()
  batch.foreach { row =>
    // Each row becomes its own literal INSERT string queued in the batch
    stmt.addBatch(
      s"INSERT INTO blah (col1,col2,col3,col4,...) " +
      s"VALUES('${row.val1}',${row.val2},${row.val3},'${row.val4}',...)")
  }
  stmt.executeBatch()
  conn.commit()
  stmt.close()
}
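One thing I'm considering, in case the per-row SQL string building is part of the problem, is switching to a single PreparedStatement per table, roughly like the sketch below (untested; Row and its four columns are stand-ins for my real schema):

import java.sql.Connection

// Hypothetical row shape; my real rows have more columns.
case class Row(val1: String, val2: Int, val3: Int, val4: String)

def insertRows(conn: Connection, rows: Iterator[Row], batchSize: Int): Unit = {
  // One parameterized INSERT, reused for every row: no per-row SQL strings.
  val ps = conn.prepareStatement(
    "INSERT INTO blah (col1, col2, col3, col4) VALUES (?, ?, ?, ?)")
  try {
    rows.grouped(batchSize).foreach { batch =>
      batch.foreach { row =>
        ps.setString(1, row.val1)
        ps.setInt(2, row.val2)
        ps.setInt(3, row.val3)
        ps.setString(4, row.val4)
        ps.addBatch()
      }
      ps.executeBatch() // flush each batch so queued statements don't pile up
      ps.clearBatch()
    }
  } finally {
    ps.close()
  }
}

Would that meaningfully change the memory profile, or is the batching itself the problem?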
At runtime my Spark application is throwing OutOfMemoryError while populating the tables over JDBC. The Spark executors have ample memory for the dataset I'm working with, so is there something I'm doing wrong with my JDBC commands? Also, is there a way to use the CSVREAD function via an InputStream? I don't have the ability to read the file from HDFS and then write it back out locally somewhere for CSVREAD to pick up.
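For the CSVREAD question: I noticed H2 also ships an org.h2.tools.Csv tool with a read(Reader, String[]) overload that returns a ResultSet, so maybe I could feed it the HDFS stream directly instead of going through a local file. Something like the sketch below is what I have in mind (untested; hdfsPath is a placeholder, and I'm assuming that passing null for the column names makes it use the CSV header row):

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.h2.tools.Csv

val hdfsPath = "hdfs://..." // placeholder; my real input path

// FSDataInputStream is a plain InputStream, so it can be wrapped in a Reader.
val fs = FileSystem.get(new Configuration())
val reader = new InputStreamReader(fs.open(new Path(hdfsPath)), StandardCharsets.UTF_8)

// Parse the stream with H2's Csv tool instead of the CSVREAD SQL function.
val rs = new Csv().read(reader, null)
try {
  while (rs.next()) {
    // ... bind rs values into a PreparedStatement batch as above ...
  }
} finally {
  rs.close()
  reader.close()
}

Would that be equivalent to what CSVREAD does, or does CSVREAD handle something the Csv tool doesn't?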