Paging large queries into pandas dataframe

Csaba Ragany

Jun 6, 2018, 11:42:52 AM
to DataStax Python Driver for Apache Cassandra User Mailing List
Hello,

What is the proper way to execute large queries and then insert the results into a pandas dataframe?

Currently I use a pandas_factory row factory to get the query results, but how can it be combined with fetch_size?

Here is some sample code:

import pandas as pd
from cassandra.query import SimpleStatement

def pandas_factory(colnames, rows):
    # Build one DataFrame from the rows of the current page
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory

query = "SELECT * FROM my_table"
statement = SimpleStatement(query, fetch_size=10)

rslt = session.execute(statement, timeout=None)
df = rslt._current_rows  # holds only the current (first) page

The code above returns only the first 10 rows of my table. How can I get all my records (let's say there are 100 records in my_table)?


Thank you!!!

Alan Boudreault

Jun 6, 2018, 2:09:24 PM
to python-dr...@lists.datastax.com
Hello,

You simply need to iterate the ResultSet.

Example:

statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
for user_row in session.execute(statement):
    process_user(user_row)


You can also materialize all rows with something like:

rows = list(session.execute(statement))  # will execute multiple requests to fetch *all* rows (depending on your fetch_size).

More documentation about large queries: https://datastax.github.io/python-driver/query_paging.html

Regards,
Alan

--
You received this message because you are subscribed to the Google Groups "DataStax Python Driver for Apache Cassandra User Mailing List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-driver-user+unsub...@lists.datastax.com.



--

Alan Boudreault
Software Engineer (Drivers) | alan.bo...@datastax.com


Csaba Ragany

Jun 6, 2018, 3:58:24 PM
to DataStax Python Driver for Apache Cassandra User Mailing List
Dear Alan,

Thank you for your answer. I've read that article and tried the solution copied from it, but it does not work:
1) In your code, what does process_user() do?
2) In your code, how can the result be converted into a pandas dataframe? Where should I use pandas_factory, if at all?

Of course, if I use the default row_factory, then I can iterate through the fetched pages, but in that case the results are Row objects and not a dataframe...
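
For readers following along: with the driver's default row factory, rows come back as named tuples, and pandas can build a DataFrame from a list of named tuples directly. The sketch below uses a hypothetical `Row` namedtuple as a stand-in for the driver's rows so that it runs without a cluster; the column names `id` and `name` are assumptions for illustration only.

```python
from collections import namedtuple

import pandas as pd

# Hypothetical stand-in for the named-tuple rows produced by the default row factory
Row = namedtuple("Row", ["id", "name"])

# With a real session this list would come from: list(session.execute(statement))
rows = [Row(1, "alice"), Row(2, "bob"), Row(3, "carol")]

# pandas takes the column names from the named-tuple fields
df = pd.DataFrame(rows)
print(list(df.columns))  # ['id', 'name']
print(len(df))           # 3
```

Materializing with `list(...)` pulls every page into memory at once, so this only suits result sets that fit comfortably in RAM.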

Thanks,
Csaba

Csaba Ragany

Jun 6, 2018, 4:07:37 PM
to DataStax Python Driver for Apache Cassandra User Mailing List
If I use the official dict_factory then your iteration works, I get all my records as dictionaries. But with the above pandas_factory function, I get only the column names as many times as (number of records // fetch size) + 1

Alan Boudreault

Jun 6, 2018, 4:48:52 PM
to python-dr...@lists.datastax.com
Hey Csaba,

1) This is application-specific... that method is only there to demonstrate the iteration use.
2) I'm afraid I don't know the pandas library well enough to give you a working row factory... I haven't touched that library in years. Sorry.

Here are a few links that might be interesting for you:


Note that since you are using `fetch_size`, you may have to merge multiple DataFrames, depending on your needs.
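
As a minimal sketch of that merge step, assuming each fetched page has already been turned into its own DataFrame with identical columns (the two frames below are made-up sample data, not driver output):

```python
import pandas as pd

# Made-up per-page frames; in practice there would be one per fetched page
page_frames = [
    pd.DataFrame([(1, "a"), (2, "b")], columns=["id", "name"]),
    pd.DataFrame([(3, "c")], columns=["id", "name"]),
]

# Concatenate once at the end; ignore_index renumbers the rows 0..n-1
df = pd.concat(page_frames, ignore_index=True)
print(len(df))         # 3
print(list(df.index))  # [0, 1, 2]
```

Collecting the frames in a list and concatenating once avoids copying the accumulated data on every page.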

Alan


Csaba Ragany

Jun 7, 2018, 8:04:32 AM
to DataStax Python Driver for Apache Cassandra User Mailing List
Dear Alan,

Thanks for your answer!

In the end I modified pandas_factory to return a list containing one pandas.DataFrame. The problem with the original pandas_factory was that the for loop did not iterate over the pages of the PagedResult (each of which is a pandas.DataFrame in my case, (number of records // fetch size) + 1 of them in total) but over each pandas.DataFrame itself. Iterating over a DataFrame yields its column names, which is why I got only the column names and not the rows.
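
The behavior described above can be reproduced without a cluster. The following is a simplified model, not the driver's actual implementation: `toy_paged_iter` is a hypothetical helper that applies the row factory to each page and then iterates over whatever the factory returned, which is the behavior this thread describes.

```python
import pandas as pd

def toy_paged_iter(colnames, pages, row_factory):
    """Hypothetical stand-in for paged iteration over a result set."""
    for rows in pages:
        # Iterate over the factory's output for this page
        yield from row_factory(colnames, rows)

def frame_factory(colnames, rows):
    # Original approach: return the DataFrame itself
    return pd.DataFrame(rows, columns=colnames)

def listed_frame_factory(colnames, rows):
    # Fixed approach: wrap the DataFrame in a one-element list
    return [pd.DataFrame(rows, columns=colnames)]

colnames = ["id", "name"]
pages = [[(1, "a"), (2, "b")], [(3, "c")]]  # two made-up pages

bare = list(toy_paged_iter(colnames, pages, frame_factory))
wrapped = list(toy_paged_iter(colnames, pages, listed_frame_factory))

print(bare)                       # ['id', 'name', 'id', 'name']
print([len(f) for f in wrapped])  # [2, 1]
```

In the first case the column labels are repeated once per page; in the second each iteration step yields one whole page as a DataFrame.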

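So the proper pandas_factory when a fetch size is set is:

import pandas as pd
from cassandra.query import SimpleStatement

def pandas_factory(colnames, rows):
    # Wrap the page's DataFrame in a list so that iterating over the
    # result yields the DataFrame itself rather than its column names
    return [pd.DataFrame(rows, columns=colnames)]

session.row_factory = pandas_factory

query = "SELECT * FROM my_table"
statement = SimpleStatement(query, fetch_size=10)

# Collect one DataFrame per page, then combine them once at the end
# (DataFrame.append was removed in pandas 2.0; pd.concat replaces it)
frames = []
for page_df in session.execute(statement):
    frames.append(page_df)
df = pd.concat(frames, ignore_index=True)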


I've run a test on a table with ~130,000 records, with the following fetch_size values and query execution times:
fetch_size = 1000; query execution time = 0:00:34.374
fetch_size = 5000; query execution time = 0:00:19.854
fetch_size = 10; query execution time = 0:29:56.726
fetch_size = 10000; cassandra.ReadFailure :(
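
One factor that plausibly contributes to the spread in these timings is the number of page requests each fetch_size implies. A rough calculation, assuming exactly 130,000 rows (the thread only gives an approximate count):

```python
import math

total_rows = 130_000  # assumption: the thread says "~130,000"

# Number of page requests needed to read every row at each fetch_size
round_trips = {
    fetch_size: math.ceil(total_rows / fetch_size)
    for fetch_size in (10, 1000, 5000, 10000)
}

for fetch_size, pages in round_trips.items():
    print(f"fetch_size={fetch_size}: {pages} page requests")
# fetch_size=10: 13000 page requests
# fetch_size=1000: 130 page requests
# fetch_size=5000: 26 page requests
# fetch_size=10000: 13 page requests
```

This only counts requests; it does not explain the ReadFailure at fetch_size = 10000, and per-page DataFrame handling on the client side also adds to the total time.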




Thanks also for the links, user "ragesz" is me at the linked stackoverflow page :)

Nice day!!
Csaba

Alan Boudreault

Jun 7, 2018, 8:48:33 AM
to python-dr...@lists.datastax.com
Glad you got something that works! Also, take a look at the numpy deserializer if you are looking for better performance:


Good luck,
Alan


Csaba Ragany

Jun 7, 2018, 9:03:08 AM
to DataStax Python Driver for Apache Cassandra User Mailing List
Thanks, I will check it, especially how it handles Cassandra map objects / fields (I've struggled with getting map fields into pandas before).

I use the following pandas_factory function, which converts Cassandra map objects into Python dictionaries so that the dicts can be placed into a pandas.DataFrame column:

import pandas as pd
from cassandra.util import OrderedMapSerializedKey

def pandas_factory(colnames, rows):
    # Convert the tuples in 'rows' into lists (tuple elements cannot be replaced)
    rows = [list(i) for i in rows]

    # Convert only 'OrderedMapSerializedKey' elements into dict
    for idx_row, i_row in enumerate(rows):
        for idx_value, i_value in enumerate(i_row):
            if type(i_value) is OrderedMapSerializedKey:
                rows[idx_row][idx_value] = dict(i_value)

    # Place the pandas.DataFrame in a list so that iteration over the
    # PagedResult works when number of records > fetch_size
    return [pd.DataFrame(rows, columns=colnames)]
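
The conversion step can be tried without a cluster. In the sketch below `collections.OrderedDict` is only a stand-in for the driver's map type, and the `isinstance(..., Mapping)` check is an assumption made for this example; the function above tests for `OrderedMapSerializedKey` specifically, and whether that type passes a `Mapping` check is not confirmed here. The helper name `maps_to_dicts` and the sample columns are hypothetical.

```python
from collections import OrderedDict
from collections.abc import Mapping

import pandas as pd

def maps_to_dicts(rows):
    """Return rows as lists, with any mapping value replaced by a plain dict."""
    return [
        [dict(value) if isinstance(value, Mapping) else value for value in row]
        for row in rows
    ]

# Made-up rows: an id column and a map column (OrderedDict as stand-in)
rows = [
    (1, OrderedDict([("k1", "v1")])),
    (2, OrderedDict()),
]

df = pd.DataFrame(maps_to_dicts(rows), columns=["id", "attrs"])
print(type(df.loc[0, "attrs"]))  # <class 'dict'>
print(df.loc[0, "attrs"])        # {'k1': 'v1'}
```

Building a new list of rows instead of mutating in place avoids the index bookkeeping of the nested enumerate loops.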

It would be great if this or something better :) will be integrated into official Cassandra driver in query.py as another option of row_factory

Jim Witschey

Jun 12, 2018, 12:00:49 PM
to python-dr...@lists.datastax.com
You may be able to get the Map->Pandas-compatible-data-structure
result processing you need with the NumpyProtocolHandler mentioned in
the link Alan sent:

https://docs.datastax.com/en/developer/python-driver/3.14/api/cassandra/protocol/#faster-deserialization

Then you may be able to simply use the built-in row factories.

> It would be great if this or something better :) will be integrated into official Cassandra driver in query.py as another option of row_factory

To be clear, we don't currently test pandas integration on our end,
and don't plan to anytime soon, so we're unlikely to add any explicit
pandas support to our row factories.