pycassa - Cassandra : Python dictionary as input to Cassandra


Sri

Jan 29, 2014, 9:52:28 PM
to pycassa...@googlegroups.com
Hi,

Could you provide some ideas on how to handle the scenario below with pycassa?

Basically I have input data in a table (say a SQL table). I want to transpose it, swapping rows and columns, and then store the new structure in Cassandra. The new rows then become the row keys.

E.g:

Input table:

BID  review_counts  Stars  City
B1   5              7      Singapore
B2   6              9      Melbourne


Output Required in Cassandra as below:

               B1         B2
review_counts  5          6
Stars          7          9
City           Singapore  Melbourne

So here review_counts, Stars, and City become the row keys.


Now I am able to get two lists, as below, containing the new row keys and columns:

cols = [1, 2]
rows = ['review_counts', 'Stars', 'City']

How do I turn these two lists into a Python dictionary and then write that dictionary into a new Cassandra column family using pycassa?


Tyler Hobbs

Jan 31, 2014, 8:32:30 PM
to pycassa...@googlegroups.com
You would do something like this:

columnfamily.insert("Stars", {"B1": 7, "B2": 9})
columnfamily.insert("review_counts", {"B1": 5, "B2": 6})
columnfamily.insert("City", {"B1": "Singapore", "B2": "Melbourne"})

However, the exact data model that would work for you depends on how you want to query your data, so without those details, I can't suggest anything else.





--
Tyler Hobbs
DataStax

Sri

Feb 1, 2014, 8:28:18 AM
to pycassa...@googlegroups.com
Hi Tyler,

Thanks for your response. The tables I showed are quite small, but I would want to do this for large SQL tables.

I have written the following code snippet. For each row key, covering all the columns (two for-loops are used), I create a tuple of the form (key, dict) and then pass the tuple's values to the insert statement.

But I am NOT sure how efficient this code will be for large SQL tables in the future.

Is there any workaround for this?

Thanks

 

===========================================================================================================
def bulk_insert(a):
    # a is a (row_key, columns_dict) tuple
    print str(a[0]), "==>", str(a[1])
    col_faml_insert.insert(a[0], a[1])

def query_cassa(newcol, newrow):
    try:
        # fetch the row once and reuse the result instead of querying twice
        value = col_faml_fetch.get(newcol)[newrow]
        print value
        return value
    except Exception:
        print "failed in query part"
====================================================================================================================

#Main

for rowkey in rowkeys:
    row_dict = {}
    for col in cols:
        row_dict[col] = str(query_cassa(col, rowkey))
    # build the tuple once per row key, after the inner loop finishes
    a = (rowkey, row_dict)
    bulk_insert(a)
======================================================================================================================

Tyler Hobbs

Feb 4, 2014, 7:46:18 PM
to pycassa...@googlegroups.com
I'm sorry, but I can't really understand what you're trying to do.  Can you try to demonstrate it in a different way?

Sri

Feb 5, 2014, 10:57:30 PM
to pycassa...@googlegroups.com
Hi Tyler,

I am sorry I was unable to phrase my question properly.

I am a complete newbie to Cassandra.

Right now I have managed to get my code working for my problem scenario on a relatively small set of data.

However, when I try to do a multiget on 1 million row keys, it fails with the message "Retried 6 times. Last failure was timeout: timed out".

e.g: colfam.multiget([rowkey1,...........,rowkey_Million])

Basically the column family I am trying to query has 1 million records with 28 columns each.

I am running a 2-node Cassandra cluster on a single Ubuntu VirtualBox VM with the following system config:

RAM: 3 GB
Processor: 1 CPU

So how do I handle a multiget on so many row keys efficiently and then bulk insert the results into another Cassandra column family?

Thanks in advance :) :)

Tyler Hobbs

Feb 6, 2014, 1:24:52 PM
to pycassa...@googlegroups.com

On Wed, Feb 5, 2014 at 9:57 PM, Sri <srina...@gmail.com> wrote:

However, when I try to do a multiget on 1 million row keys, it fails with the message "Retried 6 times. Last failure was timeout: timed out".

e.g: colfam.multiget([rowkey1,...........,rowkey_Million])

multiget is a very expensive operation for Cassandra.  Each row in the multiget can require a couple of disk seeks.  pycassa automatically splits the query up into smaller chunks, but this is still really expensive.

If you're trying to read the whole column family, use get_range() instead.
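
For example, something like this (colfam stands in for your existing ColumnFamily object, and process_row is a placeholder for whatever you do with each row):

# walk the whole column family; pycassa fetches the rows in pages
# behind the scenes rather than loading everything at once
for key, columns in colfam.get_range():
    process_row(key, columns)  # columns is a dict of column name -> value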

If you're just trying to read a subset of the rows in that column family (based on some attribute) and you need to do this frequently, you need to use a different data model.

Since you're new to this, I would spend some time learning about data modeling in Cassandra: http://wiki.apache.org/cassandra/DataModel.  (Note: most of these examples will use CQL3, which pycassa does not support.  If you want to work with CQL3 instead, use another driver that I wrote: https://github.com/datastax/python-driver)


--
Tyler Hobbs
DataStax

Srinath Achanta

Feb 6, 2014, 10:10:46 PM
to pycassa...@googlegroups.com
Thank you so much Tyler :) :)

Really appreciate your guidance.

--
Sri




Srinath Achanta

Feb 7, 2014, 3:51:54 AM
to pycassa...@googlegroups.com
Hi Tyler,

I am trying to use get_range for this scenario as you suggested. However, I get an error with the code below.

Is it because I am using RandomPartitioner? I am confused at this point and would appreciate your help.

Code Snippet:
===================================================
import pycassa as pc
pool = pc.ConnectionPool('airlines',['localhost:9160'])
read_colfam = pc.ColumnFamily(pool,'flights')
res = read_colfam.get_range(start='A10',finish='A20')

for key, columns in res:
    print key, '=>', columns

===================================================

Error: 

pycassa.cassandra.ttypes.InvalidRequestException: InvalidRequestException(why="start key's token sorts after end key's token. this is not allowed; you probably should not specify end key at all excep ... (truncated)




Thanks

--
Sri

Tyler Hobbs

Feb 7, 2014, 11:37:52 AM
to pycassa...@googlegroups.com
I'll quote the docs here http://pycassa.github.io/pycassa/api/pycassa/columnfamily.html#pycassa.columnfamily.ColumnFamily.get_range:

The key range begins with start and ends with finish. If left as empty strings, these extend to the beginning and end, respectively. Note that if RandomPartitioner is used, rows are stored in the order of the MD5 hash of their keys, so getting a lexicographical range of keys is not feasible.


Basically, that means that with Random/Murmur3Partitioner, you can't do get_range(start='A10', finish='A20').  You can only really do a full get_range().  However, pycassa will transparently break that query up into pages for you, so you don't have to worry about fetching too much at once.
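
Applied to your snippet, it would look something like this (same read_colfam as in your code):

# with RandomPartitioner, drop start/finish and scan the whole column family
res = read_colfam.get_range()

for key, columns in res:
    print key, '=>', columns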





--
Tyler Hobbs
DataStax

Srinath Achanta

Feb 9, 2014, 2:33:23 AM
to pycassa...@googlegroups.com
Hi Tyler,

Thanks for your inputs.

I have a few queries, as follows:

As I am currently using RandomPartitioner, could you suggest a suitable approach for my scenario?

As you know, I have 1 million row keys. Initially my plan was to fetch data for a range of row keys using get_range(start=rowkey1, finish=rowkeyN), do some operation on the data, and then bulk insert the new data into a new Cassandra column family, repeating this for different ranges of row keys until all 1 million row keys have been read.

Since get_range(start, finish) does not work for my scenario because of RandomPartitioner, I was wondering if fetching all 1 million row keys using get_range() at once and then doing a bulk insert into the new Cassandra table will be efficient enough to avoid the timeout message below.

<'pycassa.pool.MaximumRetryException'>
Retried 6 times. Last failure was timeout: timed out.

In other words, how do I slice a set of row keys so I can perform operations and then bulk insert the new data into a new Cassandra column family?

I am also worried about handling larger data in the future, with more than a million row keys.

Once again thanks for your guidance so far.

--
Sri

Tyler Hobbs

Feb 11, 2014, 3:37:20 PM
to pycassa...@googlegroups.com
On Sun, Feb 9, 2014 at 1:33 AM, Srinath Achanta <srina...@gmail.com> wrote:


As I am currently using RandomPartitioner, could you suggest a suitable approach for my scenario?

As you know, I have 1 million row keys. Initially my plan was to fetch data for a range of row keys using get_range(start=rowkey1, finish=rowkeyN), do some operation on the data, and then bulk insert the new data into a new Cassandra column family, repeating this for different ranges of row keys until all 1 million row keys have been read.

Since get_range(start, finish) does not work for my scenario because of RandomPartitioner, I was wondering if fetching all 1 million row keys using get_range() at once and then doing a bulk insert into the new Cassandra table will be efficient enough to avoid the timeout message below.


You can still use get_range() with no start and end.  It will not fetch all 1 million rows at once; by default it fetches them in chunks of 1024.  You could easily do something like this:


with cf_to_write_to.batch(queue_size=100) as batch:
    for key, columns in cf_to_read_from.get_range():
        new_key, new_columns = my_transformation(key, columns)
        batch.insert(new_key, new_columns)  # this will send automatically if 100 mutations are queued

 
<'pycassa.pool.MaximumRetryException'>
Retried 6 times. Last failure was timeout: timed out.

In other words, how do I slice a set of row keys so I can perform operations and then bulk insert the new data into a new Cassandra column family?

I am also worried about handling larger data in the future, with more than a million row keys.

Try lowering get_range()'s buffer_size from its default of 1024 by doing get_range(buffer_size=100).  The docs describe this a bit: http://pycassa.github.io/pycassa/api/pycassa/columnfamily.html#pycassa.columnfamily.ColumnFamily.get_range.
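
Combined with the earlier sketch, that would be something like this (the queue_size and buffer_size values are just starting points to experiment with):

with cf_to_write_to.batch(queue_size=100) as batch:
    # smaller pages mean each request to Cassandra does less work,
    # which makes the per-request timeout easier to stay under
    for key, columns in cf_to_read_from.get_range(buffer_size=100):
        new_key, new_columns = my_transformation(key, columns)
        batch.insert(new_key, new_columns)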


--
Tyler Hobbs
DataStax

Sri

Feb 13, 2014, 11:16:31 PM
to pycassa...@googlegroups.com
Hi Tyler,

Thanks for your inputs and guidance. Now my code works beautifully :) :)

Right now I am trying to improve the efficiency of my code by using the multiprocessing module. Previously my code took about 2 hours to process 2.5 million row keys, but the execution time still seems to be the same even after adding multiprocessing. Basically I am using 2 processes to handle each of the column families independently and write rows with the same row keys into a new Cassandra column family, since each source column family has exactly the same set of columns.

(As explained before, I am using column names as row keys while inserting both column families' data into the new Cassandra column family.)

In the debug output I can see the two tables' data printing alternately; however, when I read the new column family during execution I can only see one column family's data being inserted.

My code snippet is below. Currently I use pool.map(); should I use pool.map() or pool.apply_async() for my scenario?

    count = multiprocessing.cpu_count()
    print "Total cores = ",count
    pool = multiprocessing.Pool(processes=count)
    pool.map(user_input,[('KS1','localhost:9160','cassa1','flight88'),('KS1','localhost:9160','cassa1','flight96')])


Thanks again for your guidance so far.


--
Sri

Tyler Hobbs

Feb 14, 2014, 11:57:11 AM
to pycassa...@googlegroups.com
I can't tell from your code snippet if this is happening or not, but make sure that you create one ConnectionPool and new ColumnFamily instances for each process.  You do not want to share either of those across multiple processes.  If that's not the problem, I'm not sure what's going on.
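
Something along these lines, as a sketch only (the 'combined_cf' target name is made up, the argument order is just an example, and your transformation is left out):

import multiprocessing

import pycassa

def worker(args):
    keyspace, server, read_cf_name, write_cf_name = args
    # build the pool and the column families inside the child process so
    # that nothing connection-related is shared across process boundaries
    pool = pycassa.ConnectionPool(keyspace, [server])
    read_cf = pycassa.ColumnFamily(pool, read_cf_name)
    write_cf = pycassa.ColumnFamily(pool, write_cf_name)
    with write_cf.batch(queue_size=100) as batch:
        for key, columns in read_cf.get_range():
            batch.insert(key, columns)  # substitute your own transformation here

if __name__ == '__main__':
    jobs = [('KS1', 'localhost:9160', 'flight88', 'combined_cf'),
            ('KS1', 'localhost:9160', 'flight96', 'combined_cf')]
    proc_pool = multiprocessing.Pool(processes=len(jobs))
    proc_pool.map(worker, jobs)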





--
Tyler Hobbs
DataStax

Sri

Feb 16, 2014, 10:19:56 PM
to pycassa...@googlegroups.com
Hi Tyler,

Basically I have two column families, say A and B. Each of these CFs is assigned to an individual process.

A -> Process A
B -> Process B

However both these processes must write to a single column family C. 

The approach I have taken is that these two processes (A and B) create two separate connection pools to connect to the same Cassandra instance for reading CFs A and B.

However, the performance seems to be the same even with the multiprocessing approach. In other words, with and without multiprocessing the program runs for about 2 hours for 1 million data points in each of CF A and B.

So my doubt is whether I am using the right Python multiprocessing function.

Once again thanks Tyler for your guidance and inputs so far :) :)

--
Sri

Tyler Hobbs

Feb 18, 2014, 2:58:58 PM
to pycassa...@googlegroups.com

On Sun, Feb 16, 2014 at 9:19 PM, Sri <srina...@gmail.com> wrote:

The approach I have taken is that these two processes (A and B) create two separate connection pools to connect to the same Cassandra instance for reading CFs A and B.

However, the performance seems to be the same even with the multiprocessing approach. In other words, with and without multiprocessing the program runs for about 2 hours for 1 million data points in each of CF A and B.

So my doubt is whether I am using the right Python multiprocessing function.

A couple of things to check:
* Are the Python processes running on a weak server?  Maybe you're maxing out the CPU there.
* Is Cassandra itself getting overloaded?  Check disk utilization and CPU consumption (run "iostat -x 2 10" and ignore the first output).


--
Tyler Hobbs
DataStax

Sri

Feb 21, 2014, 4:40:45 AM
to pycassa...@googlegroups.com
Hi Tyler,

I have done line profiling of my bulk_insert function and found that the batch insert statement takes 92% of the total time spent in this function (see line 106 in the profile below).

Also, the batch insert does NOT seem to happen at the specified queue_size (say 5000). The batch insert appears to happen only after the outer for-loop exits, say after 450,000 rows.

As far as I know, inserting 2 million rows into Cassandra should not take more than 3-4 minutes on a high-end machine with 12 cores.

I am using a 2-node Cassandra 2.0.3 cluster.

Please share your inputs if you have any.

Function: bulk_insert at line 88
Total time: 268.23 s

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    88                                             @profile
    89                                             def bulk_insert(parent_dict,colfam_write,header):
    90         5            5      1.0      0.0        try:
    91         5          145     29.0      0.0            print "Inside bulk_insert"
    92                                                     #print parent_dict,"iiiiiiiiiiiiiii"
    93         5            4      0.8      0.0            bulkcount = 0
    94                                                     #with colfam_write.batch(queue_size=c.BATCHSIZE) as b:
    95         5          444     88.8      0.0            with colfam_write.batch() as b:
    96         5           60     12.0      0.0                print "Initiating batch insert into master cassandra table."
    97
    98    229892       319352      1.4      0.1                for pkeys,pvalues in parent_dict.iteritems():
    99    229887       281611      1.2      0.1                    row_dict = {}
   100    229887       229665      1.0      0.1                    child_dict = pvalues
   101   2298870      6628015      2.9      2.5                    for ckeys in child_dict:
   102                                                                 #print "key = ",ckeys
   103   2068983      8322508      4.0      3.1                        row_dict[header+"-"+pkeys.lower()] = str(child_dict[ckeys])
   104                                                                 #print "value =",row_dict
   105   2068983      2407904      1.2      0.9                        a = (ckeys,row_dict)
   106   2068983    247028097    119.4     92.1                        b.insert(a[0],a[1])
   107   2068983      3011982      1.5      1.1                        bulkcount = bulkcount + 1
   108
   109         5          131     26.2      0.0            print "Batch insert complete for ",bulkcount,"rows.."
   110                                                 except Exception as e:
   111                                                     print "Error in bulk_insert  ",sys.exc_info()[0]
   112                                                     print "Error : ",e
   113                                                     sys.exit(0)

Tyler Hobbs

Feb 21, 2014, 12:48:52 PM
to pycassa...@googlegroups.com
On Fri, Feb 21, 2014 at 3:40 AM, Sri <srina...@gmail.com> wrote:

I have done line profiling of my bulk_insert function and found that the batch insert statement takes 92% of the total time spent in this function (see line 106 in the profile below).

Calling b.insert() will sometimes result in the queued operations being sent (every queue_size inserts), so that makes sense.
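
To make the mechanics concrete (the queue_size value and the rows_to_write iterable here are just illustrative):

# mutations accumulate client-side and are shipped to Cassandra in groups
with colfam_write.batch(queue_size=5000) as b:
    for key, columns in rows_to_write:   # placeholder iterable
        b.insert(key, columns)           # every 5000th insert triggers a send
    # leaving the with-block sends whatever is still queued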
 

Also, the batch insert does NOT seem to happen at the specified queue_size (say 5000). The batch insert appears to happen only after the outer for-loop exits, say after 450,000 rows.

How did you verify that?
 

As far as I know, inserting 2 million rows into Cassandra should not take more than 3-4 minutes on a high-end machine with 12 cores.

Agreed, a few minutes is a reasonable estimate.


--
Tyler Hobbs
DataStax

Sri

Feb 22, 2014, 1:01:03 AM
to pycassa...@googlegroups.com
Hi Tyler,


Also, the batch insert does NOT seem to happen at the specified queue_size (say 5000). The batch insert appears to happen only after the outer for-loop exits, say after 450,000 rows.

How did you verify that?

I have set up a counter, as you can see in my code ("bulkcount = bulkcount + 1"). Based on this I am able to get that number.

Also, I am worried that the batch insert is taking a lot of time. For instance, processing 2 million rows takes close to 10 minutes rather than a minute or two, and I find that the bulk insert takes most of the time, as shown in the line profiling.

If I am not wrong, writes should happen a lot faster in Cassandra.

Thank you.

Tyler Hobbs

Feb 25, 2014, 6:52:04 PM
to pycassa...@googlegroups.com
On Sat, Feb 22, 2014 at 12:01 AM, Sri <srina...@gmail.com> wrote:

I have set up a counter, as you can see in my code ("bulkcount = bulkcount + 1"). Based on this I am able to get that number.

That doesn't measure when the queued data actually gets sent to Cassandra; it just counts when you add something to the queue.
 

Also, I am worried that the batch insert is taking a lot of time. For instance, processing 2 million rows takes close to 10 minutes rather than a minute or two, and I find that the bulk insert takes most of the time, as shown in the line profiling.

That's about 3.5k inserts per second, which isn't bad for a single thread in Python.  Multithreading or (ideally) multiprocessing are your best bet for scaling up client performance.


--
Tyler Hobbs
DataStax