Motor autoreconnect


bergundy

Nov 7, 2012, 9:27:18 AM
to python-...@googlegroups.com
This is mostly directed at A. Jesse Jiryu Davis.

Today I tried out Motor for the first time.
In my application, I have to ensure that all writes are accepted by MongoDB and that the client reconnects automatically after a disconnect.

I noticed that motor.Op raises exceptions when they occur.
Should I catch ConnectionFailure exceptions and reconnect manually, or is there a built-in mechanism for this?
Is there a difference in behavior for MotorConnection and MotorReplicaSetConnection?

Thanks.
- Roey

A. Jesse Jiryu Davis

Nov 7, 2012, 1:31:56 PM
to python-...@googlegroups.com
Hi Roey. Yes, you should expect ConnectionFailure exceptions if there are network errors or if the MongoDB machine crashes, etc. AutoReconnect is a strangely-named subclass of ConnectionFailure. You can just catch ConnectionFailure and not worry about the subclass.

After you catch a ConnectionFailure, the next operation you attempt on that MotorConnection or MotorReplicaSetConnection will automatically reconnect before doing the operation. (That's how the AutoReconnect exception got its name.) So something like this will try forever to run a query:

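import motor
import pymongo.errors
from tornado import gen

@gen.engine
def f(callback):
    c = motor.MotorConnection()
    while True:
        try:
            result = yield motor.Op(c.database.collection.find_one, {'field': 'value'})
        except pymongo.errors.ConnectionFailure:
            continue  # the next attempt reconnects automatically
        # A gen.engine function cannot return a value, so hand the result to a callback.
        callback(result)
        return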

In real life, of course, you'd want to limit the number of retries you attempt. The point is, operations you attempt with Motor can raise ConnectionFailures, but the MotorConnection or MotorRSC is still valid after that exception, and it will reconnect if necessary on the next operation.
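A minimal sketch of what limiting the retries could look like. It is not Motor code: it uses the standard library's asyncio rather than tornado.gen, and `ConnectionFailure` here is a local stand-in for pymongo.errors.ConnectionFailure so the example runs without a driver. The helper name `retry_on_connection_failure` and its `max_attempts` and `delay` parameters are hypothetical.

```python
import asyncio


class ConnectionFailure(Exception):
    """Local stand-in for pymongo.errors.ConnectionFailure (assumption)."""


async def retry_on_connection_failure(operation, max_attempts=5, delay=0.5):
    """Await operation() until it succeeds or max_attempts is used up.

    Only ConnectionFailure is retried; any other exception propagates at once.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except ConnectionFailure:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller decide what to do
            # Yield to the event loop before the next attempt.
            await asyncio.sleep(delay)
```

The last ConnectionFailure is re-raised instead of being swallowed, so the caller can still report the outage once the retry budget is spent.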

MotorRSC behaves similarly to MotorConnection, although it tries to reconnect to the *whole* replica set in case of an error, because it needs to find out if a different replica set member became primary.

Roey Berman

Nov 9, 2012, 6:37:27 AM
to python-...@googlegroups.com
Hey Jesse,

I decided that for write operations that are triggered by a user I'll let the stack_context created in @tornado.web.asynchronous report ConnectionFailure to the client.

For write operations that are triggered internally, I have two approaches in mind:
1. A 5 second PeriodicCallback that syncs entire state to the db.
2. Write immediately but use something like toro.Queue to make sure the writes are sequential.

The second approach might look like this:

def __init__(self):
    self.queue = toro.Queue(...)

def update(self, ...):
    self.queue.put(...)

@gen.engine
def writer(self, ...):
    while True:
        item = yield gen.Task(self.queue.get)
        while True:
            try:
                yield motor.Op(**item)
                break
            except pymongo.errors.ConnectionFailure as e:
                pass
            except Exception as e:
                # sync everything to the db
                # discard all items in the queue
                break

I'm just not sure how to discard the entire queue.
Should I just create a new one?

A. Jesse Jiryu Davis

Nov 9, 2012, 8:25:38 AM
to python-...@googlegroups.com
Interesting. I think your "# sync everything to the db" comment is misplaced. Shouldn't that be up at the top of writer()?

In "except Exception as e" you should log the exception, traceback, item, and current queue contents, since you've arrived at a very bad place and you'll want all the debug info you can get.

Replacing the queue with a new one will work as long as no one is waiting for an event on the old queue. That should work ok in your code, in the Exception handler, since no one is waiting on the queue at that point.
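As an alternative to swapping in a new queue, the existing one can be emptied in place. The sketch below uses the standard library's asyncio.Queue as a stand-in for toro.Queue (an assumption; the toro API may differ), and the helper name `drain` is hypothetical. It returns the discarded items so they can be logged along with the exception.

```python
import asyncio


def drain(queue):
    """Remove and return every item currently in the queue, without waiting."""
    discarded = []
    while True:
        try:
            discarded.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            # Nothing left: hand back what was thrown away.
            return discarded
```

Draining in place keeps any other references to the queue valid, which replacing the object would not.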

I'm concerned that the queue is unbounded: what happens if MongoDB is down or unavailable for a long time? The queue could grow huge. Perhaps you should set a maxsize on the queue, and update should do:

queue.put(
    item,
    callback=lambda x: logging.error("queue full, can't put %s" % item) if x is Full else None,
    deadline=timedelta(seconds=10))

It all depends what you really want to do in error states. Pause until things get back to normal? Discard items?
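For the "discard items" choice, here is a rough sketch of a bounded queue with a put deadline. It uses the standard library's asyncio.Queue and asyncio.wait_for in place of toro's callback and deadline arguments (an assumption, not toro's API), and the helper name `enqueue` and its `timeout` parameter are hypothetical.

```python
import asyncio
import logging


async def enqueue(queue, item, timeout=10.0):
    """Put item on a bounded queue; log and drop it if the queue stays full.

    Returns True if the item was queued, False if it was discarded.
    """
    try:
        # queue.put() waits for a free slot; wait_for cancels it at the deadline.
        await asyncio.wait_for(queue.put(item), timeout)
        return True
    except asyncio.TimeoutError:
        logging.error("queue full, can't put %r", item)
        return False
```

The queue would be created with a limit, for example `asyncio.Queue(maxsize=1000)`. To pause instead of discarding, the caller could await `queue.put(item)` with no timeout.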

Roey Berman

Nov 13, 2012, 8:18:16 PM
to python-...@googlegroups.com
Hey Jesse,

Thanks for all your help.

I finally managed to free some time and this is what I came up with: https://gist.github.com/4069540

I might add the ability to manually stop/start the writer using a toro.Lock().
I don't really need it but I think it's a good use-case for toro.Lock() + toro.Queue().

A. Jesse Jiryu Davis

Nov 15, 2012, 3:37:55 PM
to python-...@googlegroups.com
Oops, I sent my comments to Roey personally. Taking them back on-list:

Exciting to see Toro in use! I don't know the larger context of what you're doing here, but your code looks ok to me out-of-context. You're comfortable discarding items passed to enqueue() if queue_suspended is True? And note that a blanket 'except:' can catch Ctrl-C, which you probably don't want; do 'except Exception:'.
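A small illustration of the 'except Exception:' point. KeyboardInterrupt (what Ctrl-C raises) derives from BaseException rather than Exception, so the narrower clause lets it through. The function name `run_guarded` is hypothetical.

```python
def run_guarded(func):
    """Call func(), handling ordinary errors but not Ctrl-C or SystemExit."""
    try:
        func()
    except Exception:
        # Ordinary errors land here; KeyboardInterrupt does not.
        return "handled"
    return "ok"
```

A bare 'except:' in the same place would also swallow KeyboardInterrupt and SystemExit, making the writer loop hard to stop.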

Roey Berman

Nov 15, 2012, 3:41:29 PM
to python-...@googlegroups.com

Thanks for the tip about the exception.
I bet I'll use toro again in the future.
