Hi,
Does CouchDB-Python have a bulk write feature that executes write operations in batches,
in order to reduce the number of network round trips and increase write throughput? For example,
it would iterate a generator until it has yielded 1000 documents (or some number of MB of data),
then pause the generator while it inserts that batch into CouchDB. Once the batch is inserted,
CouchDB-Python would resume the generator to build the next batch, and so on until all documents
are inserted.
Is CouchDB-Python able to iterate the generator together with multiprocessing until it has yielded
e.g. 1000 documents (or some amount of data in MB), then pause the generator while it inserts the
batch into CouchDB? Or would I need to do something like this myself?
for docs_group in grouper(iter_something(converted), BULK_SIZE):
    docs_group = [doc for doc in docs_group if doc is not None]  # filter out Nones
    sDB.insert(docs_group, ...)
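Here grouper would just be the usual itertools grouping recipe and BULK_SIZE the batch size, roughly:

from itertools import zip_longest  # izip_longest on Python 2

BULK_SIZE = 1000

def grouper(iterable, n, fillvalue=None):
    # collect the iterable into fixed-length chunks, padding the last chunk with fillvalue
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)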
This is the full code:
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
from itertools import groupby
from couchdb import Server  # CouchDB-Python
from multiprocessing import Process, JoinableQueue
import csv
parts = [["Test", "A", "B01", 828288, 1, 7, 'C', 5],
         ["Test", "A", "B01", 828288, 1, 7, 'T', 6],
         ["Test", "A", "B01", 171878, 3, 7, 'C', 5],
         ["Test", "A", "B01", 171878, 3, 7, 'T', 6],
         ["Test", "A", "B01", 871963, 3, 9, 'A', 5],
         ["Test", "A", "B01", 871963, 3, 9, 'G', 6],
         ["Test", "A", "B01", 1932523, 1, 10, 'T', 4],
         ["Test", "A", "B01", 1932523, 1, 10, 'A', 5],
         ["Test", "A", "B01", 1932523, 1, 10, 'X', 6],
         ["Test", "A", "B01", 667214, 1, 14, 'T', 4],
         ["Test", "A", "B01", 667214, 1, 14, 'G', 5],
         ["Test", "A", "B01", 667214, 1, 14, 'G', 6]]
def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result
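# For the sample rows above, iter_something() yields one document per group, e.g.:
# {'type': 'Test', 'name': 'A', 'sub_name': 'B01', 'pos': 828288, 's_type': 1, 'x_type': 7,
#  'chr': [{'letter': 'C', 'no': 5}, {'letter': 'T', 'no': 6}]}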
class Loading(Process):
    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        self.server = Server()
        try:
            self.sDB = self.server.create('abc')
        except Exception:
            # the database already exists (e.g. created by the other worker)
            self.sDB = self.server['abc']

    def run(self):
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                self.sDB.save(doc)  # one HTTP request per document
                self.task_queue.task_done()
def main():
    num_cores = 2

    tasks = JoinableQueue()

    threads = [Loading(tasks) for i in range(num_cores)]
    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)

        # Enqueue jobs
        for i in iter_something(converted):
            tasks.put(i)

    # Add None to kill each thread
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

if __name__ == '__main__':
    main()
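In other words, would a batched variant along these lines be what I have to write myself? This is
only a sketch, not tested; I am assuming couchdb-python's Database.update() is the call that posts
a whole list of documents in a single _bulk_docs request, and grouper/BULK_SIZE are as above.

class BulkLoading(Loading):
    def run(self):
        while True:
            batch = self.task_queue.get()
            if batch is None:  # None means shutdown
                self.task_queue.task_done()
                break
            # one HTTP round trip per batch instead of one per document
            self.sDB.update(batch)
            self.task_queue.task_done()

with main() enqueueing whole batches instead of single documents:

    for docs_group in grouper(iter_something(converted), BULK_SIZE):
        tasks.put([doc for doc in docs_group if doc is not None])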
Thank you in advance.
Mic