We're running into some performance issues trying to run ~3500 queries using execute_concurrent_with_args. We have a medium-sized cluster (~55 nodes running Cassandra 2.0.13). Doing these 3500 queries takes ~7s, so roughly 500 queries per second, which is not that impressive.
It looks like we're CPU-bound on the client side: the process eats 100% CPU while the queries run. If I run 4 copies of this script at the same time (4 being the number of cores my test virtual machine has), I still get 500 queries per second per process, so the cluster itself can cope with the load.
I've tried playing with the concurrency argument to execute_concurrent_with_args - no difference. I also played with core_connections_per_host - again no difference.
Switching the reactor between asyncore, libev, and gevent gives the same performance.
All ideas welcome. Perhaps the threading or multiprocessing module? I could also try PyPy.
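To make the multiprocessing idea concrete, here's roughly what I have in mind (just a sketch, untested - it shards the parameter list across one worker per core, and each worker opens its own Cluster/Session, since driver objects can't be shared across forked processes):

import os
import json
import multiprocessing
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.io.libevreactor import LibevConnection

QUERY = ('SELECT timeperiod_start, metric, value FROM "production"."Data_weekly" '
         'WHERE key = ? AND timeperiod_start >= ? AND timeperiod_start <= ?')

def run_chunk(params_chunk):
    # Each worker builds its own connection; sessions don't survive a fork.
    # Contact points omitted here, same as in the main script below.
    cluster = Cluster(protocol_version=2, connection_class=LibevConnection)
    session = cluster.connect('production')
    prepared = session.prepare(QUERY)
    results = execute_concurrent_with_args(session, prepared, params_chunk,
                                           concurrency=100)
    cluster.shutdown()
    return len(results)

if __name__ == '__main__':
    query_params = json.load(open(os.path.expanduser("~/all_query_params.json")))
    n_procs = multiprocessing.cpu_count()
    # Round-robin split of the parameter sets across the workers.
    chunks = [query_params[i::n_procs] for i in range(n_procs)]
    pool = multiprocessing.Pool(n_procs)
    print(sum(pool.map(run_chunk, chunks)))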
Here's the test script I've been using - let me know if there's anything I can improve. all_query_params.json contains the 3500 parameter sets for the query.
#!/usr/bin/env python
import os
import sys
import time
import json
import logging
import socket
from gevent import monkey
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.cluster import Cluster
from cassandra.io.libevreactor import LibevConnection
from cassandra.policies import HostDistance
#monkey.patch_all()  # uncomment when testing the gevent reactor
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
                    format="%(asctime)s [%(process)d] %(name)s %(module)s:%(lineno)d %(levelname)s %(message)s")
log = logging.getLogger("test_perf")
logging.getLogger("cassandra").setLevel(logging.WARNING)
cluster = Cluster(  # contact points omitted
    protocol_version=2,
    sockopts=[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
    connection_class=LibevConnection,
    metrics_enabled=True)
cluster.set_core_connections_per_host(HostDistance.LOCAL, 6)
session = cluster.connect('production')
query = session.prepare('SELECT timeperiod_start, metric, value FROM "production"."Data_weekly" WHERE key = ? AND timeperiod_start >= ? AND timeperiod_start <= ?')
with open(os.path.expanduser("~/all_query_params.json")) as f:
    query_params = json.load(f)
log.info("Starting %d queries" % len(query_params))
started_at = time.time()
results = execute_concurrent_with_args(session, query, query_params, concurrency=300)
took = time.time() - started_at
log.info("Got results after %.2fs - %.2f queries/s" % (took, len(query_params) / took))