Need an example for the SparkTrials branch

96 views
Skip to first unread message

Shubham Ashok Gandhi

unread,
Aug 6, 2019, 2:36:50 AM8/6/19
to hyperopt-discuss
In this branch – 

A `SparkTrials` type is added to distribute trial runs across a Spark cluster. It uses concurrent Spark jobs for running asynchronous hyperopt tasks on Spark executors.

However I am running into issues when I try to use it. Hyperopt tries to pickle the objective function- `fn` provided to the `fmin` function. And this has been causing troubles. This is what I used

from hyperopt import hp, tpe, STATUS_OK, SparkTrials
from hyperopt.fmin import fmin
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.datasets import make_classification
import pandas as pd
from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()
input_cols = [
'col_'+str(i) for i in range(1, X.shape[1]+1)]
label_col =
'label'
classifier = LogisticRegression(featuresCol="features", labelCol=label_col)
bce = BinaryClassificationEvaluator(
labelCol=label_col, metricName="areaUnderROC")
# define objective function
def objective(space):
   
print(space)
    classifier.setParams(**space)
    classifier_model = classifier.fit(X_vector)

    predictions = classifier_model.transform(X_vector)
   
return bce.evaluate(predictions)


# create some data using sklearn's make_classification
X, y = make_classification(n_samples=5000, n_features=10, n_informative=5, random_state=42)
X = pd.DataFrame(X)
X.columns = input_cols
X.loc[:
, label_col] = y

# convert to spark dataframe and repartition
num_partitions = int(X.memory_usage(index=True, deep=True).sum()/1024/100)
_X = spark.createDataFrame(X)
del X, y
X = _X.repartition(num_partitions)

# vectorise the inputs
va = VectorAssembler(inputCols=input_cols, outputCol='features')
X_vector = va.transform(X)
X_vector = X_vector.drop(*input_cols)
#drop the vector inputs

# define hyperparameter space
space = {
   
"maxIter":hp.quniform ('maxIter', 50, 100, 1),
    "regParam": hp.quniform('regParam', 0.5, 1, 0.05),
    "elasticNetParam":hp.quniform('elasticNetParam', 0.5, 1, 0.05),
    "threshold":hp.quniform('threshold', 0.2, 0.8, 0.05),
}


# evaluate points
trials = SparkTrials()
best = fmin(
fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=3, # change
            trials=trials)

print(best)

This results in the following error

Total Trials: 0: 0 succeeded, 0 failed, 0 cancelled.
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
    main()
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm CE.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/Library/Preferences/PyCharmCE2018.3/scratches/hyperopt_trials/spark_trials_hyperopt.py", line 62, in <module>
    trials=trials)
  File "/Users/Downloads/hyperopt/fmin.py", line 403, in fmin
    show_progressbar=show_progressbar,
  File "/Users/Downloads/hyperopt/spark.py", line 191, in fmin
    raise e
  File "/Users/Downloads/hyperopt/spark.py", line 188, in fmin
    show_progressbar=show_progressbar)
  File "/Users/Downloads/hyperopt/fmin.py", line 420, in fmin
    show_progressbar=show_progressbar)
  File "/Users/Downloads/hyperopt/fmin.py", line 126, in __init__
    msg = pickler.dumps(domain)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 1108, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 473, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 550, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 663, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 505, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 857, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 883, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 505, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 547, in save_function
    return self.save_function_tuple(obj)
  File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 747, in save_function_tuple
    save(state)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 505, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 857, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 883, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 505, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 857, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 883, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 550, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 663, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 505, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 857, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 883, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 525, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o61.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)


Could you provide an example file which I can use as a template?
Regards
Shubham
Reply all
Reply to author
Forward
0 new messages