Error creating H2OFrame

Renan Pinzon

Jun 7, 2016, 2:44:05 PM
to H2O Open Source Scalable Machine Learning - h2ostream
Hi there,

I've been using H2O (3.8.2.3) with Spark (1.6.0), Scala (2.10.5), and JVM (1.8.0 update 92). Everything was working fine with a small dataset, but when I switched to a bigger dataset the problems started, so I decided it was time to move to Sparkling Water (1.6.1).

The problem I'm facing with H2O concerns creating a water.fvec.H2OFrame from the time-series data of my entire dataset, split by two keys, so that I can fit a GLM model per group. I created a simple scenario to illustrate the problem.

Given the following content (the order doesn't matter), which records when a car was sold, its name, its color, the price on the day of sale, and the price on the day before the sale:

time,product,color,price,lag
2016-01-01,cruze,black,16.0,
2016-01-02,cruze,black,15.4,16.0
2016-01-03,cruze,black,14.5,15.4
2016-01-04,cruze,black,15.2,14.5
2016-01-05,cruze,black,16.3,15.2
2016-01-06,cruze,black,15.7,16.3
2016-01-01,cruze,white,15.0,
2016-01-02,cruze,white,14.4,15.0
2016-01-03,cruze,white,13.5,14.4
2016-01-04,cruze,white,14.2,13.5
2016-01-05,cruze,white,15.3,14.2
2016-01-06,cruze,white,14.7,15.3

Note: Since the lag field holds the previous day's price, it is null on the first day.

In my current solution I select the data from a temporary table and group it by product and color. Each group yields a Row (my key) and an Iterable[Row] (my time-series data), and both are passed to my function through the .map call. Inside that function I convert the iterable to arrays of Double so that I can create a water.fvec.H2OFrame with the water.util.ArrayUtils.frame(names: Array[String], rows: Array[Double]*) function.
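
For illustration, that part of the code is shaped roughly like this (a simplified sketch, not my exact code; the table and column names match the sample above, and null handling for the first-day lag is omitted):

import water.util.ArrayUtils

val grouped = sqlContext.sql("SELECT product, color, price, lag FROM sales")
  .map(r => ((r.getString(0), r.getString(1)), r))
  .groupByKey()

val results = grouped.map { case ((product, color), rows) =>
  // one Array[Double] per observation: (price, lag)
  val data = rows.map(r => Array(r.getDouble(2), r.getDouble(3))).toArray
  // one frame per (product, color) pair -- this is the ArrayUtils.frame
  // call that shows up in the stack trace below
  val frame = ArrayUtils.frame(Array("price", "lag"), data: _*)
  // ... fit the GLM on frame and collect its coefficients ...
}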

Everything works fine with a small dataset: I'm able to create the frames and apply the GLM model. But when I change to a bigger dataset, the following exception occurs:

Note: When I say a bigger dataset, I mean a lot of data for each pair rather than a lot of pairs with little data each; this way it still produces a lot of frames.

16/06/07 10:41:23 WARN TaskSetManager: Lost task 4.0 in stage 52.0 (TID 6219, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
        at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
        at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
        at $iwC$$iwC$$iwC.<init>(<console>:73)
        at $iwC$$iwC.<init>(<console>:75)
        at $iwC.<init>(<console>:77)
        at <init>(<console>:79)
        at .<init>(<console>:83)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 52.0 failed 1 times, most recent failure: Lost task 2.0 in stage 52.0 (TID 6217, localhost): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 65535
        at water.Key.home_node(Key.java:196)
        at water.Key.home(Key.java:191)
        at water.Atomic.fork(Atomic.java:38)
        at water.Atomic.invoke(Atomic.java:31)
        at water.fvec.Vec$VectorGroup.reserveKeys(Vec.java:1229)
        at water.fvec.Vec$VectorGroup.addVecs(Vec.java:1237)
        at water.util.ArrayUtils.frame(ArrayUtils.java:1227)
        at water.util.ArrayUtils.frame(ArrayUtils.java:1243)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.coefficients(<console>:62)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:49)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:350)
        ... 8 more


After this I tried to run my code on Sparkling Water, but now the problem is that I cannot use the H2OContext inside my function (SparkConf is not serializable) to create the H2OFrame, so I can't apply the model to the data of each (product, color) pair separately.
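
Roughly, this is the shape of what I tried (illustrative only; toDataFrame is a hypothetical helper standing in for whatever per-group conversion would be needed):

grouped.map { case ((product, color), rows) =>
  // fails before it runs: the closure captures h2oContext, which
  // drags the non-serializable SparkConf along with it
  val frame = h2oContext.asH2OFrame(toDataFrame(rows)) // toDataFrame: hypothetical
  // ... fit the GLM on frame ...
}

Spark rejects the task as not serializable before any of this executes.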

I also tried to create the H2OFrame with Sparkling Water from the entire dataset, and that works well, but it doesn't help in my case: I must apply the GLM model to the data of each product/color pair separately, and with a single frame over everything I can no longer group the data by product and color before fitting.
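
The whole-dataset path that works is just this (again a sketch from memory of the Sparkling Water API; salesDF stands for the full DataFrame):

import org.apache.spark.h2o._

val h2oContext = new H2OContext(sc).start()
val fullFrame: H2OFrame = h2oContext.asH2OFrame(salesDF)
// one frame over everything -- but I need one GLM per (product, color) pair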


Thanks!
Renan
