Spark Connection Pool


Steve Severance

May 16, 2016, 4:55:39 PM
to DataStax Spark Connector for Apache Cassandra
Hi,

I am trying to troubleshoot an issue with Spark. I am performing a full table scan, but I am seeing errors like the following:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: 10.0.2.6/10.0.2.6:9042 (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:91)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 23 more

Having dug through the connector code and the driver code, I have a few questions.

1. It looks like the driver sets the number of connections to 1 for protocol v3. The requests per connection is set to 1024 by default, so it looks like we should be able to run 1024 simultaneous queries against the Cassandra node on our Spark machine (or 256 against a remote machine). I have confirmed that it fails first on the local machine and then also fails on a retry to the remote machine. So is this covering up another issue that I am having? Is the connection pool really being starved on the local machine? If so, what can I do to prevent this?

2. I have 5 machines and have set the Spark executors to 5 and the executor-cores to 1. How many queries should really be in flight at once? Is it not just paging through a result set on a single thread?

3. What other knobs do I have to be able to run a full table scan? I am not worried about the impact on read performance, as there is very little read traffic.

Happy to provide any other needed information.

Russell Spitzer

May 16, 2016, 5:45:51 PM
to DataStax Spark Connector for Apache Cassandra
1) This could be because the connections are timing out of the pool and reconnections are being made too frequently. I've noticed this is sometimes an issue, but you can fix it by extending the pool keep-alive:

spark.cassandra.connection.keep_alive_ms
Set this in your spark-defaults or on the CLI with --conf

2) Each executor should be paging through a single result set at a time. Basically, each core will be a thread with a very large result set (since the queries are by default giant range scans). The pages are prefetched, so they should constantly be pulling data back from C*. Increasing the number of Spark cores will increase in-flight queries, but by default it should be just 1 per executor core.

3) For read speeds you have the options in the reference doc
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters
input.fetch.size_in_rows will directly control the prefetching amount.


Steve Severance

May 16, 2016, 6:17:45 PM
to DataStax Spark Connector for Apache Cassandra

Thanks Russell,

1. If there is only a single connection in the connection pool for V3 (and I guess later), then what exactly is timing out? Am I misunderstanding something? I will extend the keep_alive in either case.

3. Should I be increasing or decreasing the amount of data I am reading? I would guess decreasing...



Russell Spitzer

May 16, 2016, 6:24:03 PM
to DataStax Spark Connector for Apache Cassandra
1) The driver itself has "connections", and the Spark Cassandra Connector has a pool which contains sessions and references to driver cluster instances. These are separate, since the connector sometimes connects to multiple C* clusters, or multiple times to the same cluster with different security configurations. The keep_alive setting affects the Spark Cassandra Connector pool, not the driver's internal execution pooling mechanisms.

3) I wouldn't worry about this just yet. If the keep-alive fixes your problem, just leave it alone. If not, lower the fetch size (input.fetch.size_in_rows).

Steve Severance

May 19, 2016, 11:19:20 PM
to DataStax Spark Connector for Apache Cassandra
So the keep-alive setting has not fixed my problem. I did make some progress by greatly increasing the amount of memory for both the Spark executors and the driver. I also restricted the data being selected to just a few fields. The job does make it further now, but I am still getting the timeout from inside the Cassandra driver.

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: 10.0.2.6/10.0.2.6:9042 (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:406)

Looking at the code there, it seems that it is spinning while trying to "borrow" a connection from its internal pool, which looks like it's just incrementing a counter for the number of active queries. Also, once this timeout exception is thrown, all (I think) future requests using that connection fail. I don't see anything in the system.log on the node it is trying to connect to.

One interesting thing is that I believe (but haven't proven) that this only happens when a task needs to talk to a Cassandra server that is not on the same node. Any thoughts on that?

Also, the returned data size seems to play a role. I can table scan when pulling out just one or a couple of columns. However, one of my columns is a large blob (about 60K on average, though I have not confirmed this). Might this be causing an issue?

I am happy to share additional details if you need them.

Russell Spitzer

May 19, 2016, 11:57:01 PM
to DataStax Spark Connector for Apache Cassandra
It sounds like this may be causing GC issues on the C* nodes. Large columns create rather large objects on the heap, so tuning GC may help with that. I would make sure to check the C* instances while this is running. My guess is that you only see it querying "different" nodes because it has failed at the local machine and then at every other machine in the cluster.

From my reading of the code, this should only happen after the retry policy is exhausted. It may help to turn on logging for the driver and see whether you get a bunch of retries before it finally fails.

https://github.com/datastax/java-driver/blob/3.0/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java#L364-L365
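
For anyone following along, here is a minimal sketch of one way to raise the driver's log level from code. It assumes SLF4J is bound to log4j 1.x (as in stock Spark distributions of this era) and that the driver's loggers live under the com.datastax.driver.core package seen in the stack traces above. The object and method names are hypothetical.

```scala
import org.apache.log4j.{Level, Logger}

// Hypothetical helper, not part of the connector or the driver.
object DriverLogging {
  // Assumption: log4j 1.x is the logging backend on the classpath.
  // This only changes the log level in the JVM where it is called.
  def enableDriverDebugLogging(): Unit =
    Logger.getLogger("com.datastax.driver.core").setLevel(Level.DEBUG)
}
```

Because the setting is per JVM, it would need to run on the executors (for example at the start of a task), or the same logger could be set to DEBUG in the executors' log4j.properties instead.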

I'll ping the driver team and see what they think as well

Steve Severance

May 20, 2016, 9:43:24 AM
to DataStax Spark Connector for Apache Cassandra
So I got it to run. The change I made for this run was enabling connection.compression. Here is my config in case it helps someone else in the future.

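import org.apache.spark.SparkConf

// SparkConf receiver assumed here; the settings themselves are as posted.
val conf = new SparkConf()
  .set("spark.cassandra.connection.timeout_ms", "60000")      // connection timeout, in ms
  .set("spark.executor.memory", "16g")
  .set("spark.driver.memory", "4gb")
  .set("spark.cassandra.input.fetch.size_in_rows", "250")     // rows fetched per page
  .set("spark.cassandra.connection.keep_alive_ms", "900000")  // keep unused connections for 15 minutes
  .set("spark.cassandra.connection.compression", "SNAPPY")    // the change made for this run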

Thanks for all your help.

Qi Songtao

Nov 3, 2017, 12:36:50 PM
to DataStax Spark Connector for Apache Cassandra
BTW,
Spark Cassandra Connector 2.0+ introduced a new configuration parameter:

connection.connections_per_executor_max:
Maximum number of connections per Host set on each Executor JVM. Will be updated to DefaultParallelism / Executors for Spark Commands. Defaults to 1 if not specifying and not in a Spark Env
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md

Judging from the documentation (I haven't checked the code yet), this likely corresponds to the Cassandra driver connection setting
setMaxConnectionsPerHost()
http://docs.datastax.com/en/developer/java-driver/2.1/manual/pooling/

This should also help resolve this kind of issue.
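
As a minimal sketch of how that parameter might be set from application code: this assumes connector 2.0+ and the usual spark.cassandra. prefix used by the other settings in this thread. The object name and the value 4 are purely illustrative, not a recommendation.

```scala
import org.apache.spark.SparkConf

// Hypothetical wrapper object; only the property key comes from the reference doc.
object ConnectorPoolConf {
  // Illustrative value only; the thread does not suggest a specific number.
  val maxConnectionsPerExecutor = "4"

  val conf: SparkConf = new SparkConf()
    .set("spark.cassandra.connection.connections_per_executor_max", maxConnectionsPerExecutor)
}
```

The same key could equally be passed with --conf on the command line or placed in spark-defaults, as suggested earlier in the thread for keep_alive_ms.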
