ndb.non_transactional non-deterministically pulling away transaction from under calling transactional function

69 views
Skip to first unread message

Dag Sverre Seljebotn

unread,
Oct 14, 2013, 9:56:05 AM10/14/13
to appengine-...@googlegroups.com
Hi list,

unfortunately, I don't have a reproducible test case for this behaviour; it's a "heisenbug" that only hits sometimes. Running my single unit tests from terminal against the SDK I experience both 10 failures in a row and 30 successes in a row before a failure; sometimes it can be really difficult to trigger and I almost forget the problem before it hits again. The non-determinism seems to be related to the NDB async event loop -- is there a way to hack NDB to make the event loop less random, or failing that, more random? (Waiting and randomizing the event queue or something?)

The problem is a BadRequestError("Cannot start a new operation in a finished transaction."). Seems like what happens is that a call to ndb.non_transactional shuts down the calling transaction. However, the code of the transaction continues running as if nothing happened, leading to the BadRequestError.

The situation is described in the following snippet. Note that when such a reduced test-case is run by itself I never get the error, but I have a setup now with the same snippet except for things going on before the error, as indicated.

@ndb.tasklet
def txn():
    key = ndb.Key("MyModel", "someid")

    # a lot of things here that "should" be unrelated but is needed for bug to hit; may well
    # be doing something "wrong"...

    inner_ran = [False]
    def f():
        key.get(use_cache=False, use_memcache=False)
        inner_ran[0] = True

    ndb.non_transactional(f)()
    assert inner_ran[0]  # always succeeds fine
    print "after call to inner"  # this is printed AFTER stack trace pasted below
    key.get(use_cache=False, use_memcache=False)  # BadRequestError
    raise ndb.Return()

ndb.transaction_async(txn, xg=True).get_result()

(use_cache and use_memcache doesn't seem to be relevant).

For some diagnostics, I patched context.py, line 946 (in a "finally" clause inside Context.transaction, where the transaction shuts down), to add traceback.print_stack(). When the failure hits, the stack trace attached appears. It appears BEFORE the "after call to inner" line, so the transaction is indeed shut down by the non_transactional call, but the txn function continues running.

Anything I'm doing wrong, or any advice in how to reduce this to a more deterministic test-case? Any red flags I should look for in the code leading up to this?

Dag Sverre

Stack trace:


  File "/home/dagss/myproject/myproject/bin/nosetests", line 57, in <module>
    sys.exit(nose.run_exit())
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/core.py", line 118, in __init__
    **extra_args)
  File "/home/dagss/myproject/local/lib/python2.7/unittest/main.py", line 95, in __init__
    self.runTests()
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/core.py", line 197, in runTests
    result = self.testRunner.run(self.test)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/core.py", line 61, in run
    test(result)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 223, in run
    test(orig)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 223, in run
    test(orig)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/suite.py", line 223, in run
    test(orig)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/case.py", line 45, in __call__
    return self.run(*arg, **kwarg)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/case.py", line 133, in run
    self.runTest(result)
  File "/home/dagss/myproject/myproject/eggs/nose-1.3.0-py2.7.egg/nose/case.py", line 151, in runTest
    test(result)
  File "/home/dagss/myproject/local/lib/python2.7/unittest/case.py", line 391, in __call__
    return self.run(*args, **kwds)
  File "/home/dagss/myproject/local/lib/python2.7/unittest/case.py", line 327, in run
    testMethod()
  File "/home/dagss/myproject/myproject/src/myproject/myproject/core/tests/test_something.py", line 64, in test_happy_day
    status=201)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/core/testsupport/base_api_tester.py", line 70, in put
    return self.call('PUT', url, data, **kw)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/core/testsupport/base_api_tester.py", line 49, in call
    headers=headers)
  File "/home/dagss/myproject/myproject/eggs/WebTest-2.0-py2.7.egg/webtest/app.py", line 255, in put
    content_type=content_type,
  File "/home/dagss/myproject/myproject/eggs/WebTest-2.0-py2.7.egg/webtest/app.py", line 594, in _gen_request
    expect_errors=expect_errors)
  File "/home/dagss/myproject/myproject/eggs/WebTest-2.0-py2.7.egg/webtest/app.py", line 467, in do_request
    res = req.get_response(app, catch_exc_info=True)
  File "/home/dagss/myproject/myproject/parts/google_appengine/lib/webob-1.2.3/webob/request.py", line 1292, in send
    application, catch_exc_info=True)
  File "/home/dagss/myproject/myproject/parts/google_appengine/lib/webob-1.2.3/webob/request.py", line 1260, in call_application
    app_iter = application(self.environ, start_response)
  File "/home/dagss/myproject/myproject/eggs/WebTest-2.0-py2.7.egg/webtest/lint.py", line 194, in lint_app
    iterator = application(environ, start_response_wrapper)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/auth/middleware.py", line 33, in __call__
    return self.application(environ, start_response)
  File "/home/dagss/myproject/myproject/parts/google_appengine/lib/webapp2-2.5.2/webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
  File "/home/dagss/myproject/myproject/parts/google_appengine/lib/webapp2-2.5.2/webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/home/dagss/myproject/myproject/parts/google_appengine/lib/webapp2-2.5.2/webapp2.py", line 1102, in __call__
    return handler.dispatch()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 1049, in add_context_wrapper
    return synctaskletfunc(*args, **kwds)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 1030, in synctasklet_wrapper
    return taskletfunc(*args, **kwds).get_result()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 325, in get_result
    self.check_success()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 320, in check_success
    self.wait()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 304, in wait
    if not ev.run1():
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 219, in run1
    delay = self.run0()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 181, in run0
    callback(*args, **kwds)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 474, in _on_future_completion
    self._help_tasklet_along(ns, ds_conn, gen, val)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 371, in _help_tasklet_along
    value = gen.send(val)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/handlers/base/core.py", line 220, in dispatch
    result = method(*args, **kwargs)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/auth/authinfo.py", line 141, in new_f
    return f(*args, **kwargs)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/core/apibase/decorators.py", line 72, in new_func
    result = result.get_result()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 325, in get_result
    self.check_success()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 320, in check_success
    self.wait()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 304, in wait
    if not ev.run1():
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 219, in run1
    delay = self.run0()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 181, in run0
    callback(*args, **kwds)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 371, in _help_tasklet_along
    value = gen.send(val)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/handlers/somehandler.py", line 77, in put
    r = ndb.non_transactional(f)()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/utils.py", line 173, in inner_wrapper
    return wrapped_decorator(func, args, kwds, **options)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/model.py", line 3670, in non_transactional
    return func(*args, **kwds)
  File "/home/dagss/myproject/myproject/src/myproject/myproject/handlers/somehandler.py", line 69, in f
    key.get(use_cache=False, use_memcache=False)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/key.py", line 532, in get
    return self.get_async(**ctx_options).get_result()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 325, in get_result
    self.check_success()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 320, in check_success
    self.wait()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 304, in wait
    if not ev.run1():
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 219, in run1
    delay = self.run0()
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/eventloop.py", line 181, in run0
    callback(*args, **kwds)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 474, in _on_future_completion
    self._help_tasklet_along(ns, ds_conn, gen, val)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/tasklets.py", line 371, in _help_tasklet_along
    value = gen.send(val)
  File "/home/dagss/myproject/myproject/parts/google_appengine/google/appengine/ext/ndb/context.py", line 947, in transaction
    traceback.print_stack()

