Spark Cassandra Java API partition with server filter - not working in parallel


Gokul

Jul 16, 2016, 10:09:36 AM
to DataStax Spark Connector for Apache Cassandra
I am performing a count operation with the Spark Cassandra Connector through its Java API. The query uses a server-side filter, and I want to return the count of the column I am selecting. The map task is executed by only one worker and it is very slow. I have added a code snippet showing what I am trying to do.

When I filter on the client side in the workers instead, I can see that the partitions are allocated to all executors.

I am running a 4-node cluster. Cassandra runs on 3 of the nodes with a replication factor of 2, and the DataStax version is 3.2.1.

All 4 nodes are configured to run Spark, with the Master on a non-Cassandra node. The Spark version is 1.5 and the spark-cassandra connector version is 2.10.

// count() returns a long, not an int.
long count = CassandraJavaUtil.javaFunctions(ctx).cassandraTable("ks", "table1")
        .select("column1")
        .where("partitionkey1 = ?", k1)
        .where("partitionkey2 = ?", k2)
        // Bind one value per placeholder in the IN clause.
        .where("partitionkey3 in (?, ?, ?)", 1, 2, 3)
        .map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) {
                return row.getString("column1");
            }
        }).count();

Can anyone please tell me whether I am doing something wrong? I have also tried setting these configuration values, with no luck:

"spark.cassandra.input.split", "10000"
"spark.locality.wait", "5s"
"spark.cassandra.input.split.size_in_mb", "5242880"


Thanks in advance
Gokul

Russell Spitzer

Jul 16, 2016, 12:24:11 PM
to DataStax Spark Connector for Apache Cassandra
This is as expected. The connector always keeps a single C* partition inside one Spark partition. There are two reasons for this:
1) It's impossible to know how the C* partition should be divided into parts. (If it is an int column, do we split from INT min to INT max?)
2) The partition still only exists on a few machines, so most of the executors would be reading remotely if we broke it up.

Gokul

Jul 16, 2016, 12:49:30 PM
to DataStax Spark Connector for Apache Cassandra
Thanks, Russell, for the quick response.

I am trying to distribute the data into 3 partitions, which is handled in the code that loads the data. I could also see with C* nodetool that the load is almost equally distributed.

That is why I am using an "IN" in the where filter with an int array of 1, 2, 3. That means only 3 tokens should be generated in this case, shouldn't it?

Is that not a better option? I wanted to avoid a full table scan.

Russell Spitzer

Jul 16, 2016, 12:53:19 PM
to DataStax Spark Connector for Apache Cassandra

We do the same thing with IN clauses, since distributing a small number of partitions is also usually a waste. For the more generic case we suggest using the joinWithCassandraTable method, which lets you join an RDD directly to C*.

Both this and using IN will be faster than a full table scan.
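
For readers following along, here is a minimal sketch of the key-building step that the join approach relies on, in plain Java with no connector dependency. The class name JoinKeys, the Key type, the int column types, and the sample values are all assumptions made for illustration; nothing here comes from the connector itself. The idea is that each full partition key becomes one element of a list, so that once the list is turned into an RDD Spark can spread the keys across executors instead of sending a single IN query to one task.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the list of full partition keys to join against.
final class JoinKeys {

    /** One full partition key; field names mirror the columns in the query above. */
    public static final class Key implements Serializable {
        public final int partitionkey1;
        public final int partitionkey2;
        public final int partitionkey3;

        public Key(int partitionkey1, int partitionkey2, int partitionkey3) {
            this.partitionkey1 = partitionkey1;
            this.partitionkey2 = partitionkey2;
            this.partitionkey3 = partitionkey3;
        }

        @Override
        public String toString() {
            return "(" + partitionkey1 + ", " + partitionkey2 + ", " + partitionkey3 + ")";
        }
    }

    /** Builds one key per bucket value 1..buckets for fixed k1 and k2. */
    public static List<Key> keysFor(int k1, int k2, int buckets) {
        List<Key> keys = new ArrayList<>();
        for (int bucket = 1; bucket <= buckets; bucket++) {
            keys.add(new Key(k1, k2, bucket));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Sample values only; 3 buckets matches the IN (1, 2, 3) filter.
        // With Spark and the connector available, a list like this would be
        // parallelized into an RDD and passed to joinWithCassandraTable.
        // That step is omitted because it needs a running cluster.
        for (Key key : keysFor(23987, 10, 3)) {
            System.out.println(key);
        }
    }
}
```

How the keys are then mapped onto the table's partition key columns depends on the connector version in use, so check the connector documentation for that part.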

Gokul

Jul 18, 2016, 9:09:30 AM
to DataStax Spark Connector for Apache Cassandra
Russell,

As you suggested, I doubled the number of C* partitions from 3 to 6.
I then created an RDD from a list of the 6 partitions and used joinWithCassandraTable.

I could see the load being distributed, and 15 tasks were assigned. The tasks fail after some time and the job is aborted because of the failures; Cassandra threw consistency errors.

I changed the consistency level and retried, and every time it threw the same error.

These are the Spark worker errors after setting the read consistency levels. Is there any Cassandra configuration I missed? Currently I have set RF=2 for the keyspace.

Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)

com.datastax.driver.core.exceptions.ReadFailureException: Cassandra failure during read query at consistency QUORUM (2 responses were required but only 0 replica responded, 1 failed)

Caused by: com.datastax.driver.core.exceptions.ReadFailureException: Cassandra failure during read query at consistency ONE (1 responses were required but only 0 replica responded, 2 failed)

Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)

These were the Cassandra errors, and they happen consistently.

ERROR [SharedPool-Worker-2] 2016-07-17 20:06:20,827 StorageProxy.java:1745 Scanned over 100001 tombstones during query 'SELECT * FROM ks.table WHERE paritionc1, partitionc2, partionc3 = 23987, 10, 1 LIMIT 5000' (last scanned row partion key was ((23987, 10, 1), 0909d3c0-8f2e-49dc-a85e-90051ccadc7f)); query aborted

Russell Spitzer

Jul 18, 2016, 12:06:48 PM
to DataStax Spark Connector for Apache Cassandra
"Scanned over 100001 tombstones during query" is a TombstoneOverwhelmingException. You have too many tombstones and the system is warning you. There are settings in cassandra.yaml for forcibly changing the limit.
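
For reference, the settings in question are most likely the two tombstone thresholds in cassandra.yaml. The fragment below shows them with what I believe are the stock defaults, which line up with the "Scanned over 100001 tombstones" message above; treat the values as an assumption and check them against the cassandra.yaml shipped with your version.

```yaml
# cassandra.yaml, edited on each node
# Log a warning when a single query scans more than this many tombstones.
tombstone_warn_threshold: 1000
# Abort the query when it scans more than this many tombstones.
tombstone_failure_threshold: 100000
```

Raising tombstone_failure_threshold lets the read go through, but the tombstones are still scanned on every query, so reducing deletes or letting compaction purge them is usually the longer-term fix.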

Gokul

Jul 19, 2016, 4:47:46 PM
to DataStax Spark Connector for Apache Cassandra
Hi Russell, this worked for me, thanks a lot.

I wanted to know if there is a way to create a RowReaderFactory that is not based on a JavaBean.

My intention is that, for a given row, the column whose value I read varies. I have many columns and wanted something like a CassandraRow.

Russell Spitzer

Jul 19, 2016, 4:52:30 PM
to DataStax Spark Connector for Apache Cassandra
Then why not use CassandraRow?

Gokul

Jul 19, 2016, 4:56:26 PM
to DataStax Spark Connector for Apache Cassandra
I tried using it, but it threw this error:

java.lang.IllegalArgumentException: Unsupported type: com.datastax.spark.connector.CassandraRow
at com.datastax.spark.connector.types.TypeConverter$.forCollectionType(TypeConverter.scala:871)
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:884)
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:903)
at com.datastax.spark.connector.japi.CassandraJavaUtil.typeConverter(CassandraJavaUtil.java:232)
at com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo(CassandraJavaUtil.java:273)