YARN cluster mode: RayTaskError(KeyError)


AP

Apr 13, 2021, 9:15:19 AM4/13/21
to User Group for BigDL and Analytics Zoo
Hi All,
I am trying to fit the Orca tf2 Estimator in yarn-cluster mode.
First I initialized the Orca context using:
init_orca_context(cluster_mode="spark-submit")

Then I created the estimator:
est = Estimator.from_keras(model_creator=model_creators)
Then, while fitting est as below,
stats = est.fit(data,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=60000 // batch_size,
                feature_cols=['....'],
                label_cols=['target'])

where data is a Spark DataFrame, I get the error below.
Any help is appreciated.
Thanks

Logs:

  File "/dfs/12/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000006/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 209, in reduce_partitions_for_actors
  File "/dfs/12/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000006/venv3.tar.gz/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/dfs/12/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000006/venv3.tar.gz/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(KeyError): ray::TFRunner.step() (pid=1202952, ip=10.154.179.24)
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "/dfs/9/yarn/nm/usercache//appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000026/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 343, in step
    validation_steps=validation_steps)
  File "/dfs/9/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000026/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 79, in handle_datasets_train
    shuffle=True)
  File "/dfs/9/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000026/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 172, in _handle_xshards
    data, label = ray_partitions_get_data_label(ray.get(dataset),
  File "/dfs/9/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000026/venv3.tar.gz/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError(KeyError): ray::LocalStore.get_partition() (pid=1202953, ip=10.154.179.24)
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "/dfs/9/yarn/nm/usercache/appcache/application_1613845361768_118673/container_e35_1613845361768_118673_01_000026/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 52, in get_partition
    for shard_idx in range(self.shard_count[partition_id]):
KeyError: 6

Jason Dai

Apr 13, 2021, 9:58:33 AM4/13/21
to AP, User Group for BigDL and Analytics Zoo
How do you set up the environment and run the job? You may refer to the Hadoop/YARN guide at https://analytics-zoo.readthedocs.io/en/latest/doc/UserGuide/hadoop.html

Thanks,
-Jason


AP

Apr 13, 2021, 10:11:56 AM4/13/21
to User Group for BigDL and Analytics Zoo

Hi Jason,
Yes, I followed the guide to set up Analytics Zoo for running in yarn-cluster mode, and I am submitting the Python file through spark-submit as described in the guide.
For Ray, I pip installed ray into the conda environment that I zip and provide via --archives.
Thanks 

Shan Yu

Apr 14, 2021, 3:33:18 AM4/14/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Could you please share how you generate your dataframe? We suspect the issue might be caused by an empty partition in your dataframe. Could you help check that? If that is the case, you can try repartitioning your dataframe and see whether the issue remains.

Also, it would be very helpful if you could share your full code and logs with us, so that we can reproduce the issue on our side.

Thanks,
Shan


AP

Apr 14, 2021, 7:16:35 AM4/14/21
to User Group for BigDL and Analytics Zoo
My bash file looks like:
*******
#!/bin/bash
core_executor=2
num_executor=5
let batch="core_executor * num_executor * 10"
epoch=1
app_name="cnn_first_check"
spark-submit \
    --conf spark.yarn.dist.archives=hdfs://user/venv3.tar.gz \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv3.tar.gz/bin/python \
    --conf spark.pyspark.python=venv3.tar.gz/bin/python \
    --conf spark.pyspark.driver.python=venv3.tar.gz/bin/python \
    --conf spark.executor.memoryOverhead=6096 \
    --conf spark.driver.memoryOverhead=6096 \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.port.maxRetries=100 \
    --conf spark.yarn.executor.memoryOverhead=5000000000 \
    --executor-memory 10g \
    --driver-memory 10g \
    --executor-cores $core_executor \
    --num-executors $num_executor \
    --conf spark.driver.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC " \
    --conf spark.executor.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC" \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.shuffle.partitions=2000 \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.ui.showConsoleProgress=true \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.locality.wait=2s \
    --conf spark.driver.maxResultSize=20g \
    --conf spark.sql.parquet.binaryAsString=true \
    --conf spark.shuffle.consolidateFiles=false \
    --conf spark.rdd.compress=true \
    --conf spark.stage.maxConsecutiveAttempts=30 \
    --conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --conf spark.storage.memoryFraction=0.2 \
    --conf spark.driver.extraClassPath=hdfs://user/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0.jar \
    --conf spark.executor.extraClassPath=hdfs://user/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0.jar \
    --jars hdfs://user/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0.jar \
    --files hdfs://user/zoo/spark-analytics-zoo.conf,hdfs://user/tokenizer.pickle \
    --py-files hdfs://user/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip,hdfs://user/dlspark/zoo_train_model_trial2.py \
    --app_name=$app_name --model_path=$model_path \
    --epoch="$epoch" --batch_size="$batch" 

******************

And my Python file looks like:

*****************

import tensorflow as tf
import zoo 
import ray 
import tensorflow_hub as hub

import transformers
import pickle


from zoo.orca import init_orca_context, stop_orca_context,OrcaContext
init_orca_context(cluster_mode="spark-submit")
from zoo.orca import OrcaContext
spark = OrcaContext.get_spark_session()
data= spark.read.parquet("/user/data.parquet")

def model_creators(config):
    # ... model, loss, and metric are defined here (omitted) ...
    model.compile(tf.keras.optimizers.Adam(lr=1e-5), loss=loss, metrics=[metric])
    return model

from zoo.orca.learn.tf2 import Estimator
batch_size = 100
est = Estimator.from_keras(model_creator=model_creators)
print("est created")
max_epoch=1
stats = est.fit(data,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=60000 // batch_size,
                feature_cols=['input_token','masked_token','segment_ids'],
                label_cols=['target'])
**********************


And part of the error logs:

JavaGatewayServer has been successfully launched on executors
Start to launch ray on cluster
Start to launch ray driver on local
Executing command: ray start --address 10.154.178.222:38000 --redis-password 123456 --num-cpus 0 --node-ip-address 10.154.178.253
2021-04-14 05:47:27,576 INFO scripts.py:643 -- Local node IP: 10.154.178.253
2021-04-14 05:47:27,657 SUCC scripts.py:656 -- --------------------
2021-04-14 05:47:27,658 SUCC scripts.py:657 -- Ray runtime started.
2021-04-14 05:47:27,658 SUCC scripts.py:658 -- --------------------
2021-04-14 05:47:27,658 INFO scripts.py:660 -- To terminate the Ray runtime, run
2021-04-14 05:47:27,658 INFO scripts.py:661 --   ray stop



2021-04-14 05:47:29,565 INFO worker.py:655 -- Connecting to existing Ray cluster at address: 10.154.178.222:38000
{'node_ip_address': '10.154.178.253', 'raylet_ip_address': '10.154.178.253', 'redis_address': '10.154.178.222:38000', 'object_store_address': '/tmp/ray/session_2021-04-14_05-47-06_558873_934445/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2021-04-14_05-47-06_558873_934445/sockets/raylet', 'webui_url': '10.154.178.222:8265', 'session_dir': '/tmp/ray/session_2021-04-14_05-47-06_558873_934445', 'metrics_export_port': 61094, 'node_id': 'b9a9685e09e09e38dfb4167e416c35a939a5a9a840dda469d65d3c2c'}
(pid=935304, ip=10.154.178.222) 2021-04-14 05:47:31.712413:
:
:
:
:

Traceback (most recent call last):
  File "zoo_train_model_trial2.py", line 110, in <module>
    label_cols=['target'])
  File "/dfs/5/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000017/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/estimator.py", line 239, in fit
  File "/dfs/5/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000017/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 209, in reduce_partitions_for_actors
  File "/dfs/5/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000017/venv3.tar.gz/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/dfs/5/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000017/venv3.tar.gz/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(KeyError): ray::TFRunner.step() (pid=380786, ip=10.154.179.252)
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "/dfs/3/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000028/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 343, in step
    validation_steps=validation_steps)
  File "/dfs/3/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000028/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 79, in handle_datasets_train
    shuffle=True)
  File "/dfs/3/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000028/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/tf_runner.py", line 172, in _handle_xshards
    data, label = ray_partitions_get_data_label(ray.get(dataset),
  File "/dfs/3/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000028/venv3.tar.gz/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
ray.exceptions.RayTaskError(KeyError): ray::LocalStore.get_partition() (pid=380787, ip=10.154.179.252)
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "/dfs/3/yarn/nm/usercache/appcache/application_1613845361768_121020/container_e35_1613845361768_121020_01_000028/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 52, in get_partition
    for shard_idx in range(self.shard_count[partition_id]):
KeyError: 0

Thanks
Also, even when init_ray_on_spark is False, the Ray runtime still gets started.
Can the Orca tf2 estimator run without initializing Ray?

Shan Yu

Apr 15, 2021, 1:38:16 AM4/15/21
to User Group for BigDL and Analytics Zoo
Hi AP,
 
Could you help check whether your dataframe contains any empty partitions? Below is code you can refer to.

data= spark.read.parquet("/user/data.parquet")
empty_partition_ids = data.rdd.map(lambda _: 1).mapPartitionsWithIndex(lambda idx, part: [idx] if sum(part) == 0 else []).collect()

If you have empty partitions, please try data2 = data.repartition(10) before feeding it into est.fit() and see whether the error goes away.
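
Putting the two steps together, a minimal sketch could look like the following (assuming spark, est, max_epoch and batch_size are already defined as in your script; the repartition count of 10 and the column names are only examples):

data = spark.read.parquet("/user/data.parquet")

# list the indices of partitions that contain no rows at all
empty_partition_ids = data.rdd.map(lambda _: 1) \
    .mapPartitionsWithIndex(lambda idx, part: [idx] if sum(part) == 0 else []) \
    .collect()

if empty_partition_ids:
    print("Empty partitions found:", empty_partition_ids)
    # redistribute the rows so that no partition is empty; 10 is only an example
    data = data.repartition(10)

stats = est.fit(data,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=60000 // batch_size,
                feature_cols=['input_token', 'masked_token', 'segment_ids'],
                label_cols=['target'])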

Thanks a lot for reporting the error to us; we look forward to your feedback.

Besides, we currently use Ray as the backend for running tf2, so the Orca tf2 estimator cannot run without initializing Ray.
 
Thanks,
Shan

AP

Apr 15, 2021, 6:43:01 AM4/15/21
to User Group for BigDL and Analytics Zoo
Hi Shan,
Thanks a lot.
Repartitioning resolved the issue.
Can you please tell me where to specify ray parameters such as num_ray_nodes, ray_node_cpu_cores, and others?
I am running in yarn-cluster mode, so should I pass them through init_orca_context() or through the spark-submit command (and if so, how)?
Thanks again

Shan Yu

Apr 15, 2021, 10:07:31 PM4/15/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Great. Thanks again for reporting the issue! We have fixed it, and the new nightly-built version can handle empty partitions.

You can refer to https://analytics-zoo.readthedocs.io/en/latest/doc/UserGuide/hadoop.html#yarn-cluster-mode, which is about how to run in yarn-cluster mode.

You can pass "num-executors" and "executor-cores"  with spark submit, and they have the same meaning with num_ray_nodes, ray_node_cpu_cores.

For other Ray parameters, you can pass them through init_orca_context().
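
As a rough illustration (the keyword names below are assumptions based on the Ray-related options Orca exposes, not verbatim API, so please check the init_orca_context documentation for your version):

from zoo.orca import init_orca_context

# illustration only -- verify the exact keyword names against the API docs
init_orca_context(cluster_mode="spark-submit",
                  object_store_memory="4g",   # size of Ray's plasma object store
                  include_webui=False,        # do not start the Ray dashboard
                  verbose=True)               # print the underlying "ray start" command and logs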

Thanks for the question; we will add more description of how to pass Ray parameters to our documentation.

Best,
Shan

Shan Yu

Apr 16, 2021, 2:32:33 AM4/16/21
to User Group for BigDL and Analytics Zoo
Hi AP,

To be clearer: for yarn-cluster mode, we currently only support submitting applications using the spark-submit command. You can specify "num-executors" and "executor-cores" as you normally would with spark-submit. Internally we use those values as num_ray_nodes and ray_node_cpu_cores to start a Ray cluster for tf2 applications.

For yarn-client mode, in case you want to try it, you don't need the spark-submit command. You can directly pass "num_nodes" and "cores" in init_orca_context, which are also interpreted as num_ray_nodes and ray_node_cpu_cores internally.
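
For example, a minimal sketch of the yarn-client variant (the resource values are only placeholders):

from zoo.orca import init_orca_context

# yarn-client mode: no spark-submit needed; resources are requested here.
# num_nodes and cores are used internally as num_ray_nodes and ray_node_cpu_cores.
sc = init_orca_context(cluster_mode="yarn-client",
                       num_nodes=5,    # number of executors / Ray nodes
                       cores=2,        # cores per executor / Ray node
                       memory="10g")   # memory per executor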

Best,
Shan

AP

Apr 16, 2021, 7:20:56 AM4/16/21
to User Group for BigDL and Analytics Zoo
Thanks Shan,
That helped a lot. 
In yarn-cluster mode, as you said, num_ray_nodes and ray_node_cpu_cores can be specified with the spark-submit command.
What about parameters like "redis_port", "password", "object_store_memory", "verbose", "env", and "include_webui"?
Do we need to specify these in init_orca_context()?
Thanks again

Imad Benkhelifa

Apr 16, 2021, 12:10:12 PM4/16/21
to AP, User Group for BigDL and Analytics Zoo
Hi everyone, I am in the middle of my Master's project. I need source code for action recognition in Analytics Zoo on a YARN cluster to start with. If anyone could help, I would be thankful.
Best 

Shan Yu

Apr 18, 2021, 11:15:43 PM4/18/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Normally you can just use the default values. If you would like to change them, then yes, you can specify those Ray parameters in init_orca_context().

Thanks,
Shan

yang wang

Apr 18, 2021, 11:28:19 PM4/18/21
to User Group for BigDL and Analytics Zoo
Hi Imad, 

I am sorry, but we currently do not have any action recognition code available.

I think you can first find a project written in PyTorch or TensorFlow and port it to Analytics Zoo to run on YARN. You can follow the examples in the Quick Start section of our documentation website https://analytics-zoo.readthedocs.io/en/latest/index.html to see how to port an existing PyTorch/TensorFlow application to Analytics Zoo.

-Yang

AP

Jun 7, 2021, 7:54:13 AM6/7/21
to User Group for BigDL and Analytics Zoo
Hi Team,
I am getting a KeyError again. I checked, and there are no empty partitions.
Here are the logs:

Traceback (most recent call last):
  File "zoo_train_model_ray_rec.py", line 64, in <module>
    results = est.predict(data,feature_cols=['encoded_input'])
  File "/dfs/5/yarn/nm/usercache//appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/estimator.py", line 384, in predict
  File "/dfs/5/yarn/nm/usercache//appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/learn/tf2/estimator.py", line 345, in _predict_spark_xshards
  File "/dfs/5/yarn/nm/usercache//appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 148, in to_spark_xshards
  File "/dfs/5/yarn/nm/usercache//appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/shard.py", line 143, in __init__
  File "/dfs/5/yarn/nm/usercache//appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/shard.py", line 207, in compute
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 167 in stage 6.0 failed 4 times, most recent failure: Lost task 167.3 in stage 6.0 (TID 617, hdp2stl020061.mastercard.int, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/dfs/5/yarn/nm/usercache/e112008/appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000001/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 141, in <lambda>
  File "/dfs/12/yarn/nm/usercache/e112008/appcache/application_1621089916366_56414/container_e41_1621089916366_56414_01_000005/analytics-zoo-bigdl_0.12.1-spark_2.4.3-0.10.0-python-api.zip/zoo/orca/data/ray_xshards.py", line 97, in get_from_ray
    local_store_handle = ray.get_actor(idx_to_store_name[idx])
KeyError: 167

Shan Yu

Jun 8, 2021, 5:07:24 AM6/8/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Could you try our latest analytics-zoo and see whether the issue remains?

You can install our nightly-built version by

`pip install --pre --upgrade analytics-zoo`

Thanks,
Shan

AP

Jun 8, 2021, 7:24:11 AM6/8/21
to User Group for BigDL and Analytics Zoo
Hi Shan,
I am running Analytics Zoo in yarn-cluster mode and downloaded the recently released jar and Python API files below:
- analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27-jar-with-dependencies.jar
- hdfs://dw-prod-new/user/e112008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api.zip
but I am still getting the error below.


 File "zoo_train_model_ray_rec.py", line 64, in <module>
    results = est.predict(data,feature_cols=['encoded_input'])
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/learn/tf2/estimator.py", line 384, in predict
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/learn/tf2/estimator.py", line 345, in _predict_spark_xshards
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/data/ray_xshards.py", line 151, in to_spark_xshards
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/data/shard.py", line 143, in __init__
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/data/shard.py", line 207, in compute
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 186 in stage 6.0 failed 4 times, most recent failure: Lost task 186.3 in stage 6.0 (TID 643, hdp2stl020397.mastercard.int, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4462.8166904/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/dfs/7/yarn/nm/usercache/appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000008/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/data/ray_xshards.py", line 144, in <lambda>
  File "/dfs/7/yarn/nm/usercache//appcache/application_1621089916366_59344/container_e41_1621089916366_59344_01_000051/analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api_june.zip/zoo/orca/data/ray_xshards.py", line 100, in get_from_ray
    local_store_handle = ray.get_actor(idx_to_store_name[idx])
KeyError: 186

Shan Yu

Jun 8, 2021, 10:32:02 PM6/8/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Thanks for your reply. I will dig into this issue. 

BTW, could you provide how you start the script (including the Spark confs) and more details of your Python script?

Many thanks,
Shan


AP

Jun 9, 2021, 12:52:42 AM6/9/21
to User Group for BigDL and Analytics Zoo
Hi Team,
___________________________________________________________________
My bash file looks like below:
core_executor=2
num_executor=10
let batch="core_executor * num_executor * 100"
epoch=1
app_name="my_app_name"

spark-submit \
    --conf spark.yarn.dist.archives=hdfs_path_to_venv3.tar.gz \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv3.tar.gz/bin/python \
    --conf spark.pyspark.python=venv3.tar.gz/bin/python \
    --conf spark.pyspark.driver.python=venv3.tar.gz/bin/python \
    --conf spark.executor.memoryOverhead=6096 \
    --conf spark.driver.memoryOverhead=6096 \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.port.maxRetries=100 \
    --conf spark.yarn.executor.memoryOverhead=5000000000 \
    --executor-memory 30g \
    --driver-memory 30g \
    --executor-cores $core_executor \
    --num-executors $num_executor \
    --conf spark.driver.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC " \
    --conf spark.executor.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC" \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.shuffle.partitions=800 \
    --conf spark.default.parallelism=800 \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.ui.showConsoleProgress=true \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.locality.wait=2s \
    --conf spark.driver.maxResultSize=30g \
    --conf spark.sql.parquet.binaryAsString=true \
    --conf spark.shuffle.consolidateFiles=false \
    --conf spark.rdd.compress=true \
    --conf spark.stage.maxConsecutiveAttempts=30 \
    --conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --conf spark.storage.memoryFraction=0.2 \
    --conf spark.shuffle.reduceLocality.enabled=false \
    --conf spark.shuffle.blockTransferService=nio \
    --conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
    --conf spark.scheduler.maxRegisteredResourcesWaitingTime=3600s \
    --conf spark.speculation=false \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.driver.extraClassPath=hdfs_path_to_analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27-jar-with-dependencies.jar \
    --conf spark.executor.extraClassPath=hdfs_path_to_analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27-jar-with-dependencies.jar \
    --jars hdfs_path_to_analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27-jar-with-dependencies.jar \
    --files hdfs_path_to_pretrained_bert_model_weight_file_tf_model.h5,hdfs_path_to_bert_config.json \
    --py-files hdfs_path_to_analytics-zoo-bigdl_0.12.2-spark_2.4.3-0.11.0-20210606.195919-27_python_api.zip,hdfs_path_to_python_file \
    --app_name=$app_name \
    --epoch="$epoch" --batch_size="$batch"

_____________________________________________________________________________________________________________________________
And my Python file is:

#!/usr/bin/python
#########Import part###############

import tensorflow as tf
import zoo 
print(zoo.__version__)
print(tf.__version__)
import tensorflow_hub as hub
print(hub.__version__)
import transformers
print(transformers.__version__)
from zoo.orca import init_orca_context, stop_orca_context,OrcaContext
import ray 
print(ray.__version__)
from transformers import BertConfig, TFBertModel
import pickle
import time
import datetime
from zoo.orca.learn.tf2 import Estimator

init_orca_context(cluster_mode="spark-submit")
def model_creators(config):
    transformer_model = TFBertModel.from_pretrained("tf_model.h5", local_files_only=True,config = "config.json")
    input_ids_in = tf.keras.layers.Input(shape=(128,), name='input_token', dtype='int32')
    embedding_layer = transformer_model(input_ids_in)[0]
    cls_token = embedding_layer[:,0,:]
    model = tf.keras.Model(inputs=[input_ids_in], outputs = cls_token)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(tf.keras.optimizers.Adam(lr=1e-5), loss=loss)
    print(model.summary())
    return model

spark = OrcaContext.get_spark_session()

print("data reading started")
start = time.time()
data = spark.read.parquet('path_to_data')
print(data.rdd.getNumPartitions())
end = time.time()
print("Took time:"+str((end-start)))
print("creating est")
est = Estimator.from_keras(model_creator=model_creators)
print("est created")
batch_size = 50000
max_epoch=1
start = time.time()
results = est.predict(data,feature_cols=['encoded_input'])
result_df = results.toDF()
result_df.write.mode('overwrite').parquet("path_to_results_parquet")
end = time.time()
print("Took time:"+str((end-start)))
print("Finished")

Shan Yu

Jun 14, 2021, 9:53:04 PM6/14/21
to User Group for BigDL and Analytics Zoo
Hi AP,

We recently merged a fix for your issue. Would you mind trying our latest nightly-built version to see whether it works?

Many thanks,
Shan

AP

Jun 15, 2021, 5:32:04 AM6/15/21
to User Group for BigDL and Analytics Zoo
Hi Shan,
Thanks a lot. It resolved the issue.
Can you please explain what caused the error?

Shan Yu

Jun 15, 2021, 10:32:19 PM6/15/21
to User Group for BigDL and Analytics Zoo
Hi AP,

Thanks a lot. It was due to a bug in our algorithm for assigning partition indices to actors. It could happen with an unbalanced number of partitions across nodes.
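
For anyone hitting the same error, here is a toy illustration of that failure mode (NOT the actual ray_xshards code, just a sketch of the kind of mismatch involved): if the index-to-actor map is built assuming every node holds the same number of partitions, a node that ends up with extra partitions will look up an index that was never registered.

# toy illustration only -- not the real Analytics Zoo logic
partitions_per_node = {"node-a": [0, 1, 2], "node-b": [3, 4, 5, 6]}  # unbalanced

idx_to_store_name = {}
for n, node in enumerate(partitions_per_node):
    for i in range(3):                       # wrong assumption: 3 partitions per node
        idx_to_store_name[n * 3 + i] = node

idx_to_store_name[6]   # KeyError: 6 -- partition 6 exists but was never registered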