Dag Sverre Seljebotn

unread,
Oct 14, 2013, 10:00:20 AM10/14/13
to appengine-...@googlegroups.com


On Monday, October 14, 2013 3:56:05 PM UTC+2, Dag Sverre Seljebotn wrote:
Hi list,

unfortunately, I don't have a reproducible test case for this behaviour; it's a "heisenbug" that only hits sometimes. Running my single unit tests from terminal against the SDK I experience both 10 failures in a row and 30 successes in a row before a failure; sometimes it can be really difficult to trigger and I almost forget the problem before it hits again. The non-determinism seems to be related to the NDB async event loop -- is there a way to hack NDB to make the event loop less random, or failing that, more random? (Waiting and randomizing the event queue or something?)

The problem is a BadRequestError("Cannot start a new operation in a finished transaction."). Seems like what happens is that a call to ndb.non_transactional shuts down the calling transaction. However, the code of the transaction continues running as if nothing happened, leading to the BadRequestError.

The situation is described in the following snippet. Note that when such a reduced test-case is run by itself I never get the error, but I have a setup now with the same snippet except for things going on before the error, as indicated.

@ndb.tasklet
def txn():
    key = ndb.Key("MyModel", "someid")

    # a lot of things here that "should" be unrelated but is needed for bug to hit; may well
    # be doing something "wrong"...

    inner_ran = [False]
    def f():
        key.get(use_cache=False, use_memcache=False)
        inner_ran[0] = True

    ndb.non_transactional(f)()
    assert inner_ran[0]  # always succeeds fine
    print "after call to inner"  # this is printed AFTER stack trace pasted below
    key.get(use_cache=False, use_memcache=False)  # BadRequestError
    raise ndb.Return()

ndb.transaction_async(txn, xg=True).get_result()

