attr['version'] = obj.version # 1
hist = history_cls()
for key, value in attr.items():
setattr(hist, key, value)
session.add(hist) # 2
obj.version += 1
140, in collection_post
p2model.DBSession.flush()
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
return getattr(self.registry(), name)(*args, **kwargs)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
self._flush(objects)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
transaction.rollback(_capture_exception=True)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2001, in _flush
flush_context.execute()
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 367, in execute
n.execute_aggregate(self, set_)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 459, in execute_aggregate
self.execute(uow)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 526, in execute
uow
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 65, in save_obj
mapper, table, insert)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 570, in _emit_insert_statements
execute(statement, multiparams)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 729, in execute
return meth(self, multiparams, params)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 321, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context
context)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1160, in _handle_dbapi_exception
exc_info
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 951, in _execute_context
context)
File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 436, in do_execute
cursor.execute(statement, parameters)
IntegrityError: (IntegrityError) duplicate key value violates unique constraint "pbases_history_pkey"
DETAIL: Key (id, version)=(125, 21) already exists.
__mapper_args__ = {
'polymorphic_identity' : 'PBase',
'polymorphic_on' : classname
}
__mapper_args__ = {
'polymorphic_identity' : 'P2Document',
}
document_version = Column(Integer, default=0)
document = relationship("PDocument", primaryjoin="PDocument.id == PNode.document_id",
backref=backref('pages', order_by="PNode.position", collection_class=ordering_list('position')))
'polymorphic_identity' : 'PNote',
}
@event.listens_for(PNode.document_id, 'set', active_history=True)
def _on_note_set_document_id(note, new_doc_id, old_old_id, initiator):
doc = None
if new_doc_id:
doc = DBSession.query(PDocument).filter(PDocument.id == new_doc_id).one()
elif old_doc_id:
page = DBSession.query(PDocument).filter(PDocument.id == old_doc_id).one()
if doc:
note.document_version = doc.version
####### SYSTEM CONFIG
PostgreSQL 9.3.5 on x86_64-unknown-linux-gnu, compiled by gcc (Ubuntu 4.8.2-19ubuntu1) 4.8.2, 64-bit
argparse (1.2.1)
beautifulsoup4 (4.3.2)
Chameleon (2.16)
colander (1.0b1)
cornice (0.17)
coverage (3.7.1)
Mako (1.0.0)
MarkupSafe (0.23)
nose (1.3.4)
p2server (0.0)
PasteDeploy (1.5.2)
pip (1.5.6)
psycopg2 (2.5.4)
Pygments (1.6)
pyramid (1.5.1)
pyramid-chameleon (0.3)
pyramid-debugtoolbar (2.2)
pyramid-mako (1.0.2)
pyramid-tm (0.7)
repoze.lru (0.6)
requests (2.4.3)
setuptools (3.6)
simplejson (3.6.4)
six (1.8.0)
SQLAlchemy (0.9.7)
transaction (1.4.3)
translationstring (1.1)
venusian (1.0)
waitress (0.8.9)
WebOb (1.4)
WebTest (2.0.16)
wsgiref (0.1.2)
zope.deprecation (4.1.1)
zope.interface (4.1.1)
zope.sqlalchemy (0.7.5)
On Dec 4, 2014, at 6:36 PM, HP3 <henddher...@gmail.com> wrote:Hello all,We are facing a problem when using history_meta.py recipe.It seems like two concurrent transactions read the same (id,version) tuple at "#1" and then each one tries to insert a new row into the pbases_history table with the same pk (id, version) combination (see #2)By removing #2, the issue goes away but of course, there is no history kept anywhere.
--
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+...@googlegroups.com.
To post to this group, send email to sqlal...@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
Thank you very much Mike
session.query(obj.__class__.version).with_for_update().filter(obj.__class__.id == obj.id).one()
attr['version'] = obj.version
...
But the end result was the same:
IntegrityError: (IntegrityError) duplicate key value violates unique constraint "p2bases_history_pkey"
DETAIL: Key (id, version)=(3, 8) already exists.
'INSERT INTO p2bases_history (id, uuid, classname, "timeCreated", "timeModified", predecessor_id, predecessor_version, time, mark_as_deleted, mark_as_branched, version, changed) VALUES (%(id)s, %(uuid)s, %(classname)s, %(timeCreated)s, %(timeModified)s, %(predecessor_id)s, %(predecessor_version)s, %(time)s, %(mark_as_deleted)s, %(mark_as_branched)s, %(version)s, %(changed)s)' {'predecessor_version': 0, 'mark_as_deleted': False, 'mark_as_branched': False, 'timeCreated': datetime.datetime(2014, 9, 11, 12, 50, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'changed': datetime.datetime(2014, 12, 5, 0, 41, 39, 284872), 'classname': u'P2Page', 'predecessor_id': None, 'version': 8, 'time': datetime.datetime(2014, 10, 15, 13, 20, 6, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'id': 3, 'timeModified': datetime.datetime(2014, 10, 15, 13, 20, 6, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'uuid': u'00000001-0000-0000-0002-000000000001'}
(FYI: the example uses 'pbases' but in our real model is called "p2bases". The 'classname' cited there is the "P2Document" in our example)
Any other suggestion apart from retrying transactions?
(the failure rate would be very high because many users would typically add many notes concurrently to the same document)
2014-12-04 19:06:16,938 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:16,938 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:17,594 INFO [root][MainThread] [0] Thread exited, failure = 0
2014-12-04 19:06:17,595 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,147 INFO [root][MainThread] [1] Thread exited, failure = 0
2014-12-04 19:06:18,148 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,271 INFO [root][MainThread] [0] Thread exited, failure = 1
2014-12-04 19:06:18,272 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,407 INFO [root][MainThread] [1] Thread exited, failure = 1
2014-12-04 19:06:18,408 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,502 INFO [root][MainThread] [0] Thread exited, failure = 2
2014-12-04 19:06:18,503 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,597 INFO [root][MainThread] [1] Thread exited, failure = 2
2014-12-04 19:06:18,597 INFO [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,691 INFO [root][MainThread] [0] Thread exited, failure = 3
2014-12-04 19:06:18,733 INFO [root][MainThread] [1] Thread exited, failure = 3
############### test case
def test_reproduce(self):
from multiprocessing import Process
pool = []
loops = 4
for i in range(2):
proc = Process(target=ConcurrencyTransactions.insert_annotation, args=(self.page_uuid, loops, i))
pool.append(proc)
for proc in pool:
proc.start()
for proc in pool:
proc.join()
logging.info('Main thread exited.')
pass
@classmethod
def insert_annotation(cls, page_uuid, loops, pid):
import requests
import simplejson
from logging import info
failure_count = 0
request_json_template = simplejson.dumps({ 'payload' : [load_json(15)]})
for i in range(loops):
request_json = replace_uuid(request_json_template, '0000000F-0000-0000-0002-000000000001')
request_url = 'http://localhost:6543/resources/pages/{}/annotations'.format(page_uuid)
info(request_url)
response = requests.post(request_url,
headers={'Content-Type':'application/json'},
data=request_json)
if response.status_code != 200:
failure_count += 1
info('[%d] Thread exited, failure = %d' % (pid, failure_count))
return
annotations is pnotes and pages is pdocuments
#attr['version'] = obj.version
session.query(obj.__class__).with_for_update().filter(obj.__class__.id == obj.id).one() #S1
version_number = session.query(obj.__class__.version).with_for_update().filter(obj.__class__.id == obj.id).one() #2
attr['version'] = version_number
```
Though we haven't tested extensively, we suspect that between 'SELECT FOR UPDATE' S1 and S2, only S1 is needed.
Thanks again.