[Motor] Simplest async insert


Андрей Парамонов

Nov 17, 2015, 2:40:04 PM
to mongodb-user
Hello MongoDB-users!

I'm doing a batch database update, like this:

for doc in gendocs():
    db.test_collection.insert(doc)

This works nicely with pymongo.

Now I'd like to try to speed it up. One idea is that I don't really need to wait for insert() to finish. If insert() returned immediately after the query is sent to MongoDB, the script could go on constructing the next query while MongoDB is still busy processing the request. This seems like a perfect application for Motor.

However, it appears that for Motor to work I need to construct something called an "event loop". The official tutorial suggests that I basically make a tiny web server out of my script, which seems too complicated for this very simple task. As you can see above, I don't need any callback from MongoDB (assume it's reliable enough), so I don't need any events either.

What is the simplest way I could initialize Motor for such code to work?

Best wishes,
Andrey Paramonov

A. Jesse Jiryu Davis

Nov 17, 2015, 3:34:28 PM
to mongodb-user
Hi Andrey, I think the best example for what you want to do is a little farther down in the Motor tutorial:


The example is,

>>> @gen.coroutine
... def do_insert():
...     for i in range(2000):
...         future = db.test_collection.insert({'i': i})
...         result = yield future
...
>>> IOLoop.current().run_sync(do_insert)

Tornado's IOLoop.current() function creates an event loop for you the first time you call it. "run_sync" runs the loop until the "do_insert" coroutine completes, then the loop terminates.
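For comparison, the standard library's asyncio loop follows the same run-the-loop-until-done pattern. This is only a sketch of the pattern, with a stand-in await instead of a real Motor call:

```python
import asyncio

async def do_insert():
    results = []
    for i in range(5):
        # Stand-in for an awaited database call such as an insert.
        await asyncio.sleep(0)
        results.append(i)
    return results

# asyncio.run plays the same role as Tornado's run_sync: it starts a
# loop, runs it until the coroutine completes, then shuts the loop down.
print(asyncio.run(do_insert()))  # → [0, 1, 2, 3, 4]
```

Either way, the loop only lives as long as the coroutine you hand it; you don't need to build a web server around it.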

To parallelize and do 4 concurrent inserts, you might want to split your data into 4 even chunks:

from itertools import islice

data = range(2000)
n_chunks = 4
for i in range(n_chunks):
    # Get every 4th number starting at i, e.g. 0, 4, 8, 12, ...
    chunk = islice(data, i, None, n_chunks)
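To see concretely what islice produces here, this quick sketch uses 12 numbers instead of 2000:

```python
from itertools import islice

data = range(12)
n_chunks = 4

# islice(data, start, stop, step) takes every n_chunks-th element
# beginning at a different offset for each chunk.
chunks = [list(islice(data, i, None, n_chunks)) for i in range(n_chunks)]
print(chunks)
# → [[0, 4, 8], [1, 5, 9], [2, 6, 10], [3, 7, 11]]
```

Each chunk is the same size, and together they cover the data exactly once.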

Insert them in parallel and wait for all the inserts to finish:

from itertools import islice

from tornado import gen
from tornado.ioloop import IOLoop

from motor import MotorClient


@gen.coroutine
def insert_chunk(chunk):
    for i in chunk:
        yield db.test_collection.insert({'_id': i})


@gen.coroutine
def parallel_inserts():
    yield db.test_collection.remove({})

    data = range(2000)
    n_chunks = 4
    futures = []
    for i in range(n_chunks):
        # Get every 4th number starting at i, e.g. 0, 4, 8, 12, ...
        chunk = islice(data, i, None, n_chunks)
        future = insert_chunk(chunk)
        futures.append(future)

    # await all in parallel
    yield futures
    n_inserted = yield db.test_collection.count()
    print('Inserted %d' % n_inserted)


db = MotorClient().test
IOLoop.current().run_sync(parallel_inserts)

Each of the four processes will run much faster if it does a bulk insert instead of inserting documents individually:

@gen.coroutine
def insert_chunk(chunk):
    yield db.test_collection.insert({'_id': i} for i in chunk)

I'd expect this parallelization to gain some performance with the MMAPv1 storage engine, but you'll tend to bottleneck on the database-level lock at a small number of n_chunks. Increasing n_chunks helps you overcome network latency and lets the client BSON-encode documents on its own CPU while the server performs the inserts, but increasing n_chunks to a very large number won't win you much. You can scale higher if you insert into collections in multiple databases at once.

With MongoDB 3 and WiredTiger you'll be able to scale up your n_chunks usefully, even if all the inserts are to one collection.

Андрей Парамонов

Nov 18, 2015, 6:11:03 AM
to mongodb-user
Thank you for your quick reply!

The example is useful, but it requires a complete rewrite of my PyMongo code, i.e. I need to introduce a generator to be run from run_sync(). However, my idea was to make a kind of drop-in replacement for PyMongo, with the only differences being the session object and probably the insert call. I was hoping I wouldn't need any event loop at all.

Is it possible to form the MongoDB command synchronously, but post it asynchronously (i.e. not wait for the response)?

Best wishes,
Andrey Paramonov

A. Jesse Jiryu Davis

Nov 18, 2015, 7:39:29 AM
to mongod...@googlegroups.com
Hi, it's true that using Motor requires you to rewrite your code and create an event loop. If you want to do bulk inserts in parallel with PyMongo, consider the "insert_many" operation on a small number of threads:


import time
from itertools import islice

# "python -m pip install futures" on Python 2.
from concurrent.futures import ThreadPoolExecutor, wait

from pymongo import MongoClient


def insert_chunk(exe, chunk):
    future = exe.submit(db.test_collection.insert_many,
                        ({'_id': i} for i in chunk))
    return future


def parallel_inserts(exe):
    db.test_collection.remove({})

    data = range(20000)
    n_chunks = 4
    futures = []
    start = time.time()
    for i in range(n_chunks):
        # Get every 4th number starting at i, e.g. 0, 4, 8, 12, ...
        chunk = islice(data, i, None, n_chunks)
        future = insert_chunk(exe, chunk)
        futures.append(future)

    # await all in parallel
    wait(futures)
    end = time.time()
    n_inserted = db.test_collection.count()
    print('Inserted %d in %.2f' % (n_inserted, end - start))


db = MongoClient().test

with ThreadPoolExecutor(max_workers=4) as executor:
    parallel_inserts(executor)

The same notes about scaling apply: you'll achieve peak client-side performance with just a handful of threads; a large number of threads will hit one of several bottlenecks. Besides what I said above, consider Python's global interpreter lock. The GIL prevents more than one thread from executing Python bytecode at a time, so multiple threads merely allow one thread to work on encoding a batch of documents while the others wait on the network or the server. Setting max_workers to 4 is plenty.
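The reason a few threads still help despite the GIL is that blocking I/O releases it. This stdlib-only sketch uses time.sleep as a stand-in for a blocked socket read to show the waits overlapping:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def fake_io(seconds):
    # time.sleep releases the GIL, just as a blocked socket read does,
    # so the other workers can run while this one waits.
    time.sleep(seconds)
    return seconds

start = time.time()
with ThreadPoolExecutor(max_workers=4) as exe:
    futures = [exe.submit(fake_io, 0.2) for _ in range(4)]
    wait(futures)
elapsed = time.time() - start

# Four 0.2-second waits overlap, so wall time is ~0.2 s, not ~0.8 s.
print('%.1f' % elapsed)
```

Pure-Python work (like BSON encoding) does not overlap this way, which is why piling on many more workers stops paying off.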

And finally, you may find explanations of "unacknowledged writes" with PyMongo, but that won't help in your case: with unacknowledged writes there's no good way to wait for the final write to complete before executing a "count" to check the outcome.



