I have a Cassandra table where one column holds 1-3 KB JSON strings.
The C* table is accessed via joinWithCassandraTable; the JSON strings from the joined RDD are converted to a DataFrame, which is registered as a Spark temp table. A GROUP BY aggregation is then executed against the temp table. The C* table partitioning is organized so that rows from the same partition join into one of the groups produced by the GROUP BY; each group is formed only from rows of the same partition. The C* table has 120 partitions, each with 800K - 11M rows.
While the aggregation query is running, Spark starts many concurrent queries reading from the C* table partitions, and because the JSON strings are large, three problems can occur:
1) The C* server can die from memory pressure (I have seen GC overhead limit exceptions and situations where the server died, possibly killed by the OOM killer);
2) Some requests can fail with the following exception:
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:277)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:257)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
3) Some requests can fail with:
com.datastax.driver.core.exceptions.ReadFailureException: Cassandra failure during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded, 1 failed)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:76)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:277)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:257)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
How can I control the number of concurrent queries issued from CassandraJoinRDD?
I found the spark.cassandra.input.join.throughput_query_per_sec parameter, and setting it to 6 helps. But if the partitions grow tomorrow, that could lead to exceptions again. So in my case it would be better to control the number of concurrent queries directly; is that possible?
PS. The other connector-related properties are:
spark.cassandra.read.timeout_ms=3600000
spark.cassandra.connection.timeout_ms=60000
spark.cassandra.connection.keep_alive_ms=900000
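For reference, these connector settings (plus the throttling parameter mentioned above) might be passed via spark-submit roughly as follows. The job class and jar names are placeholders, not from this thread:

```shell
# Sketch: passing the connector settings above via spark-submit.
# com.example.AggregationJob and app.jar are invented placeholders.
spark-submit \
  --conf spark.cassandra.read.timeout_ms=3600000 \
  --conf spark.cassandra.connection.timeout_ms=60000 \
  --conf spark.cassandra.connection.keep_alive_ms=900000 \
  --conf spark.cassandra.input.join.throughput_query_per_sec=6 \
  --class com.example.AggregationJob app.jar
```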
spark.cassandra.input.join.throughput_query_per_sec is your concurrency limiter. It is the only threshold on how many queries are executed at a time. I would warn you that if you are seeing errors with this method, your server will most likely not be stable under moderate load from outside the cluster either.
These queries are run using executeAsync so the concurrency itself is managed by the underlying Cassandra Java Driver.
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala#L129
The parameter above bounds how many executeAsync calls can be issued per second per core, and blocks new executeAsync calls from being issued when above the threshold.
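The per-second throttling described above can be modeled with a small sketch. This is not the connector's actual RateLimiter, just a minimal illustration of "at most N executeAsync calls per second":

```scala
// Illustrative sketch of per-second throttling: callers must acquire()
// before issuing a query, and the 7th acquire in a one-second window
// blocks until the window rolls over. Not the connector's real code.
object QueryRateLimiter {

  final class PerSecondLimiter(maxPerSecond: Int) {
    private var windowStart = System.nanoTime()
    private var issuedInWindow = 0

    // Blocks until another query may be issued within the per-second budget.
    def acquire(): Unit = synchronized {
      val now = System.nanoTime()
      if (now - windowStart >= 1000000000L) { // a new one-second window began
        windowStart = now
        issuedInWindow = 0
      }
      if (issuedInWindow >= maxPerSecond) {   // budget spent: wait out window
        val elapsedMs = (System.nanoTime() - windowStart) / 1000000L
        Thread.sleep(math.max(1L, 1000L - elapsedMs))
        windowStart = System.nanoTime()
        issuedInWindow = 0
      }
      issuedInWindow += 1
    }
  }

  // Helper: how long n acquires take at the given rate, in milliseconds.
  def timeAcquires(n: Int, perSecond: Int): Long = {
    val limiter = new PerSecondLimiter(perSecond)
    val start = System.nanoTime()
    (1 to n).foreach(_ => limiter.acquire())
    (System.nanoTime() - start) / 1000000L
  }

  def main(args: Array[String]): Unit = {
    // 12 queries at 6/sec: the 7th must wait for the next one-second window.
    println(s"elapsedMs=${timeAcquires(12, 6)}")
  }
}
```

Note that this bounds the *issue rate*, not the number of queries simultaneously in flight, which is exactly the distinction the rest of this thread turns on.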
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
Partition sizes varied from 300K to 11M rows, there were 120 partitions, ~102M rows in total, and the biggest column values are roughly 1-3 KB.
OK, I think we understand the problem now. Currently, beyond limiting the connection pool, I'm not sure there is a way to limit the actual number of in-flight requests. I've pinged the driver team to see if they have any ideas, and we'll think about workarounds on our end. Thanks for pointing out the issue.
On Wed, Jan 18, 2017 at 8:19 AM Russell Spitzer <rus...@datastax.com> wrote:
How large are these partitions? Just want to know what use case we are trying to optimize here; also adding Jaroslaw.
On Tue, Jan 17, 2017 at 9:53 PM Mikhail Tsaplin <tsmi...@gmail.com> wrote:
I checked the executeAsync implementation; if I understand correctly, executeAsync at https://github.com/datastax/java-driver/blob/3.x/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java#L143 immediately calls the execute method, which, when the SessionManager is in the initialized state, immediately calls:
new RequestHandler(this, callback, statement).sendRequest();
The RequestHandler does not count concurrent calls, and the HostConnectionPool is the only limiter of concurrent connections; when the limit is exceeded, it fails the future in RequestHandler ( https://github.com/datastax/java-driver/blob/3.x/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java#L329 ).
Now, if CassandraJoinRDD issues many long-running queries with execution times over 1 s, the connection pool could eventually be exhausted.
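That concern can be made concrete with a back-of-the-envelope estimate via Little's law: in-flight requests ≈ issue rate × average query duration. The rate, core count, and duration below are illustrative assumptions, not measurements from this thread:

```scala
// Little's law estimate: if each core issues queries at a fixed rate and
// each query runs for several seconds, the number of simultaneously
// in-flight requests can far exceed a per-host connection pool's capacity.
object InflightEstimate {
  def inflight(queriesPerSecPerCore: Int, cores: Int, avgQuerySeconds: Double): Double =
    queriesPerSecPerCore * cores * avgQuerySeconds

  def main(args: Array[String]): Unit = {
    // Assumed: throughput_query_per_sec = 6, 8 executor cores, 3 s per query.
    val est = inflight(6, 8, 3.0)
    println(s"~$est requests in flight") // ~144, well above a typical pool
  }
}
```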
Shouldn't we control the number of running queries on the connector's side?
I will try to decrease the number of partitions in the C* table (assuming it would not issue more queries than the number of partitions).
Another successful option was switching away from CassandraJoinRDD: storing the data for a single aggregation in a separate table and loading it with cassandraTable.
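That second workaround might look roughly like the sketch below. The keyspace, table, and column names are invented, and this assumes the connector's standard cassandraTable API; it is not runnable outside a Spark + Cassandra environment:

```scala
// Sketch of the workaround: instead of joinWithCassandraTable (one point
// query per joined key), materialize the rows needed for one aggregation
// into a dedicated table and read it with a paced full-table scan.
// "my_ks", "aggregation_staging", and the column names are placeholders.
import com.datastax.spark.connector._
import org.apache.spark.SparkContext

def aggregateFromStagingTable(sc: SparkContext): Unit = {
  val rows = sc.cassandraTable("my_ks", "aggregation_staging")
  val grouped = rows
    .map(r => (r.getString("group_key"), r.getString("json_payload")))
    .groupByKey() // each group comes from a single C* partition, as above
  grouped.take(10).foreach(println)
}
```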
On Wednesday, January 18, 2017 at 6:07:13 AM UTC+6, Russell Spitzer wrote:
Yes, there are Spark 503 and Spark 507; 503 is in now, and 507 will be in the next release.
Hi Russell, do you know if this problem is already solved in newer versions of the driver/connector?
2017-01-18 23:29 GMT+07:00 Russell Spitzer <rus...@datastax.com>:
Those are JIRA tickets in the Spark Cassandra Connector.
There are no released versions with those patches yet; you have to build off the master or b2.0 branch to have them included.