java.lang.ClassNotFoundException with repartitionByCassandraReplica function

384 views
Skip to first unread message

Alexis

unread,
Apr 13, 2015, 1:34:34 PM4/13/15
to spark-conn...@lists.datastax.com
Hello,

I'm trying to use the new join functionality from the 1.2 version but I get an error with the repartitionByCassandraReplica function in the repl.

I've tried to duplicate the example of the website and created a cassandra table (shopping_history) with a couple of elements :
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.mde

import com.datastax.spark.connector.rdd._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._
import com.datastax.driver.core._

case class CustomerID(cust_id: Int)
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
val repartitioned = idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10)
repartitioned.first()

I get this error :

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I use spark 1.2.0 with connector 1.2.0 RC 3.
The joinWithCassandraTable function used on idsOfInterest works.


I'm also curious about the differences betwween : joinWithCassandraTable / cassandraTable with a In clause / foreachPartition(withSessionDo) syntax.

Do they all request the data to the local node which acts as a coordinator ?
Is joinWithCassandraTable combine with repartitionByCassandraReplica as efficient as an async query, requesting data only to the local node ? What happen if repartitionByCassandraReplica is not applied ?


Thanks.

Benkhaoua Sidahmed

unread,
Apr 18, 2015, 4:35:21 PM4/18/15
to spark-conn...@lists.datastax.com
how you reslved ths probleme i have same probleme

Alexis

unread,
Apr 20, 2015, 9:40:50 AM4/20/15
to spark-conn...@lists.datastax.com

Russell Spitzer

unread,
Apr 20, 2015, 12:02:31 PM4/20/15
to spark-conn...@lists.datastax.com
I'm going to do some more detailed investigation into this today, can I get confirmation that the same code works when compiled into an application on your machine?


To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Craig Ingram

unread,
Apr 20, 2015, 12:36:43 PM4/20/15
to spark-conn...@lists.datastax.com
Have you tried to create a jar with the class defined and add it to jars list when you start spark-shell (i.e. spark-shell --jars foo.jar)? I think the REPL may not support some reflection features at runtime. I know I've run into issues using JSON libraries in the REPL that use reflection to convert to/from JSON in the past.

Craig Ingram

unread,
Apr 20, 2015, 1:05:01 PM4/20/15
to spark-conn...@lists.datastax.com
On Monday, April 20, 2015 at 12:36:43 PM UTC-4, Craig Ingram wrote:
> Have you tried to create a jar with the class defined and add it to jars list when you start spark-shell (i.e. spark-shell --jars foo.jar)? I think the REPL may not support some reflection features at runtime. I know I've run into issues using JSON libraries in the REPL that use reflection to convert to/from JSON in the past.

Update: I got the sample to work from the REPL. Are you sure the schema in Cassandra matches your CustomerID?

Russell Spitzer

unread,
Apr 20, 2015, 5:41:52 PM4/20/15
to spark-conn...@lists.datastax.com
I was able to run the sample again on my REPL and all tests are passing in the IT suite. Can you give some more details on your particular configuration?

Alexis

unread,
Apr 22, 2015, 11:21:49 AM4/22/15
to spark-conn...@lists.datastax.com
My configuration :
spark 1.2.1
scala 2.10.4
spark-cassandra-connector 1.2.0-rc3 build with "sbt assembly" (so scala 2.10.4)
cassandra 2.1.3

I've tried to load the connector with spark.executor.extraClassPath / spark.driver.extraClassPath set in spark-default.conf, SPARK_CLASSPATH and with/without --jars.

I've used the command on the github to create the cassandra table ("CREATE TABLE cim_dev.shopping_history ( cust_id INT, date TIMESTAMP, product TEXT, quantity INT, PRIMARY KEY (cust_id, date, product));") so CustomerID is supposed to match the schema.

I've tried to compile my code on my local computer with Intellij and I get an error :
error: could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[CustomerID]
[INFO] val repartitioned = idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10)

As I said before I have a piece of code using the joinWithCassandraTable function that works in the shell but when I try to compile it I get a diverging implicit expansion for type com.datastax.spark.connector.rdd.reader.RowReaderFactory[R] error.

I'm using mvn with :

<properties>
<spark.version>1.2.1</spark.version>
<scala.version>2.10.4</scala.version>
</properties>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.2.0-rc3</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
</exclusion>
</exclusions>
</dependency>

Russell Spitzer

unread,
Apr 22, 2015, 5:01:58 PM4/22/15
to spark-conn...@lists.datastax.com
Verified that this bug is unrelated to the connector, and we have patched it in the enterprise shell code (sorry if this doesn't help you but i'm still looking for a workaround.) The same code works fine when built into an application (further implicating the shell )

Non-connector repo
case class CustomerID(cust_id: Int)
val idsOfInterest = sc.parallelize(1 to 1000).map(new CustomerID(_)).repartition(1).take(1)
15/04/22 13:58:59 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 10.0.2.15): java.lang.ClassNotFoundException: $line5.$read$$iwC$$iwC$CustomerID
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

On Wed, Apr 22, 2015 at 1:57 PM, Russell Spitzer <rus...@datastax.com> wrote:
I have a repo for the shell issue in a vanilla spark shell 1.2.1. The issue is not present in our DSE builds which makes me think this is one of the outer class reference bugs in the spark shell. Looking for a workaround. As for why intellij isn't properly finding the implicts it may be because of multiple definitions of that class CustomerID, or definition of the case class within another class.

Russell Spitzer

unread,
Apr 22, 2015, 5:03:17 PM4/22/15
to spark-conn...@lists.datastax.com
I have a repo for the shell issue in a vanilla spark shell 1.2.1. The issue is not present in our DSE builds which makes me think this is one of the outer class reference bugs in the spark shell. Looking for a workaround. As for why intellij isn't properly finding the implicts it may be because of multiple definitions of that class CustomerID, or definition of the case class within another class.
On Wed, Apr 22, 2015 at 8:21 AM, Alexis <alexis....@googlemail.com> wrote:

Russell Spitzer

unread,
Apr 22, 2015, 5:08:10 PM4/22/15
to spark-conn...@lists.datastax.com
One more note, running the shell in local mode works but obviously will not be an adequate workaround. 

Russell Spitzer

unread,
Apr 22, 2015, 5:44:01 PM4/22/15
to spark-conn...@lists.datastax.com
Last email in this thread for now,
Created a bug on the Spark Side for this
https://issues.apache.org/jira/browse/SPARK-7061
Although it may be a duplicate of 
https://issues.apache.org/jira/browse/SPARK-2620

Alexis

unread,
Apr 23, 2015, 8:08:11 AM4/23/15
to spark-conn...@lists.datastax.com
Thanks for your answers Russell.

I've been able to compile the code with the repartitionByCassandraReplica function but still have a diverging implicit expansion error with joinWithCassandraTable function, any hint ?

Should I open a new thread ?
Reply all
Reply to author
Forward
0 new messages