Concurrent use of `bulk_insert_mappings` and `bulk_update_mappings`


Soeren Medard

Jun 17, 2015, 4:05:16 AM
to sqlal...@googlegroups.com
Hi,
I have had an issue with bulk operations arising from the following use case:
I was looping over an iterator of mappings, which are either already in the DB (updates) or not (inserts). As I did not want to loop over it twice and wanted to use only one transaction, I used greenlets to split the iterator and run `bulk_insert_mappings` and `bulk_update_mappings` concurrently on the same session object. That looks like this:

"""
Insert some greenlet magic into generators
"""
from greenlet import greenlet


def greenletify_gen(mapping):
    """ This generator will be passed to the bulk operations.
        Switching back to the parent greenlet lets us suspend a bulk
        method mid-iteration and thus run both methods concurrently. """
    if mapping is None:
        return  # `return`, not `raise StopIteration()`, per PEP 479
    yield mapping
    # Keep yielding whatever the parent greenlet switches in,
    # until it sends the None sentinel.
    for mapping in iter(greenlet.getcurrent().parent.switch, None):
        yield mapping


"""
Concurrently run bulk operations
"""
from test_model import TestModel
from session_ctx_mgr import session_ctx_mgr
from sqlalchemy.exc import ResourceClosedError


with session_ctx_mgr() as session:
    insert_greenlet = greenlet(lambda mapping: session.bulk_insert_mappings(TestModel, greenletify_gen(mapping)))
    update_greenlet = greenlet(lambda mapping: session.bulk_update_mappings(TestModel, greenletify_gen(mapping)))
    # Feed one mapping to each bulk operation.
    insert_greenlet.switch({'id': 2, 'value': 2})
    update_greenlet.switch({'id': 1, 'value': 2})
    # The None sentinel ends each generator, letting the
    # corresponding bulk call run to completion.
    insert_greenlet.switch(None)
    update_greenlet.switch(None)

However, the example above raises this error:

Traceback (most recent call last):
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2332, in _bulk_save_mappings
    isstates, update_changed_only)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 100, in _bulk_update
    if session_transaction.session.connection_callable:
AttributeError: 'NoneType' object has no attribute 'connection_callable'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "drunken-octo-dubstep.py", line 43, in <module>
    update_greenlet.switch(None)
  File "drunken-octo-dubstep.py", line 37, in <lambda>
    update_greenlet = greenlet(lambda mapping: session.bulk_update_mappings(TestModel, greenletify_gen(mapping)))
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2318, in bulk_update_mappings
    self._bulk_save_mappings(mapper, mappings, True, False, False, False)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 182, in reraise
    raise value
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 408, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed

Now, what's funny is that when you swap the last two lines, the exception disappears (i.e., `bulk_update_mappings` ends before `bulk_insert_mappings`).
I managed to generalize the behavior, and it seems to depend entirely on the first mapping of the loop: if it was an insert, `bulk_update_mappings` must end first; if it was an update, `bulk_insert_mappings` must end first.

The whole source code for the example is available here: https://github.com/Loamhoof/drunken-octo-dubstep, in the file `drunken-octo-dubstep.py`.

Now, I posted here rather than opening an issue because:
1) I'm not sure there isn't another, more legitimate way to do what I want
2) I'm not sure such a way of using the API is meant to be supported

So, the related questions:
1) Is there indeed a better way to run both bulk operations while looping over an iterator only once?
2) Should this be considered a bug?

Versions used:
SQLAlchemy==1.0.5
greenlet==0.4.7
psycopg2==2.6.1
and Postgres 9.4.1, Python 3.4.3

Thanks for your time!

Regards,
Soeren

Mike Bayer

Jun 17, 2015, 10:43:38 AM
to sqlal...@googlegroups.com


On 6/17/15 4:05 AM, Soeren Medard wrote:
Hi,
I have had an issue with bulk operations arising from the following use case:
I was looping over an iterator of mappings, which are either already in the DB (updates) or not (inserts). As I did not want to loop over it twice and wanted to use only one transaction, I used greenlets to split the iterator and run `bulk_insert_mappings` and `bulk_update_mappings` concurrently on the same session object.

That won't work, because it means you are running multiple greenlets against a single database connection, and that is not thread/greenlet safe, assuming you are using a greenlet-monkeypatched DBAPI. You need to either synchronize the work of the greenlets or use an individual connection per greenlet.

Additionally, as the stack trace you've passed on indicates, the bulk methods on Session still make use of a subtransaction internally, which is stateful. So that's the specific error you're seeing: multiple greenlets are competing for the state of the session.transaction object, which is a linked list of nested transactions. It absolutely is not threadsafe (which includes greenlets). The Session doesn't have much need to support concurrent threads/greenlets operating on it without locking, because the DBAPI connections it refers to are virtually never safe to use this way in any case.




So, the related questions:
1) Is there indeed a better way to run both bulk operations while looping over an iterator only once?
2) Should this be considered a bug?

Versions used:
SQLAlchemy==1.0.5
greenlet==0.4.7
psycopg2==2.6.1
and Postgres 9.4.1, Python 3.4.3

So the async support for psycopg2 ultimately uses a single PostgreSQL connection with async=1; this is documented at http://initd.org/psycopg/docs/advanced.html#asynchronous-support. We can see there that this use is not supported: "Two cursors can’t execute concurrent queries on the same asynchronous connection."

So your options are to either synchronize the work of the multiple greenlets, which pretty much eliminates the point of doing it that way, or to use a connection/session per greenlet. At the end of the day you're communicating over a TCP socket: an INSERT/UPDATE string is sent along the wire and a response is waited for, and multiple statements cannot be multiplexed simultaneously on a single connection.
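Roughly, the session-per-greenlet version would look like this (just a sketch; it assumes your session_ctx_mgr can hand out independent sessions, and note you give up the single-transaction property, since each session now commits over its own connection):

with session_ctx_mgr() as insert_session, session_ctx_mgr() as update_session:
    # Each greenlet drives its own Session, hence its own connection,
    # so the two bulk operations no longer share transactional state.
    insert_greenlet = greenlet(
        lambda mapping: insert_session.bulk_insert_mappings(
            TestModel, greenletify_gen(mapping)))
    update_greenlet = greenlet(
        lambda mapping: update_session.bulk_update_mappings(
            TestModel, greenletify_gen(mapping)))
    insert_greenlet.switch({'id': 2, 'value': 2})
    update_greenlet.switch({'id': 1, 'value': 2})
    insert_greenlet.switch(None)
    update_greenlet.switch(None)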


Soeren Medard

Jun 18, 2015, 5:02:46 AM
to sqlal...@googlegroups.com
Hi Mike,

Thanks for your answer. I guess I could build two lists in a single pass and then run the two bulk operations one after the other, which avoids the state conflict.
My only issue with this is that the lists would effectively be built twice, because the bulk persistence functions clone the mappings into a new list, if I'm not mistaken (https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/orm/persistence.py#L45). Something like the sketch below.
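For reference, the single-pass partition I have in mind would be roughly this (a sketch only; `mappings` stands for the incoming iterator, and `existing_ids` is a hypothetical set of primary keys already present in the DB, fetched up front):

inserts, updates = [], []
for mapping in mappings:
    # Route each mapping by whether its row already exists.
    if mapping['id'] in existing_ids:
        updates.append(mapping)
    else:
        inserts.append(mapping)

with session_ctx_mgr() as session:
    # Both bulk operations now run sequentially in one transaction,
    # with no shared-state conflict.
    session.bulk_insert_mappings(TestModel, inserts)
    session.bulk_update_mappings(TestModel, updates)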
I guess I'll compare both options (the one I'm running at the moment, which works mostly by a stroke of luck, and this one) and see what I'll do.

Anyway, thanks a lot for your input. Oh, and for SQLAlchemy, it's pretty awesome :)

Cheers,
Soeren