Transactional operations in a tasklet.

Maxim Lacrima

Apr 4, 2012, 4:37:32 PM
to appengine-...@googlegroups.com
Hello!

I am trying to implement an async function that updates some entities with changed properties (kwargs_list is a list of dictionaries of changed properties):

@ndb.tasklet
@ndb.transactional
def update_entities_async(kind, kwargs_list):
    entities = ndb.get_multi(ndb.Key(kind, props.pop('id'))
                             for props in kwargs_list)
    
    for ent, props in zip(entities, kwargs_list):
        ent.populate(**props)
    
    keys = yield ndb.put_multi_async(entities)
    raise ndb.Return(keys)


The code above seems to work on the development server.
But I still have several questions:
1. Have I placed the ndb.transactional decorator correctly?
2. Is there a way to use ndb.transaction_async here instead of a synchronous transaction? How would that improve performance?
3. If there are a lot of entities to update, will all of them be locked for write during this transaction?

Thanks in advance!

--
with regards,
Maxim

Guido van Rossum

Apr 5, 2012, 10:37:07 AM
to appengine-...@googlegroups.com
On Wed, Apr 4, 2012 at 13:37, Maxim Lacrima <lacrim...@gmail.com> wrote:
> I am trying to implement an async function that updates some entities with
> changed properties (kwargs_list is a list of dictionaries of changed
> properties):
>
> @ndb.tasklet
> @ndb.transactional
> def update_entities_async(kind, kwargs_list):
>     entities = ndb.get_multi(ndb.Key(kind, props.pop('id'))
>                              for props in kwargs_list)
>
>     for ent, props in zip(entities, kwargs_list):
>         ent.populate(**props)
>
>     keys = yield ndb.put_multi_async(entities)
>     raise ndb.Return(keys)
>
>
> The code above seems to work on the development server.
> But I still have several questions:
> 1. Have I placed the ndb.transactional decorator correctly?

Hm... Combining decorators is a black art. Reversing the order is bad,
but I honestly can't tell without doing more pencil-and-paper
computations than I care to do whether what you do here is correct, so
I would write it out using ndb.transaction(callback) instead.
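
For readers unsure why the order matters: Python applies stacked decorators bottom-up, so in the code above ndb.transactional wraps the raw generator function first and ndb.tasklet then wraps that result. The sketch below shows only this ordering rule. The decorator `tag` and the function `work` are hypothetical stand-ins; no NDB code is involved, and it says nothing about how NDB's own decorators interact.

```python
import functools

calls = []  # records the order in which the wrappers and the body run


def tag(name):
    """Hypothetical stand-in decorator: logs its name on every call."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            calls.append(name)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@tag("outer")  # applied second, so its wrapper is entered first
@tag("inner")  # applied first, so it wraps the raw function directly
def work():
    calls.append("body")
    return "done"


result = work()
```

Swapping the two decorator lines swaps which wrapper sees the raw function, which is why the order cannot be chosen arbitrarily.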

> 2. Is there a way to use ndb.transaction_async here instead of a synchronous
> transaction? How would that improve performance?

Easy enough: take the above without the @ndb.transactional, and then
write another tasklet that invokes the transaction:

@ndb.tasklet
def transactional_wrapper_async(kind, kwargs_list):
    # The callback takes no arguments; the lambda captures kind and
    # kwargs_list from the enclosing scope.
    res = yield ndb.transaction_async(
        lambda: update_entities_async(kind, kwargs_list))
    raise ndb.Return(res)

(The lambda is necessary because transaction_async() only takes a
callback that needs no parameters; this is done so you can pass
keyword arguments to transaction_async() itself that affect the
transaction, e.g. xg=True or retries=1.)
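
The zero-argument callback pattern can be illustrated without NDB. In this sketch `run_callback` is a hypothetical stand-in for an API that, like the one described above, calls `callback()` with no arguments and treats its own keyword arguments as options; `update` and the sample data are also made up for illustration. It shows two equivalent ways to bind arguments: a closure and `functools.partial`.

```python
import functools


def run_callback(callback, **options):
    """Hypothetical stand-in: calls callback() with no arguments and
    returns its result together with the options it was given."""
    return callback(), options


def update(kind, kwargs_list):
    # Placeholder work: report the kind and how many updates were requested.
    return "%s:%d" % (kind, len(kwargs_list))


kind = "Account"
kwargs_list = [{"id": 1}, {"id": 2}]

# Closure: the lambda takes no parameters and captures the arguments.
via_lambda = run_callback(lambda: update(kind, kwargs_list), retries=1)

# functools.partial binds the arguments up front, yielding a callable
# that needs no further parameters.
via_partial = run_callback(functools.partial(update, kind, kwargs_list),
                           retries=1)
```

Either form keeps the keyword arguments of the outer call free for options such as retries.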

It could only improve performance if you had some other unrelated
tasklet that could run in parallel with that. The other tasklet would
be unaffected by the transaction.

> 3. If there are a lot of entities to update, will all of them be locked for
> write during this transaction?

Yes, but unless they are all descendants of the same root key the
transaction will raise an error complaining about multiple entity
groups. When using the HR Datastore, you can use xg=True to have a
transaction that spans multiple entity groups, but it is still limited
to at most 5 entity groups.
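
As a rough way to reason about this before issuing a transaction, one can count the distinct root keys involved. The sketch below is not NDB code: it assumes key paths are written as plain tuples of (kind, id) pairs from root to leaf, and both helper names are hypothetical. The default of 5 is simply the cross-group limit quoted in this message.

```python
def entity_group_roots(key_paths):
    """Return the set of root (kind, id) pairs for the given key paths.

    Each key path is a tuple of (kind, id) pairs from root to leaf; this
    plain-tuple representation is an assumption made for illustration.
    """
    return set(path[0] for path in key_paths)


def fits_in_one_transaction(key_paths, xg=False, max_groups=5):
    # Without xg only one entity group is allowed; with xg, up to max_groups.
    limit = max_groups if xg else 1
    return len(entity_group_roots(key_paths)) <= limit


paths = [
    (("Customer", 1), ("Order", 10)),
    (("Customer", 1), ("Order", 11)),
    (("Customer", 2), ("Order", 12)),
]

same_group = fits_in_one_transaction(paths[:2])          # one root key
two_groups_plain = fits_in_one_transaction(paths)        # two roots, no xg
two_groups_xg = fits_in_one_transaction(paths, xg=True)  # two roots, xg
```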

--
--Guido van Rossum (python.org/~guido)

Maxim Lacrima

Apr 6, 2012, 9:40:15 AM
to appengine-...@googlegroups.com
Hi Guido,

Thank you very much for your answer! If I understand it correctly, ndb.transaction_async is a tasklet and consequently can be executed in parallel with other tasklets, while ndb.transaction always blocks.

I have come up with the following code, which should update many entities transactionally in parallel:

@ndb.tasklet
def update_entities_async(kind, kwargs_list):
    
    def run_txn(id, props):
        @ndb.tasklet
        def txn():
            entity = ndb.Key(kind, id).get()
            entity.populate(**props)
            key = yield entity.put_async()
            raise ndb.Return(key)
        return ndb.transaction_async(txn)
            
    res = yield [run_txn(props.pop('id'), props) for props in kwargs_list]
    raise ndb.Return(res)

I haven't run this code in production yet, but it works on the development server.
--
with regards,
Maxim

Guido van Rossum

Apr 6, 2012, 5:58:30 PM
to appengine-...@googlegroups.com
On Fri, Apr 6, 2012 at 06:40, Maxim Lacrima <lacrim...@gmail.com> wrote:
> Thank you very much for your answer! If I understand it correctly,
> ndb.transaction_async is a tasklet and consequently can be executed in
> parallel with other tasklets, while ndb.transaction always blocks.

Right.

> I have come up with the following code, which should update many entities
> transactionally in parallel:

By creating many transactions, right? (One for each key.)

> @ndb.tasklet
> def update_entities_async(kind, kwargs_list):
>
>     def run_txn(id, props):
>         @ndb.tasklet
>         def txn():
>             entity = ndb.Key(kind, id).get()

This should be

entity = yield ndb.Key(kind, id).get_async()

>             entity.populate(**props)
>             key = yield entity.put_async()
>             raise ndb.Return(key)
>         return ndb.transaction_async(txn)
>
>     res = yield [run_txn(props.pop('id'), props) for props in kwargs_list]
>     raise ndb.Return(res)
>
> I haven't run this code in production yet, but it works on the development
> server.

Otherwise looks good.

Maxim Lacrima

Apr 7, 2012, 8:57:42 AM
to appengine-...@googlegroups.com
Thank you, Guido!
--
with regards,
Maxim

Rafe Kaplan

Mar 11, 2013, 6:15:50 PM
to appengine-...@googlegroups.com
  Has anyone considered writing a transactional tasklet decorator along the lines of:




def transactional_async(*ctx_args, **ctx_kwargs):
  def decorator(m):
    m_tasklet = ndb.tasklet(m)

    @ndb.tasklet
    def wrapper(*args, **kwargs):
      @ndb.tasklet
      def tx():
        value = yield m_tasklet(*args, **kwargs)
        raise ndb.Return(value)

      value = yield ndb.transaction_async(tx, *ctx_args, **ctx_kwargs)
      raise ndb.Return(value)
    return wrapper
  return decorator
  


@transactional_async()
def fn():
  assert ndb.in_transaction()
  yield ndb.sleep(0.2)
  raise ndb.Return('hi')

print fn().get_result()  # Prints 'hi'

Guido van Rossum

Mar 11, 2013, 7:51:37 PM
to appengine-...@googlegroups.com
That sounds like a great idea. What does Alfred think?

Alfred Fuller

Mar 11, 2013, 9:06:07 PM
to appengine-...@googlegroups.com
Sounds like a good idea to me (though I would call it @ndb.transactional_tasklet)

James A. Morrison

Mar 11, 2013, 9:10:53 PM
to appengine-...@googlegroups.com

FYI, this happens to be issue 195.
