[...]
>> The problem is, in SQLAlchemy, at least with a PostgreSQL+psycopg2 backend,
>> the exception I get when that happens is "InvalidRequestError". This is
>> not very helpful, as I cannot tell the difference between a genuinely
>> invalid request and a mere transaction failure like this. I also don't
>> know if the exception will be different on different backends.
>
>then that means you’re doing something incorrectly. The actual message of
>this exception as well as a stack trace and concise and self-contained
>examples of code would allow us to have some idea what this might be.
This is the full text of the exception that I'm getting:
Task pyfarm.scheduler.tasks.poll_agents[3c2f6fa8-fa29-481a-baa1-1e82b6e14b04]
raised unexpected: InvalidRequestError("This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (TransactionRollbackError) could not serialize access due to
concurrent update\n 'UPDATE jobs SET state=%(state)s,
time_started=%(time_started)s WHERE jobs.id = %(jobs_id)s'
{'state': 105, 'time_started': datetime.datetime(2014, 11, 26, 17, 8, 56, 651872),
'jobs_id': 2444L}",)
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/pyfarm/scheduler/tasks.py", line 345, in poll_agents
    db.session.commit()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 149, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 768, in commit
    self.transaction.commit()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 368, in commit
    self._assert_active(prepared_ok=True)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 210, in _assert_active
    % self._rollback_exception
InvalidRequestError: This Session's transaction has been rolled back due to a
previous exception during flush. To begin a new transaction with this Session,
first issue Session.rollback(). Original exception was:
(TransactionRollbackError) could not serialize access due to concurrent update
'UPDATE jobs SET state=%(state)s, time_started=%(time_started)s WHERE
jobs.id = %(jobs_id)s' {'state': 105,
'time_started': datetime.datetime(2014, 11, 26, 17, 8, 56, 651872), 'jobs_id': 2444L}
You're right, there's something going wrong here.
We are using Celery to run some tasks asynchronously (the main application is
a WSGI web app) and Flask-SQLAlchemy to get a db session object. It turns out
that, in combination, this means the db session objects from Flask-SQLAlchemy
get reused between tasks running on the same Celery worker. This in turn means
that if one task fails a transaction, all later tasks that happen to run on
the same worker end up with a db session that is stuck in an invalid
transaction state.
After I noticed what was going on, my workaround was to unconditionally call
rollback() on the db session at the start of every Celery task. I'm not
exactly happy with that solution (it's ugly and counterintuitive), but it
works for now.
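Concretely, the workaround looks roughly like this (simplified sketch, not the
actual pyfarm code; the import path is made up and "db" is the Flask-SQLAlchemy
object):

    # Rough sketch of the workaround; import path and task body are
    # illustrative only.
    from myapp.application import celery, db

    @celery.task
    def poll_agents():
        # A previous task on this worker may have left the shared session in
        # a failed transaction.  Rolling back unconditionally clears that
        # state and is a harmless no-op otherwise.
        db.session.rollback()
        # ... actual polling work using db.session ...
        db.session.commit()

(I suppose one could also hook Celery's task_prerun signal to do this in a
single place instead of repeating it in every task, but I haven't tried that
yet.)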
The original exception looked like this:
Task pyfarm.scheduler.tasks.assign_tasks_to_agent[252c5387-948e-4205-9d73-1ac700cebc8e]
raised unexpected: DBAPIError('(TransactionRollbackError) could not serialize
access due to concurrent update\n',)
(Traceback omitted)
This still leaves me confused as to exactly which exception I am supposed to
catch. It looks like a 'TransactionRollbackError' is involved somewhere, but
it seems to get repackaged into a DBAPIError before actually reaching the
user...
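For now, my best guess at handling it looks something like this (sketch only,
not tested; "db" is the Flask-SQLAlchemy object as above):

    # Sketch: DBAPIError keeps the original driver exception in its .orig
    # attribute, so the psycopg2 serialization failure can be detected there.
    from psycopg2.extensions import TransactionRollbackError
    from sqlalchemy.exc import DBAPIError

    try:
        db.session.commit()
    except DBAPIError as error:
        if isinstance(error.orig, TransactionRollbackError):
            # Serialization failure due to a concurrent update: roll back
            # and retry the whole operation.
            db.session.rollback()
            # ... retry ...
        else:
            raise

But I'm not sure whether that is the intended, backend-neutral way to do it.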
[...]
>> After some experimentation and research, it turned out that the root cause
>> here was that PostgreSQL's default transaction isolation level of "read
>> committed" was just not enough in my case, and I would need to increase it
>> to "serializable”.
>
>that is also suspect. serializable isolation is very tough to use due to the
>high degree of locking and it suggests there is some level of concurrency
>here that is probably better accommodated using either a correctly
>implemented optimistic approach (e.g. the retry) or if UPDATES are part of
>the issue, SELECT..FOR UPDATE can often help to lock rows ahead of time when
>updates will be needed.
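If I understand the SELECT..FOR UPDATE suggestion correctly, in our case that
would mean locking the parent job row before touching its tasks, roughly like
this (sketch only, using Query.with_for_update() from SQLAlchemy 0.9; Job and
Task are our models):

    # Pessimistic sketch: lock the job row first so that concurrent updates
    # to tasks of the same job serialize on that row lock.
    job = Job.query.with_for_update().filter_by(id=task.job_id).one()

    task.state = new_state
    # The "remaining active tasks" check can now run safely: no other
    # transaction gets past the with_for_update() line for this job until
    # we commit.
    db.session.commit()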
[...]
>feel free to share more specifics as long as they are concise and
>self-contained.
Well, as I said in my previous mail, I have a model Job and a model Task with
a one-to-many relation between jobs and tasks, i.e. every task belongs to
exactly one job, but a job can have any number of tasks.
Tasks are being updated or deleted, and when certain counts for the tasks in
a job (e.g. the number of tasks in a given job with state != 106 and state !=
107) reach 0, certain actions need to be taken. Currently, we are doing that
in an event handler for updates. See:
https://github.com/pyfarm/pyfarm-master/blob/master/pyfarm/models/task.py#L145
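In essence, the handler boils down to something like this (heavily simplified
sketch of the pattern, not the actual code from task.py):

    # Heavily simplified sketch; see the linked task.py for the real thing.
    # 106/107 are the "finished" task states mentioned above.
    from sqlalchemy import event, func, select

    @event.listens_for(Task, "after_update")
    def task_updated(mapper, connection, target):
        # Runs during the flush, i.e. in the same (not yet committed)
        # transaction as the UPDATE that triggered it.
        active = connection.execute(
            select([func.count()]).select_from(Task.__table__).where(
                (Task.job_id == target.job_id) &
                (~Task.state.in_([106, 107])))).scalar()
        if active == 0:
            # ... mark the job as finished, send notifications, etc. ...
            pass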
Doing it in an event handler has the nice advantage that we can just change
task states anywhere in the code and have the handler do the right thing
automatically when necessary. However, it also seems to be the root cause of
our concurrency problems, mostly because the check for remaining active tasks
runs inside the same transaction as the update: when the updates for the last
two remaining tasks come in simultaneously, each runs in its own transaction,
then checks the number of remaining active tasks, and both come up with 1
remaining task, because neither sees the effects of the other (not yet
committed) transaction.
Setting the transaction isolation level to serializable would solve this, but
thinking about it a bit further, I could probably also solve it by moving the
check for remaining tasks outside of the update transaction. In that case,
whichever thread commits its update last will necessarily see the effects of
both transactions when doing its check. The downside is that we would have to
remember to do that manually everywhere a task gets updated, instead of just
relying on the update event handler to do it for us.
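Roughly, every place that updates task states would then have to do something
like this instead (sketch only):

    # Sketch of the alternative: commit the task update first, then do the
    # "remaining active tasks" check in a fresh transaction.
    task.state = new_state
    db.session.commit()

    # A new transaction starts here, so whichever of the two concurrent
    # workers gets to this point last will see both committed updates.
    active = Task.query.filter(
        Task.job_id == task.job_id,
        ~Task.state.in_([106, 107])).count()
    if active == 0:
        # ... mark the job as finished, etc. ...
        db.session.commit()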
Regards and thanks for the help,
Guido