(use_cache and use_memcache doesn't seem to be relevant).

For some diagnostics, I patched context.py, line 946 (in a "finally" clause inside Context.transaction, where the transaction shuts down), to add traceback.print_stack(). When the failure hits, the stack trace attached appears. It appears BEFORE the "after call to inner" line, so the transaction is indeed shut down by the non_transactional call, but the txn function continues running.

Anything I'm doing wrong, or any advice in how to reduce this to a more deterministic test-case? Any red flags I should look for in the code leading up to this?

I just now had the same behaviour/failure with

r = ndb.transaction(f, propagation=ndb.TransactionOptions.INDEPENDENT)

instead of ndb.non_transactional.

Dag Sverre

Guido van Rossum

unread,
Oct 14, 2013, 1:11:52 PM10/14/13
to appengine-...@googlegroups.com
Hi Dag,

The most suspicious part of the NDB implementation here seems to be calls to datastore._SetConnection() and datastore._GetConnection(). The underlying "datastore" module has only a single "current" transaction which is manipulated by the non_transactional() decorator and also by the transaction() code you found in context.py. I wonder if something else you call in the
stuff that "should be unrelated" manipulates this? Otherwise the same for the NDB context.

You may just want to give up on running some code outside an ongoing transaction -- regardless of whether there's a bug in your code or in NDB, either way it's clearly too complex to debug...


--
You received this message because you are subscribed to the Google Groups "appengine-ndb-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to appengine-ndb-di...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.



--
--Guido van Rossum (python.org/~guido)

Dag Sverre Seljebotn

unread,
Oct 15, 2013, 6:34:27 AM10/15/13
to appengine-...@googlegroups.com
Hi Guido,

Thanks for your answer. The problem was indeed with my own code. I spawned a tasklet within the transaction and let it run without yielding on it, and then the transaction was terminated before that tasklet had finished running. The funny results were from within that tasklet.

This may be a general class of problems in our code, as we have calls to put_async without yielding on the resulting future. (I don't think using ndb.toplevel or ndb.synctasklet would work, there are tasklets outside the transaction operating at the same time, and we use ndb.transaction_async).

FWIW, I'm "fixing" this by calling the function pasted below at the end of any transaction, so that at least no transaction commits under such circumstances, and we fail early. (Of course, the real fix is to add the missing yields.)

Dag Sverre


def assert_all_tasklets_done():
    """
    Checks that all `ndb.Future` instances belonging to the current context are
    done running.
    """
    from google.appengine.ext.ndb import eventloop
    ctx = ndb.get_context()
    loop = eventloop.get_event_loop()
    for callback, a, k in list(loop.idlers) + list(loop.current):
        fut = getattr(callback, 'im_self', None)
        if isinstance(fut, ndb.Future) and fut._context is ctx:
            raise RuntimeError('tasklets belonging to the transaction still running '
                               '(check that every future is yield-ed!): %s' % fut)

Guido van Rossum

unread,
Oct 15, 2013, 11:01:15 AM10/15/13
to appengine-...@googlegroups.com
Glad you found it!

Beware that your function is using some undocumented APIs.

About put_async() without yield, there are some rare cases where an asynchronous call can safely be left un-waited-for, but in general it's very tricky as you've found. It's also not clear to me that it *isn't* a bug in NDB that this happened. In general mixing sync and async in NDB can be a bit fragile, and I recommend to everyone to stay on the safe and narrow road where it comes to writing "clever" code -- and doubly so when transactions are involved.

--Guido

Dag Sverre Seljebotn

unread,
Oct 15, 2013, 11:23:38 AM10/15/13
to appengine-...@googlegroups.com
(Guido, I know you're not working on NDB any longer, but as a general request to any other readers of the list,)

Regards to "making this documented", it would be great if something like the assertion I wrote in the previous post could just be rolled into ndb.transaction{,al,_async} to guard against this.

Another alternative would be to automatically yield on all those "as-yet-unwaited-for" Futures that belongs to a transactional context, making the behaviour (that we wrongly expected) explicitly supported. This would make any transaction operate sort-of like ndb.toplevel (but not quite, as you would still yield out of the transaction stack and have control jump between several different transactions; but each transaction would guarantee fully executing futures spawned within them before returning).

I'll probably make some internal wrappers around ndb.transaction{,al,_async} to make sure we always use this assertion (+ go chase for any remaining orphan futures). Are feature request patches are accepted upstream? In that case perhaps I will submit something in due time for what I sketched above.

Thanks again Guido for your feedback!,

Dag Sverre


Alfred Fuller

unread,
Oct 15, 2013, 1:52:46 PM10/15/13
to appengine-...@googlegroups.com
Feel free to add a feature request for such changes.
Reply all
Reply to author
Forward
0 new messages