kundan Kumar
unread,Jul 12, 2023, 6:34:31 AM7/12/23Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to ArangoDB
I am writing a data pipeline to ingest data from ArangoDB into BigQuery, using the ArangoDB Spark datasource.
Here is the code:
# Build a DataFrame from ArangoDB through the "com.arangodb.spark" datasource.
# Rows come from the query held in `query`; the schema is given explicitly via
# `doc_schema` instead of being inferred.
# NOTE(review): `arango_connection` is presumably a dict of connection options
# (endpoints, database, credentials) -- it is defined outside this snippet.
df: DataFrame = spark.read.format("com.arangodb.spark") \
.option("query", query) \
.options(**arango_connection) \
.schema(doc_schema).load()
# Spark action that executes the read. The stack trace in this post shows the
# "cursor not found" failure being raised from this call (Dataset.count).
df.count()
# Append the DataFrame to the BigQuery table `bq_table` in `bq_project` /
# `bq_dataset`, using the connector's "direct" write method and the
# credentials file at the given path.
# NOTE(review): `df` is not cached, so this write presumably re-runs the
# ArangoDB read rather than reusing the rows counted above -- confirm.
df.write.format('bigquery').mode("append") \
.option('table', bq_table) \
.option("project", bq_project) \
.option("dataset", bq_dataset) \
.option("writeMethod", "direct") \
.option('credentialsFile', 'path/of/gcp-credential') \
.save()
This code works when my ArangoDB collection has fewer documents; for example, I am able to write 10,000 documents from ArangoDB to BigQuery.
But my collection will have more than 1,000,000 (10,00,000) documents. When testing with more than 50,000 documents, the Spark job fails.
This is the error I am getting:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (BS1ZN93 executor driver): org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: com.arangodb.ArangoDBException: Response: 404, Error: 1600 - cursor not found
at com.arangodb.internal.util.ResponseUtils.checkError(ResponseUtils.java:53)
at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:86)
at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:66)
at com.arangodb.http.HttpProtocol.execute(HttpProtocol.java:44)
at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:60)
at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:48)
at com.arangodb.internal.ArangoDatabaseImpl$1.close(ArangoDatabaseImpl.java:219)
at com.arangodb.internal.cursor.ArangoCursorImpl.close(ArangoCursorImpl.java:69)
at org.apache.spark.sql.arangodb.datasource.reader.ArangoQueryReader.close(ArangoQueryReader.scala:53)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1(DataSourceRDD.scala:94)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1$adapted(DataSourceRDD.scala:89)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
... 10 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3459)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3458)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3458)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apac
How can this be resolved?