TransportException: Connection has been closed when a node goes down

Etienne Couritas

Sep 2, 2016, 2:11:28 PM
to DataStax Java Driver for Apache Cassandra User Mailing List
When a node goes down, I get the exception below. According to the Spark Cassandra Connector team, it could be caused by a shutdown of the coordinator. It happens on a query with an IN clause, one that brings back a lot of data (up to 2M rows).

If the node goes down, shouldn't the driver try another replica?


Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
        at ConsolidateAgg$.apply(ConsolidateAgg.scala:78)
        at Main$.main(Main.scala:12)
        at Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.TransportException: [/1.11.60.162] Connection has been closed
        at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
        at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:308)
        at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:265)
        at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:136)
        at com.datastax.spark.connector.rdd.reader.PrefetchingResultSetIterator.hasNext(PrefetchingResultSetIterator.scala:21)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:193)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.TransportException: [/1.11.60.162] Connection has been closed
        at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1124)
        at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1109)
        at com.datastax.driver.core.CloseFuture$Forwarding.force(CloseFuture.java:90)
        at com.datastax.driver.core.SessionManager.onDown(SessionManager.java:478)
        at com.datastax.driver.core.Cluster$Manager.onDown(Cluster.java:1809)
        at com.datastax.driver.core.Cluster$Manager.access$1300(Cluster.java:1283)
        at com.datastax.driver.core.Cluster$Manager$NodeRefreshRequestDeliveryCallback$4.runMayThrow(Cluster.java:2724)
        at com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

Kundan Kumar

Jul 16, 2019, 5:56:57 AM
to DataStax Java Driver for Apache Cassandra User Mailing List, e.cou...@gmail.com


Hi All,

I am also getting the same exception. Is there any clue regarding this?

Alex Ott

Jul 16, 2019, 8:50:23 AM
to java-dri...@lists.datastax.com, e.cou...@gmail.com
If you're using Spark, then don't do queries with IN: it's better to create an RDD from the values in the IN list and perform a joinWithCassandraTable.
If you're using the Java driver directly, convert it into multiple parallel requests, one for every partition; see the sketch below.
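A rough, untested sketch of that second option with the 3.x Java driver. Keyspace, table, and column names (test_ks, my_table, id) and the contact point are placeholders; adapt them to your schema:

    import com.datastax.driver.core.Cluster
    import scala.collection.JavaConverters._

    object ParallelPartitionQueries {
      def main(args: Array[String]): Unit = {
        val cluster = Cluster.builder().addContactPoint("127.0.0.1").build() // placeholder contact point
        val session = cluster.connect()

        // Hypothetical schema: test_ks.my_table with partition key "id".
        val stmt = session.prepare("SELECT * FROM test_ks.my_table WHERE id = ?")

        // One async single-partition request per key. Each request gets its own
        // coordinator and can be retried on another replica, unlike one big
        // multi-partition IN query that stands or falls with a single coordinator.
        val futures = Seq(1, 2, 3, 4, 5).map(id => session.executeAsync(stmt.bind(Int.box(id))))

        // In real code you would bound the number of in-flight requests.
        futures.foreach(f => f.getUninterruptibly().all().asScala.foreach(println))

        cluster.close()
      }
    }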



--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

sha p

May 26, 2020, 1:59:10 PM
to DataStax Java Driver for Apache Cassandra User Mailing List, e.cou...@gmail.com
@Alex, any sample for this, please?



Alex Ott

May 27, 2020, 2:23:16 AM
to DataStax Java Driver for Apache Cassandra User Mailing List, e.cou...@gmail.com
Sample for what? For the RDD, just do parallelize on the input list to convert it into an RDD, then use joinWithCassandraTable: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable. I have some examples for both Scala & Java here: https://github.com/alexott/dse-playground/tree/master/spark-oss/src/main
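To make it concrete, a minimal, untested sketch; the keyspace/table names (test_ks, my_table), the key column "id", and the contact point are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._

    object JoinExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("join-example")
          .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder contact point
        val sc = new SparkContext(conf)

        // Instead of: SELECT ... FROM test_ks.my_table WHERE id IN (1, 2, 3, ...)
        // parallelize the IN list into an RDD of partition keys.
        val ids = sc.parallelize(Seq(1, 2, 3, 4, 5)).map(Tuple1(_))

        // The connector issues one single-partition query per key, routed to a
        // replica that owns it, instead of one big IN query going through a
        // single coordinator.
        val rows = ids.joinWithCassandraTable("test_ks", "my_table")
        rows.collect().foreach(println)

        sc.stop()
      }
    }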
