Making some 3500 SELECT queries as quickly as possible (driver CPU-bound)


Erik Forsberg

May 29, 2015, 10:55:28 AM
to python-dr...@lists.datastax.com
Hi!

We're running into performance issues trying to make some 3500 SELECT queries using execute_concurrent_with_args. We have a medium-sized cluster (~55 nodes running Cassandra 2.0.13). Making these ~3500 queries takes roughly 7s, i.e. about 500 queries per second, which is not that impressive.

It looks like we're CPU-bound on the client side. The process eats 100% CPU while doing the queries. If I run 4 of these test scripts at the same time (4 being the number of cores my test virtual machine has), I still get 500 queries per second per process, so the cluster itself can cope with the load.

I've tried playing with the concurrency argument to execute_concurrent - no difference. I've also played with core_connections_per_host - again no difference.

Asyncore, libev or gevent event loop - same performance.

Based on http://datastax.github.io/python-driver/performance.html I had expected better performance than 500 queries per second. Could it be related to the size of the cluster somehow, i.e. a 55-node cluster being more for the driver to handle than a 3-node cluster?

All ideas welcome. Using threads or the multiprocessing module, perhaps? I could also try PyPy.

Here's the test script I have been using; let me know if there's anything I can improve. The file all_query_params.json contains the ~3500 parameter sets for the query.


--snip--
#!/usr/bin/env python

import os
import sys
import time
import json
import logging
import socket
from gevent import monkey
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.cluster import Cluster
from cassandra.io.libevreactor import LibevConnection
from cassandra.policies import HostDistance

#monkey.patch_all()

logging.basicConfig(stream=sys.stderr, level=logging.INFO, 
                    format="%(asctime)s [%(process)d] %(name)s %(module)s:%(lineno)d %(levelname)s %(message)s")

log = logging.getLogger("test_perf")
logging.getLogger("cassandra").setLevel(logging.WARNING)


cluster = Cluster(contact_points=['cassandra.example.com'],
                  protocol_version=2,
                  sockopts=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
                  connection_class=LibevConnection,
                  metrics_enabled = True)

cluster.set_core_connections_per_host(HostDistance.LOCAL, 6)

session = cluster.connect('production')
query = session.prepare('SELECT timeperiod_start, metric, value FROM "production"."Data_weekly" WHERE key = ? AND timeperiod_start >= ? AND timeperiod_start <= ?')

query_params = json.load(open(os.path.expanduser("~/all_query_params.json")))

log.info("Starting %d queries" % len(query_params))
started_at = time.time()
results = execute_concurrent_with_args(session, query, query_params, concurrency=300)
took = time.time() - started_at
log.info("Got results after %.2fs - %.2f queries/s" % (took, len(query_params) / took))

--snap--

Pete Cable

May 29, 2015, 11:31:05 AM
to python-dr...@lists.datastax.com
Erik,

I ran into similar results with the Python driver, and used the following pattern to get pretty significant speedups (at the cost of lots of CPU!).

I have a block of code which creates a multiprocessing pool with an initializer to recreate the cluster/session objects after forking:

def create_execution_pool():
    global execution_pool
    pool_size = engine.app.config['POOL_SIZE']
    execution_pool = Pool(pool_size, initializer=initialize_worker)
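
The initialize_worker referenced above isn't shown here; a minimal sketch of what such an initializer could look like (the contact point and keyspace are placeholders, not taken from this thread) is:

# Sketch only: each forked worker builds its own Cluster/Session,
# since driver connections can't be shared across a fork.
from multiprocessing import Pool
from cassandra.cluster import Cluster

cluster = None
session = None

def initialize_worker():
    global cluster, session
    cluster = Cluster(contact_points=['cassandra.example.com'])  # placeholder contact point
    session = cluster.connect('production')                      # placeholder keyspace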

And then when I execute thousands of simultaneous queries I break them up into chunks and feed each chunk to the pool:


futures = []
for i in xrange(0, num_points, chunk_size):
    futures.append(execution_pool.apply_async(execute_query,
                                              (stream_key, cols, times[i:i + chunk_size])))

The function execute_query still uses execute_concurrent_with_args under the hood to keep lots of requests in flight at any given time:


@cassandra_session
@log_timing
def execute_query(stream_key, cols, times, session=None, prepared=None):
    query_name = '%s_%s_%s_%s_%s' % (stream_key.stream.name, stream_key.subsite,
                                     stream_key.node, stream_key.sensor, stream_key.method)
    if query_name not in prepared:
        base = "select %s from %s where subsite='%s' and node='%s' and sensor='%s' and method='%s'" % \
               (','.join(cols), stream_key.stream.name, stream_key.subsite,
                stream_key.node, stream_key.sensor, stream_key.method)
        query = session.prepare(base + ' and time<=? order by method desc limit 1')
        prepared[query_name] = query
    query = prepared[query_name]
    result = list(execute_concurrent_with_args(session, query, times, concurrency=50))
    return result

I did have some issues where lots of worker processes all connecting to the cluster simultaneously would sometimes time out (due to large schema size), so I used a multiprocessing bounded semaphore in the initialization code to prevent more than N workers from attempting to connect simultaneously; YMMV.
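
Roughly, that guard could look like this (a sketch only; the limit of 4 is an assumption for illustration, and the initializer follows the placeholder sketch above):

# Sketch only: throttle how many workers connect at once. The semaphore is
# created in the parent process so every forked worker shares the same one.
from multiprocessing import BoundedSemaphore
from cassandra.cluster import Cluster

connect_semaphore = BoundedSemaphore(4)  # assumed limit of 4 simultaneous connects

def initialize_worker():
    global cluster, session
    with connect_semaphore:
        cluster = Cluster(contact_points=['cassandra.example.com'])  # placeholder contact point
        session = cluster.connect('production')                      # placeholder keyspace
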
Hope this helps.

-
Pete



Adam Holmberg

May 29, 2015, 11:34:11 AM
to python-dr...@lists.datastax.com
Erik,

A lot of people have come up against this. The numbers cited on the performance page are a rough benchmark for total request throughput without much deserialization. Thus they depict a theoretical maximum request throughput. As you noticed, the driver is CPU bound, and things roll off sharply with more read-intensive queries. We have plans to optimize the driver that have been delayed by resource constraints and prioritizing new features. We expect this to come about soon as new resources have been added, and the push for Cassandra 2.2 is coming to a close.

Some further discussion on this can be found in this mailing list thread.

Until those enhancements land, your thinking is in the right vein: the PyPy runtime is an easy win, and multiprocessing helps break out of the single core (threading doesn't accomplish much in this scenario since we're CPU-bound and limited by the GIL).
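
For illustration, a rough sketch of that multiprocessing approach applied to the test script above (the chunking and worker count are illustrative; each worker creates its own session, since driver sessions cannot be shared across forked processes):

# Sketch: split the parameter list across worker processes, each with its
# own Cluster/Session.
import json, os
from multiprocessing import Pool
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

def run_chunk(chunk):
    cluster = Cluster(contact_points=['cassandra.example.com'], protocol_version=2)
    session = cluster.connect('production')
    query = session.prepare('SELECT timeperiod_start, metric, value '
                            'FROM "production"."Data_weekly" '
                            'WHERE key = ? AND timeperiod_start >= ? AND timeperiod_start <= ?')
    results = execute_concurrent_with_args(session, query, chunk, concurrency=100)
    cluster.shutdown()
    return sum(1 for success, _ in results if success)

if __name__ == '__main__':
    params = json.load(open(os.path.expanduser('~/all_query_params.json')))
    n_procs = 4  # one worker per core on the test VM
    chunks = [params[i::n_procs] for i in range(n_procs)]
    total = sum(Pool(n_procs).map(run_chunk, chunks))
    print('%d queries succeeded' % total)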

In addition, I would recommend using the TokenAware load balancing policy, which will save you a hop for most of your requests (assuming RF << N for your large cluster).
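
Concretely, that policy is passed to the Cluster constructor, e.g. wrapping a round-robin child policy:

# Token-aware routing sends each prepared-statement request directly to a
# replica for its partition key, saving the extra coordinator hop.
from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy

cluster = Cluster(contact_points=['cassandra.example.com'],
                  load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()))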

Adam Holmberg

Erik Forsberg

Jun 1, 2015, 4:43:48 AM
to python-dr...@lists.datastax.com
Thanks for the very informative answers.

PyPy doesn't seem to be an easy win for us - rather an easy loss. Running the same script, just without LibevConnection, the results are worse. Maybe I'm doing something suboptimal from a PyPy point of view? See test results below:

Changing to:

                  load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy()),

...also gives me worse results.

CPython/pypy/pypy3 results:

CPython 2.7.3, i.e. what I ran before:

forsberg@devshell:~$ python-driver-performance/bin/python python-driver-performance/test_perf.py 
2015-06-01 08:24:43,917 [22717] test_perf test_perf:36 INFO Starting 3672 queries
2015-06-01 08:24:52,087 [22717] test_perf test_perf:40 INFO Got results after 8.17s - 449.49 queries/s

Latest pypy3:

forsberg@devshell:~$ python-driver-performance-pypy3/bin/pypy --version
Python 3.2.5 (b2091e973da69152b3f928bfaabd5d2347e6df46, Oct 24 2014, 12:08:04)
[PyPy 2.4.0 with GCC 4.8.2]
forsberg@devshell:~$ python-driver-performance-pypy3/bin/pypy python-driver-performance/test_perf.py 
2015-06-01 08:24:03,972 [22706] test_perf test_perf:36 INFO Starting 3672 queries
2015-06-01 08:24:28,176 [22706] test_perf test_perf:40 INFO Got results after 24.20s - 151.72 queries/s

Latest pypy2:


forsberg@devshell:~$ python-driver-performance-pypy/bin/pypy --version
Python 2.7.9 (af9d5af2e8e35e39a6dd7266becb8eb08926cab4, Apr 16 2015, 08:49:57)
[PyPy 2.5.1 with GCC 4.9.2]
forsberg@devshell:~$ python-driver-performance-pypy/bin/pypy python-driver-performance/test_perf.py 
2015-06-01 08:25:05,432 [22727] test_perf test_perf:36 INFO Starting 3672 queries
2015-06-01 08:25:15,999 [22727] test_perf test_perf:40 INFO Got results after 10.57s - 347.49 queries/s

Regards,
\EF