Google Dataproc Spark jobs failing with “AgentException: Node was restarted while executing a job.”


Victor Solakhian

Sep 26, 2018, 1:38:01 PM
to Google Cloud Dataproc Discussions
We have a Dataproc cluster with a master node and two worker nodes (custom machine type: 4 vCPUs, 10 GB memory).

I submitted 8 Spark jobs simultaneously trying to test how it performs under load.

The Spark configuration for these jobs is:

spark.executor.memory=1024m
spark.executor.cores=40
spark.executor.instances=2
spark.default.parallelism=80

This cluster configuration allows 4 jobs to run in parallel. Each job uses 3 YARN containers:

Cont 1 - driver: 1 GB memory
Cont 2 - executor #1: 1.5 GB memory
Cont 3 - executor #2: 1.5 GB memory

and takes 4 GB of memory in total. We expected that all 8 jobs would be scheduled and would start running as soon as a container became available, with some jobs simply taking longer to run.
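
As a rough sanity check on that (assuming, and this is only my assumption, that Dataproc gives the YARN NodeManager roughly 0.8 of each VM's memory):

YARN memory per worker    ≈ 0.8 × 10 GB ≈ 8 GB
Cluster YARN memory       ≈ 2 × 8 GB = 16 GB
Memory per job            = 1 GB (driver/AM) + 2 × 1.5 GB (executors) = 4 GB
Jobs running in parallel  ≈ 16 GB / 4 GB = 4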

In reality, only one job finished successfully. The other jobs failed because a worker node was restarted:

=========== Cloud Dataproc Agent Error ===========
com.google.cloud.hadoop.services.agent.AgentException: Node was restarted while executing a job. This could be user-initiated or caused by Compute Engine maintenance event. (TASK_FAILED)
    at com.google.cloud.hadoop.services.agent.AgentException$Builder.build(AgentException.java:83)
    at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler.lambda$kill$0(AbstractJobHandler.java:211)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:205)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:194)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:110)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:398)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1029)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:675)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.addListener(AbstractFuture.java:105)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.AbstractTransformFuture.create(AbstractTransformFuture.java:39)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.Futures.transformAsync(Futures.java:459)
    at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler.kill(AbstractJobHandler.java:202)
    at com.google.cloud.hadoop.services.agent.job.JobManagerImpl.recoverAndKill(JobManagerImpl.java:153)
    at com.google.cloud.hadoop.services.agent.MasterRequestReceiver$NormalWorkReceiver.receivedJob(MasterRequestReceiver.java:141)
    at com.google.cloud.hadoop.services.agent.MasterRequestReceiver.pollForJobsAndTasks(MasterRequestReceiver.java:105)
    at com.google.cloud.hadoop.services.agent.MasterRequestReceiver.pollForWork(MasterRequestReceiver.java:77)
    at com.google.cloud.hadoop.services.agent.MasterRequestReceiver.lambda$doStart$0(MasterRequestReceiver.java:67)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.run(MoreExecutors.java:630)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
======== End of Cloud Dataproc Agent Error ========



I found two postings on the web describing the same problem. One suggests that

   - there was "a bug in image version 1.1.34. Downgrade to image 1.1.29 and that fixes the issue."

and the other says

   - Since Compute Engine VMs don't configure a swap partition, when you run out of RAM all daemons will crash and restart.


These postings were from more than a year ago, so I suppose the bug has already been fixed. Our cluster was created on Sep 11, 2018 and the image is:

   - imageVersion: 1.2.47-deb8

Any ideas?

Thanks,
 
Victor

Dennis Huo

Sep 26, 2018, 9:57:58 PM
to Google Cloud Dataproc Discussions
Does this fail consistently for that workload, or did it only happen once? In general, though rare, it's always possible to get unlucky and experience a node reboot caused by a catastrophic physical machine failure, in which case a job can simply be retried.

The 1.1.29 downgrade issue doesn't appear to be what's happening in this case.

The OOM issue is indeed a possible explanation.

Victor Solakhian

Sep 27, 2018, 12:06:06 PM
to Google Cloud Dataproc Discussions
It happened twice with the same test workload.

If an OOM in one of the Spark tasks is considered a "catastrophic failure" and results in a node restart (and the loss of all history), that is too bad. In that case the Spark job should fail with a clear message in the log, and the developer would simply increase the spark.executor.memory property to avoid future OOM events.

If this is happening due to a lack of memory to schedule/process all the Spark jobs (the YARN UI shows that all available memory is used by 4 jobs, and we submitted 8 jobs), it is even worse. The jobs should wait until memory is available for new containers to start processing.

Thanks,

Victor

Dennis Huo

Sep 27, 2018, 1:12:43 PM
to Google Cloud Dataproc Discussions
An OOM of a Spark task is not considered a catastrophic failure. An example of a catastrophic failure is when a physical machine catches on fire or an asteroid hits. In that case, the VM will experience a "reboot".

YARN scheduling will indeed wait to schedule containers until there is enough memory.

In Spark "client" mode, the Spark driver does not cooperate with YARN. If it doesn't cooperate with YARN and there are no memory constraints, then there is no way for YARN to prevent it from using too much memory. Do you have evidence that it was a worker node that was rebooted instead of the master node?

You can also try to submit the jobs with "--properties spark.submit.deployMode=cluster" to run in Spark cluster mode instead of client mode, so that the drivers will run inside YARN containers.

Victor Solakhian

Sep 27, 2018, 4:42:32 PM
to Google Cloud Dataproc Discussions
>In Spark "client" mode, the Spark driver does not cooperate with YARN. If it doesn't cooperate with YARN and there are no memory constraints, then there is no way for YARN to prevent it from using too much memory. Do you have evidence that it was a worker node that was rebooted instead of the master node?

No, I do not know which nodes were rebooted, but you gave me good information regarding Spark's client/cluster mode. I do not know which mode is used by default. I am submitting jobs programmatically using the com.google.api.services.dataproc.Dataproc class. At first I thought that the driver process runs on the master node, but then I noticed that when I submit a Spark job with spark.executor.instances=2, I see 3 containers in the YARN UI: 2 executor processes with memory equal to 'spark.executor.memory + 0.5 GB' and one (driver?) process with 1 GB of memory. All three containers were running on worker nodes.
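
For reference, the programmatic submission looks roughly like this (a simplified sketch, not our exact code: the main class, jar URI and variable names are placeholders, I am assuming the generated v1 model classes, and an initialized Dataproc client named 'dataproc' plus projectId/region/clusterName are in scope). Dennis's spark.submit.deployMode property is shown commented out:

import com.google.api.services.dataproc.Dataproc;
import com.google.api.services.dataproc.model.Job;
import com.google.api.services.dataproc.model.JobPlacement;
import com.google.api.services.dataproc.model.SparkJob;
import com.google.api.services.dataproc.model.SubmitJobRequest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Spark properties for the job (same values as in my first post).
Map<String, String> sparkProperties = new HashMap<>();
sparkProperties.put("spark.executor.memory", "1024m");
sparkProperties.put("spark.executor.cores", "40");
sparkProperties.put("spark.executor.instances", "2");
sparkProperties.put("spark.default.parallelism", "80");
// Per Dennis's suggestion, this would move the driver into a YARN container:
// sparkProperties.put("spark.submit.deployMode", "cluster");

SparkJob sparkJob = new SparkJob()
    .setMainClass("com.example.MySparkApp")                                  // placeholder
    .setJarFileUris(Collections.singletonList("gs://my-bucket/my-app.jar"))  // placeholder
    .setProperties(sparkProperties);

Job jobToSubmit = new Job()
    .setPlacement(new JobPlacement().setClusterName(clusterName))
    .setSparkJob(sparkJob);

// Submit the job and get back the server-side Job resource (with its job ID).
Job submittedJob = dataproc.projects().regions().jobs()
    .submit(projectId, region, new SubmitJobRequest().setJob(jobToSubmit))
    .execute();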

I went through all the logs I could find for these 8 jobs (from the Google Cloud Job Details UI). I still do not know whether the driver process had an OOM event. All jobs logged the following warnings:

18/09/21 19:36:50 INFO org.spark_project.jetty.server.Server: Started @51880ms
18/09/21 19:36:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/09/21 19:36:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
18/09/21 19:36:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
18/09/21 19:36:50 WARN org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
18/09/21 19:36:50 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@ba2953c{HTTP/1.1,[http/1.1]}{0.0.0.0:4044}
18/09/21 19:36:51 WARN org.apache.hadoop.hdfs.ClientContext: Existing client context 'default' does not match requested configuration.  Existing: shortCircuitStreamsCacheSize = 256, shortCircuitStreamsCacheExpiryMs = 300000, shortCircuitMmapCacheSize = 256, shortCircuitMmapCacheExpiryMs = 3600000, shortCircuitMmapCacheRetryTimeout = 300000, shortCircuitCacheStaleThresholdMs = 1800000, socketCacheCapacity = 16, socketCacheExpiry = 3000, shortCircuitLocalReads = false, useLegacyBlockReaderLocal = false, domainSocketDataTraffic = false, shortCircuitSharedMemoryWatcherInterruptCheckMs = 60000, keyProviderCacheExpiryMs = 864000000, Requested: shortCircuitStreamsCacheSize = 256, shortCircuitStreamsCacheExpiryMs = 300000, shortCircuitMmapCacheSize = 256, shortCircuitMmapCacheExpiryMs = 3600000, shortCircuitMmapCacheRetryTimeout = 300000, shortCircuitCacheStaleThresholdMs = 1800000, socketCacheCapacity = 16, socketCacheExpiry = 3000, shortCircuitLocalReads = true, useLegacyBlockReaderLocal = false, domainSocketDataTraffic = false, shortCircuitSharedMemoryWatcherInterruptCheckMs = 60000, keyProviderCacheExpiryMs = 864000000
. . .
18/09/21 19:37:18 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1537552951571_0008
. . .
=========== Cloud Dataproc Agent Error ===========

I think the OOM happens in the driver process, because I had another issue with failing jobs before, when the executor memory was too low. The logs were:

18/09/14 00:24:24 WARN org.apache.spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 120839 ms exceeds timeout 120000 ms
18/09/14 00:24:24 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 1 on qex-stg-3-w-1.c.zoominfo-1.internal: Executor heartbeat timed out after 120839 ms
18/09/14 00:24:24 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 23.0 in stage 0.0 (TID 23, qex-stg-3-w-1.c.zoominfo-1.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 120839 ms

After increasing the spark.executor.memory value or decreasing spark.executor.cores, everything worked OK.

Thanks again,

Victor




Victor Solakhian

Sep 27, 2018, 4:56:07 PM
to Google Cloud Dataproc Discussions
I found that spark.submit.deployMode=client by default (and I see that in the Spark properties for the submitted jobs):
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
so the driver process should run on the master node. But then why do I see a third 1 GB container running on a worker node when I have only 2 executors configured?

Regards,

Victor

Dennis Huo

Sep 27, 2018, 6:19:34 PM
to Google Cloud Dataproc Discussions
Right, I guess I forgot to mention: I suggested trying mode=cluster because the default is client-mode.

Even in client-mode, Spark has to create a separate ApplicationMaster container in YARN; the YARN container requests are performed via the app master even if the driver is running on the master node. In some workloads, for example, the driver might be doing large "collect()" calls and using lots of memory, while using a smaller ApplicationMaster container in YARN.

In cluster-mode, the driver runs in the app master itself. https://techvidvan.com/tutorials/spark-modes-of-deployment/

Victor Solakhian

Sep 28, 2018, 2:17:09 PM
to Google Cloud Dataproc Discussions
Thanks, Dennis. I figured this out too.

What is the advantage of running Spark in cluster mode on Dataproc? In the Dataproc case, the machine submitting the Spark job is NOT remote from the "Spark infrastructure"; it is the master node of the Dataproc cluster. Also, if I understood it right, when a Spark job runs in cluster mode, the client process (running on the master node) that launches the Spark application will quit. In our use case there is a JBoss application that submits a Dataproc job using the com.google.api.services.dataproc.Dataproc class, and it needs to wait until the Spark job is completed. In the cluster-mode scenario, it would lose that connection.

Thanks,

Victor

Dennis Huo

Sep 28, 2018, 3:49:08 PM
to Google Cloud Dataproc Discussions
As you noticed with OOM issues, the main advantage of cluster-mode is that it forces the Driver to cooperate with YARN, so that its resource usage is tightly accounted for. If the driver is also written to be safe to re-run, then YARN's ability to automatically retry containers including the AppMaster means cluster-mode is more resilient against generic node failures.

For most use cases, client-mode is fine, especially when dataproc clusters are being used per-workload, so that a single master isn't being shared by thousands of different people running jobs. It's a more straightforward model, and allows some versatility in resource dependencies that don't play well with YARN. For example, if a Spark task is written to launch an arbitrary subprocess, the memory usage of the subprocess wouldn't cooperate with YARN in either case, and you might want to tune a master node to handle those special dependencies more easily than making it work in YARN.

In Dataproc, running in cluster-mode also means you don't get the stdout/stderr of your driver in your job output, though you should still get that output from the Application Master in Stackdriver under dataproc's "userlogs" category.

I believe even in cluster-mode, the thin launcher program continues to hang until the job is actually complete. In yarn-cluster mode, the launcher that runs on the master node stays in this wait-loop: https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1029

In Dataproc, you'll also be able to get the YARN application progress in either mode with the "YarnApplications" field of the Dataproc Job resource: https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs#Job
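
For example, something roughly like this should work (a sketch; I'm assuming the v1 Java model exposes the field as getYarnApplications(), and leaving the progress value uninterpreted):

// Report YARN-level state/progress for a submitted Dataproc job.
Job job = dataproc.projects().regions().jobs().get(projectId, region, jobId).execute();
if (job.getYarnApplications() != null) {
    for (com.google.api.services.dataproc.model.YarnApplication app : job.getYarnApplications()) {
        System.out.println(app.getName() + " [" + app.getState() + "] progress=" + app.getProgress());
    }
}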

Dennis Huo

Sep 28, 2018, 3:51:50 PM
to Google Cloud Dataproc Discussions
The behavior of whether to wait for completion when running in cluster-mode is controlled with the Spark property: spark.yarn.submit.waitAppCompletion
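
If you're setting properties programmatically, it goes in the same properties map as the others, e.g. (a sketch, reusing the hypothetical map name from the earlier snippet):

// Keep the submit-side launcher alive until the YARN application finishes
// (true is already the default; set it to "false" for fire-and-forget behavior).
sparkProperties.put("spark.yarn.submit.waitAppCompletion", "true");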

Victor Solakhian

Sep 28, 2018, 5:39:54 PM
to Google Cloud Dataproc Discussions
>I believe even in cluster-mode, the thin launcher program continues to hang until the job is actually complete. In yarn-cluster mode, the launcher that runs on the master node stays in this wait-loop

Yes, that is correct, but the problem I have now is that the Spark job finishes successfully from my point of view (it does what it is designed to do), but the 'thin launcher program' throws an exception. Here is the log of this thin client:

18/09/28 18:47:01 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.8-hadoop2
18/09/28 18:47:02 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at qex-stg-3-m/10.142.0.5:8032
18/09/28 18:47:13 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1537558835949_0027
Exception in thread "main" org.apache.spark.SparkException: Application application_1537558835949_0027 finished with failed status
	at org.apache.spark.deploy.yarn.Client.run(Client.scala:1122)
	at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1168)
	at org.apache.spark.deploy.yarn.Client.main(Client.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Job output is complete

and my code in the JBoss application that polls the status of the submitted job gets an 'ERROR' status. A simplified snippet of the code that polls for the job status:

            // Get the current state of the submitted Dataproc job.
            com.google.api.services.dataproc.model.Job job =
                    dataproc.projects().regions().jobs().get(projectId, region, jobId).execute();
            String state = job.getStatus().getState();
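
A fuller version of the polling loop would look roughly like this (a simplified sketch, not our exact code; DONE, ERROR and CANCELLED are the terminal job states in the Dataproc API):

            // Poll with a delay until the job reaches a terminal state (exception handling omitted).
            String jobState;
            do {
                Thread.sleep(10000);   // delay between polls
                com.google.api.services.dataproc.model.Job polledJob =
                        dataproc.projects().regions().jobs().get(projectId, region, jobId).execute();
                jobState = polledJob.getStatus().getState();
            } while (!"DONE".equals(jobState) && !"ERROR".equals(jobState) && !"CANCELLED".equals(jobState));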
           
Thanks,

Victor

Dennis Huo

Sep 28, 2018, 8:41:46 PM
to Google Cloud Dataproc Discussions
The exception being thrown indicates that the job really did fail from YARN's perspective. Are you sure your driver program terminated cleanly?  Were you able to diagnose why it failed?

If you visit the YARN UI you should be able to see diagnostics for why the application master indicated a failure.

For a basic example of a working Spark program you could run one of the built-in examples in cluster-mode:

gcloud dataproc jobs submit spark --cluster ${CLUSTER} --class org.apache.spark.examples.SparkPi --jars file:///usr/lib/spark/examples/jars/spark-examples_2.11-2.3.1.jar --properties spark.submit.deployMode=cluster -- 10000


Victor Solakhian

Oct 1, 2018, 2:44:22 PM
to Google Cloud Dataproc Discussions
Hi Dennis,

YARN UI shows:

Diagnostics: Shutdown hook called before final status was reported.

There were two YARN attempts to run the Spark job, both FAILED, which created duplicate output files. The driver logs for both attempts look normal for this Spark application and end with:

18/10/01 18:23:24 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@75a3c73c{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
18/10/01 18:23:24 INFO com.....: ======= Reporting Processing Status = _SUCCESS
18/10/01 18:23:24 INFO com.....: Created HDFS file (to report completion status: SparkQexCompletionStatus_SUCCESS
18/10/01 18:23:24 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl: Waiting for application to be successfully unregistered.

I think it is because my Spark command-line application has 'System.exit(0);' in the 'main(String[] args)' method.
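
If that is the cause, the usual fix would be to stop the SparkContext and let main() return normally instead of calling System.exit(0). Roughly (a simplified sketch; the class and method names are placeholders, not our real code):

import org.apache.spark.sql.SparkSession;

public class SparkQexApp {                      // placeholder class name
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("SparkQex").getOrCreate();
        try {
            runApplicationLogic(spark, args);   // placeholder for the real work
        } finally {
            spark.stop();                       // unregister cleanly from YARN
        }
        // No System.exit(0): returning normally lets the ApplicationMaster report
        // its final status before the JVM shuts down.
    }

    private static void runApplicationLogic(SparkSession spark, String[] args) {
        // ... the actual job ...
    }
}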

Thanks,

Victor