PySpark fails evaluating keras neural networks within hyperopt SparkTrials


Thomas Weber

Mar 24, 2021, 10:14:27 AM
to hyperopt-discuss

Hello everyone,

I have a strange bug that I have been stuck on for a few days and cannot get solved (for better code formatting, also see https://stackoverflow.com/questions/64784770/pyspark-fails-evaluating-keras-neural-networks-within-hyperopt-sparktrials).

My goal is to evaluate several Keras neural networks within hyperopt. To speed up the evaluation I use SparkTrials (also see http://hyperopt.github.io/hyperopt/scaleout/spark/). For all scikit-learn regressors this works perfectly fine (a sketch of such a setup is included after the minimal example below). But every time I use a Keras network, the model gets evaluated but the result is not returned. The following is the error message I get:

20/11/11 11:30:56 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
trial task 9 failed, exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, path executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.api.python.PythonRDD$.collectAndServeWithJobGroup(PythonRDD.scala:183)
    at org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
. None
0%| | 0/10 [00:41<?, ?trial/s, best loss=?]
Total Trials: 10: 0 succeeded, 10 failed, 0 cancelled.
Traceback (most recent call last):
  File "E:/Git/SystemidentificationTool/play/mini_example.py", line 68, in <module>
    best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 522, in fmin
    trials_save_file=trials_save_file,
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 274, in fmin
    raise e
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 270, in fmin
    trials_save_file="",  # not supported
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 558, in fmin
    "There are no evaluation tasks, cannot return argmin of task losses."
Exception: There are no evaluation tasks, cannot return argmin of task losses.

I built a minimal code example with which the error can be reproduced:

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, SparkTrials 
from sklearn.metrics import mean_squared_error 
import sys 
import numpy as np 
from pandas import DataFrame 

time = np.arange(0, 100, 0.1) 
X = DataFrame(np.sin(time)) 
y = DataFrame(np.cos(time)) 
X_val = DataFrame(np.sin(time)) 
y_val = DataFrame(np.cos(time)) 

space = {'choice': hp.choice('num_layers',
                             [{'layers': 'two', },
                              {'layers': 'three',
                               'units3': hp.uniform('units3', 64, 1024),
                               'dropout3': hp.uniform('dropout3', .25, .75)}]),
         'units': hp.uniform('units', 64, 1024),
         'units1': hp.uniform('units1', 64, 1024),
         'units2': hp.uniform('units2', 64, 1024),
         'dropout1': hp.uniform('dropout1', .25, .75),
         'dropout2': hp.uniform('dropout2', .25, .75),
         'batch_size': 28,
         'nb_epochs': 2,
         'optimizer': hp.choice('optimizer', ['adadelta', 'adam', 'rmsprop']),
         'activation': 'relu'}

def f_nn(params):
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import Adadelta, Adam
    print('Params testing: ', params)

    model = Sequential()
    model.add(Dense(params['units1'], input_dim=X.shape[1]))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout1']))
    model.add(Dense(params['units2'], kernel_initializer="glorot_uniform"))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout2']))
    if params['choice']['layers'] == 'three':
        model.add(Dense(params['choice']['units3'], kernel_initializer="glorot_uniform"))
        model.add(Activation(params['activation']))
        model.add(Dropout(params['choice']['dropout3']))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=params['optimizer'])
    model.fit(X, y, epochs=params['nb_epochs'], batch_size=params['batch_size'])

    pred_auc = model.predict(X_val)
    acc = mean_squared_error(y_val, pred_auc)
    print('AUC:', acc)
    sys.stdout.flush()
    return {'loss': -acc, 'status': STATUS_OK}

trials = SparkTrials() 
best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials) 
print('best: ', best)
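
For comparison, a scikit-learn objective evaluated with the exact same SparkTrials/fmin setup completes without problems for me. The regressor and search space below are only an illustration (my real models differ), but this is the pattern that works:

from sklearn.ensemble import RandomForestRegressor

def f_sklearn(params):
    # same toy data as above, only the model differs
    model = RandomForestRegressor(n_estimators=int(params['n_estimators']))
    model.fit(X, y.values.ravel())
    pred = model.predict(X_val)
    return {'loss': mean_squared_error(y_val, pred), 'status': STATUS_OK}

space_sklearn = {'n_estimators': hp.quniform('n_estimators', 10, 100, 1)}
best_sklearn = fmin(f_sklearn, space_sklearn, algo=tpe.suggest, max_evals=10, trials=SparkTrials())
print('best (sklearn): ', best_sklearn)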

My system setup is the following:

python 3.7
hyperopt 0.2.5
keras 2.4.3
tensorflow 2.3.1
pyspark 3.0.1

I use this Java version:

openjdk version "1.8.0_272"
OpenJDK Runtime Environment (AdoptOpenJDK) (build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK) (build 25.272-b10, mixed mode)

And Spark version 3.0.1
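
For what it's worth, the objective itself can be checked without Spark by drawing a single configuration from the search space and calling it directly; just a sanity-check sketch (hyperopt.pyll.stochastic.sample is only used here to get one valid parameter dict):

from hyperopt.pyll.stochastic import sample

# evaluate the objective once, locally, without SparkTrials involved
params = sample(space)
print(f_nn(params))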

Thanks for your help!
