Bulk write operation features with multiprocessing and generators


Mic L

Oct 28, 2014, 1:56:46 AM10/28/14
to couchdb...@googlegroups.com
Hi,
Does CouchDB-Python have a bulk write feature that executes write operations in batches,
in order to reduce the number of network round trips and increase write throughput? For example,
it would iterate a generator until it has yielded 1000 documents (or some size in MB of data), then pause
the generator while it inserts the batch into CouchDB. Once the batch is inserted, CouchDB-Python would
resume the generator to build the next batch, and continue until all documents are inserted.
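
Roughly, the kind of wrapper I have in mind looks like this (just a sketch; batch_by_count_or_size,
max_docs and max_bytes are names I made up, and sys.getsizeof is only a crude stand-in for the
serialized JSON size):

    import sys

    def batch_by_count_or_size(docs, max_docs=1000, max_bytes=10 * 1024 * 1024):
        """Yield lists of documents, closing a batch when either limit is reached."""
        batch, batch_bytes = [], 0
        for doc in docs:
            batch.append(doc)
            batch_bytes += sys.getsizeof(doc)  # rough in-memory size, not the JSON size
            if len(batch) >= max_docs or batch_bytes >= max_bytes:
                yield batch
                batch, batch_bytes = [], 0
        if batch:
            yield batch  # flush the final partial batch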


Is CouchDB-Python able to iterate the generator together with multiprocessing until it has yielded e.g. 1000
documents or a given size in MB of data, then pause the generator while it inserts the batch into CouchDB?
Or would I need to do something like this:

    for docs_group in grouper(iter_something(converted), BULK_SIZE):
        docs_group = [ doc for doc in docs_group if doc is not None ]  # filter out Nones
        sDB.insert(docs_group, ...)

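If the library doesn't batch for me, I assume something along these lines would do it (a sketch only:
grouper is the standard itertools recipe, I'm taking Database.update() to be couchdb-python's bulk
_bulk_docs call, and iter_something and converted come from the full code below):

    from itertools import zip_longest  # izip_longest on Python 2

    from couchdb import Server

    BULK_SIZE = 1000

    def grouper(iterable, n, fillvalue=None):
        """itertools recipe: collect data into fixed-length chunks, padding the last one."""
        args = [iter(iterable)] * n
        return zip_longest(*args, fillvalue=fillvalue)

    server = Server()    # http://localhost:5984 by default
    sDB = server['abc']  # assumes the database already exists

    for docs_group in grouper(iter_something(converted), BULK_SIZE):
        docs_group = [doc for doc in docs_group if doc is not None]  # drop the padding
        sDB.update(docs_group)  # one _bulk_docs round trip per batch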

This is the full code:

    #!/usr/bin/env python
    from __future__ import absolute_import, division, print_function
    from itertools import groupby
    from multiprocessing import Process, JoinableQueue
    import csv

    from couchdb import Server
    from couchdb.http import PreconditionFailed

    # Sample rows in the same layout as the CSV file (shown for reference only).
    parts = [["Test", "A", "B01", 828288,  1,    7, 'C', 5],
             ["Test", "A", "B01", 828288,  1,    7, 'T', 6],
             ["Test", "A", "B01", 171878,  3,    7, 'C', 5],
             ["Test", "A", "B01", 171878,  3,    7, 'T', 6],
             ["Test", "A", "B01", 871963,  3,    9, 'A', 5],
             ["Test", "A", "B01", 871963,  3,    9, 'G', 6],
             ["Test", "A", "B01", 1932523, 1,   10, 'T', 4],
             ["Test", "A", "B01", 1932523, 1,   10, 'A', 5],
             ["Test", "A", "B01", 1932523, 1,   10, 'X', 6],
             ["Test", "A", "B01", 667214,  1,   14, 'T', 4],
             ["Test", "A", "B01", 667214,  1,   14, 'G', 5],
             ["Test", "A", "B01", 667214,  1,   14, 'G', 6]]


    def iter_something(rows):
        """Group consecutive rows sharing the first six columns into one document."""
        key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
        chr_key_names = ['letter', 'no']
        for keys, group in groupby(rows, lambda row: row[:6]):
            result = dict(zip(key_names, keys))
            result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
            yield result


    class Loading(Process):

        def __init__(self, task_queue):
            Process.__init__(self)
            self.task_queue = task_queue
            self.server = Server()
            try:
                self.sDB = self.server.create('abc')
            except PreconditionFailed:  # database already created by another worker
                self.sDB = self.server['abc']

        def run(self):
            while True:
                doc = self.task_queue.get()
                if doc is None:  # None means shutdown
                    self.task_queue.task_done()
                    break
                self.sDB.save(doc)  # couchdb-python writes single documents with save()
                self.task_queue.task_done()


    def main():
        num_cores = 2

        tasks = JoinableQueue()

        workers = [Loading(tasks) for i in range(num_cores)]

        for i, w in enumerate(workers):
            w.start()
            print('Worker ' + str(i + 1) + ' has started!')

        converters = [str, str, str, int, int, int, str, int]
        with open("/home/mic/tmp/test.txt") as f:
            reader = csv.reader(f, skipinitialspace=True)
            converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)

            # Enqueue jobs
            for doc in iter_something(converted):
                tasks.put(doc)

        # Add one None per worker to signal shutdown
        for i in range(num_cores):
            tasks.put(None)

        # Wait for all of the tasks to finish
        tasks.join()


    if __name__ == '__main__':
        main()
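
Alternatively, I suppose each worker could collect documents from the queue and write them in bulk
itself. A rough sketch of what I mean (BULK_SIZE is just a made-up tuning knob, and I'm again assuming
Database.update() is couchdb-python's bulk write; everything else reuses the names from the code above):

    BULK_SIZE = 1000  # made-up batch size

    class BulkLoading(Loading):

        def run(self):
            batch = []
            while True:
                doc = self.task_queue.get()
                if doc is None:  # None means shutdown
                    if batch:
                        self.sDB.update(batch)  # flush the final partial batch
                    self.task_queue.task_done()
                    break
                batch.append(doc)
                if len(batch) >= BULK_SIZE:
                    self.sDB.update(batch)  # one _bulk_docs request per batch
                    batch = []
                self.task_queue.task_done()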


Thank you in advance.

Mic

Dirkjan Ochtman

Oct 29, 2014, 8:58:53 AM10/29/14
to couchdb...@googlegroups.com
On Tue, Oct 28, 2014 at 6:56 AM, Mic L <mict...@gmail.com> wrote:
> Is CouchDB-Python able to iterate the generator together with
> multiprocessing until it has yielded e.g. 1000 documents or a given size
> in MB of data, then pause the generator while it inserts the batch into
> CouchDB? Or would I need to do something like this:

We don't currently have anything like this. I'm not sure whether it would
make sense to include it; that would depend on the amount of added
complexity. It doesn't seem like a very common use case to me...

Cheers,

Dirkjan