history_meta.py IntegrityError: (IntegrityError) duplicate key value violates unique constraint "p2docs_history_pkey"

HP3

Dec 4, 2014, 6:36:47 PM12/4/14
to sqlal...@googlegroups.com
Hello all,

We are facing a problem when using the history_meta.py recipe.

It seems like two concurrent transactions read the same (id, version) tuple at "#1", and then each one tries to insert a new row into the pbases_history table with the same (id, version) primary key (see #2).
If we remove #2, the issue goes away, but of course no history is kept anywhere.

####### CODE SNIPPET WHERE THE ISSUE IS TRIGGERED

history_meta.py:

    attr['version'] = obj.version  # 1: read the current version
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)  # 2: insert the history row keyed on (id, version)
    obj.version += 1

#######  STACK TRACE:

  140, in collection_post
    p2model.DBSession.flush()
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
    self._flush(objects)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2001, in _flush
    flush_context.execute()
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 367, in execute
    n.execute_aggregate(self, set_)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 459, in execute_aggregate
    self.execute(uow)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 526, in execute
    uow
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 65, in save_obj
    mapper, table, insert)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 570, in _emit_insert_statements
    execute(statement, multiparams)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 729, in execute
    return meth(self, multiparams, params)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 321, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context
    context)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1160, in _handle_dbapi_exception
    exc_info
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 951, in _execute_context
    context)
  File "/home/plannotate/.virtualenvs/P2v0.0.1/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 436, in do_execute
    cursor.execute(statement, parameters)
IntegrityError: (IntegrityError) duplicate key value violates unique constraint "pbases_history_pkey"
DETAIL:  Key (id, version)=(125, 21) already exists.


####### MODEL

The simplified version of our model looks like this:

class PBase(Versioned, Base):
    __tablename__ = 'pbases'

    id = Column(Integer, primary_key=True)
    uuid = Column(String(50), unique=True)
    classname = Column(String(50))

    __mapper_args__ = {
        'polymorphic_identity': 'PBase',
        'polymorphic_on': classname,
    }


class PDocument(PBase):
    __tablename__ = 'pdocuments'

    id = Column(Integer, ForeignKey('pbases.id'), primary_key=True)
    name = Column(String)

    __mapper_args__ = {
        'polymorphic_identity': 'PDocument',
    }


class PNote(PBase):
    __tablename__ = 'pnotes'

    id = Column(Integer, ForeignKey('pbases.id'), primary_key=True)
    comment = Column(String)
    position = Column(Integer)
    document_id = Column(Integer, ForeignKey('pdocuments.id'))
    document_version = Column(Integer, default=0)

    document = relationship(
        "PDocument",
        primaryjoin="PDocument.id == PNote.document_id",
        backref=backref('pages', order_by="PNote.position",
                        collection_class=ordering_list('position')))

    __mapper_args__ = {
        'polymorphic_identity': 'PNote',
    }


@event.listens_for(PNote.document_id, 'set', active_history=True)
def _on_note_set_document_id(note, new_doc_id, old_doc_id, initiator):
    doc = None
    if new_doc_id:
        doc = DBSession.query(PDocument).filter(PDocument.id == new_doc_id).one()
    elif old_doc_id:
        doc = DBSession.query(PDocument).filter(PDocument.id == old_doc_id).one()
    if doc:
        note.document_version = doc.version

#######  SYSTEM CONFIG

PostgreSQL 9.3.5 on x86_64-unknown-linux-gnu, compiled by gcc (Ubuntu 4.8.2-19ubuntu1) 4.8.2, 64-bit

argparse (1.2.1)
beautifulsoup4 (4.3.2)
Chameleon (2.16)
colander (1.0b1)
cornice (0.17)
coverage (3.7.1)
Mako (1.0.0)
MarkupSafe (0.23)
nose (1.3.4)
p2server (0.0)
PasteDeploy (1.5.2)
pip (1.5.6)
psycopg2 (2.5.4)
Pygments (1.6)
pyramid (1.5.1)
pyramid-chameleon (0.3)
pyramid-debugtoolbar (2.2)
pyramid-mako (1.0.2)
pyramid-tm (0.7)
repoze.lru (0.6)
requests (2.4.3)
setuptools (3.6)
simplejson (3.6.4)
six (1.8.0)
SQLAlchemy (0.9.7)
transaction (1.4.3)
translationstring (1.1)
venusian (1.0)
waitress (0.8.9)
WebOb (1.4)
WebTest (2.0.16)
wsgiref (0.1.2)
zope.deprecation (4.1.1)
zope.interface (4.1.1)
zope.sqlalchemy (0.7.5)




Michael Bayer

Dec 4, 2014, 6:50:31 PM12/4/14
to sqlal...@googlegroups.com
On Dec 4, 2014, at 6:36 PM, HP3 <henddher...@gmail.com> wrote:

> Hello all,
>
> We are facing a problem when using the history_meta.py recipe.
>
> It seems like two concurrent transactions read the same (id, version) tuple at "#1", and then each one tries to insert a new row into the pbases_history table with the same (id, version) primary key (see #2).
> If we remove #2, the issue goes away, but of course no history is kept anywhere.

That's the correct behavior. Both transactions load the object at version X, then transaction A applies changes and transaction B applies changes. Only one may be committed; at that point the other is invalid, since it's modifying an object that no longer exists.

The application needs to anticipate collisions like this and resolve them. Retrying the transaction after loading the fresh data is the most typical approach; a pessimistic alternative is row locking, such as SELECT FOR UPDATE.
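
A minimal sketch of the optimistic "retry on collision" approach (the session factory, PBase, and the apply_changes callback here are stand-ins for the application's own pieces, not part of the recipe):

```
from sqlalchemy.exc import IntegrityError

def update_with_retry(session_factory, obj_id, apply_changes, max_attempts=3):
    """Reload the row fresh on each attempt; retry if another writer won."""
    for attempt in range(max_attempts):
        session = session_factory()
        try:
            obj = session.query(PBase).get(obj_id)  # load fresh state
            apply_changes(obj)                      # mutate the object
            session.commit()                        # history row inserted at flush
            return
        except IntegrityError:
            session.rollback()  # collided with another writer; retry on fresh data
        finally:
            session.close()
    raise RuntimeError('gave up after %d collisions' % max_attempts)
```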




HP3

Dec 4, 2014, 8:04:25 PM12/4/14
to sqlal...@googlegroups.com

Thank you very much, Mike.


We just tried this:

    session.query(obj.__class__.version).with_for_update().filter(obj.__class__.id == obj.id).one()
    attr['version'] = obj.version
    ...


But the end result was the same:

IntegrityError: (IntegrityError) duplicate key value violates unique constraint "p2bases_history_pkey"

DETAIL:  Key (id, version)=(3, 8) already exists.

 'INSERT INTO p2bases_history (id, uuid, classname, "timeCreated", "timeModified", predecessor_id, predecessor_version, time, mark_as_deleted, mark_as_branched, version, changed) VALUES (%(id)s, %(uuid)s, %(classname)s, %(timeCreated)s, %(timeModified)s, %(predecessor_id)s, %(predecessor_version)s, %(time)s, %(mark_as_deleted)s, %(mark_as_branched)s, %(version)s, %(changed)s)' {'predecessor_version': 0, 'mark_as_deleted': False, 'mark_as_branched': False, 'timeCreated': datetime.datetime(2014, 9, 11, 12, 50, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'changed': datetime.datetime(2014, 12, 5, 0, 41, 39, 284872), 'classname': u'P2Page', 'predecessor_id': None, 'version': 8, 'time': datetime.datetime(2014, 10, 15, 13, 20, 6, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'id': 3, 'timeModified': datetime.datetime(2014, 10, 15, 13, 20, 6, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)), 'uuid': u'00000001-0000-0000-0002-000000000001'}

(FYI: the example uses 'pbases', but in our real model the table is called "p2bases"; the 'P2Page' classname in the log corresponds to the "PDocument" of our example.)


Any other suggestions apart from retrying transactions?

(The failure rate would be very high, because many users typically add many notes concurrently to the same document.)

HP3

Dec 4, 2014, 8:11:37 PM12/4/14
to sqlal...@googlegroups.com

2014-12-04 19:06:16,938 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:16,938 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:17,594 INFO  [root][MainThread] [0] Thread exited, failure = 0
2014-12-04 19:06:17,595 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,147 INFO  [root][MainThread] [1] Thread exited, failure = 0
2014-12-04 19:06:18,148 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,271 INFO  [root][MainThread] [0] Thread exited, failure = 1
2014-12-04 19:06:18,272 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,407 INFO  [root][MainThread] [1] Thread exited, failure = 1
2014-12-04 19:06:18,408 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,502 INFO  [root][MainThread] [0] Thread exited, failure = 2
2014-12-04 19:06:18,503 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,597 INFO  [root][MainThread] [1] Thread exited, failure = 2
2014-12-04 19:06:18,597 INFO  [root][MainThread] http://localhost:6543/resources/pages/00000001-0000-0000-0002-000000000001/annotations
2014-12-04 19:06:18,691 INFO  [root][MainThread] [0] Thread exited, failure = 3
2014-12-04 19:06:18,733 INFO  [root][MainThread] [1] Thread exited, failure = 3


############### test case


def test_reproduce(self):
    from multiprocessing import Process

    pool = []
    loops = 4

    for i in range(2):
        proc = Process(target=ConcurrencyTransactions.insert_annotation,
                       args=(self.page_uuid, loops, i))
        pool.append(proc)
    for proc in pool:
        proc.start()
    for proc in pool:
        proc.join()
    logging.info('Main thread exited.')


@classmethod
def insert_annotation(cls, page_uuid, loops, pid):
    import requests
    import simplejson
    from logging import info

    failure_count = 0
    # load_json() and replace_uuid() are helpers from our test suite
    request_json_template = simplejson.dumps({'payload': [load_json(15)]})
    for i in range(loops):
        request_json = replace_uuid(request_json_template,
                                    '0000000F-0000-0000-0002-000000000001')
        request_url = 'http://localhost:6543/resources/pages/{}/annotations'.format(page_uuid)
        info(request_url)
        response = requests.post(request_url,
                                 headers={'Content-Type': 'application/json'},
                                 data=request_json)
        if response.status_code != 200:
            failure_count += 1
        info('[%d] Thread exited, failure = %d' % (pid, failure_count))


(In these URLs, "annotations" corresponds to pnotes and "pages" to pdocuments.)

kirk holland

Dec 4, 2014, 9:55:59 PM12/4/14
to sqlal...@googlegroups.com
Hi,

We were having a similar problem with history_meta, so we created history_meta_date (.py file attached). In our version, we use timestamps rather than sequence numbers to track history versions, so we don't need to worry about duplicate-key problems. The "version_date" timestamps still make versions sortable, but are far less likely to have the same value issued to separate processes.

Disclaimer: the attached file was developed in a hurry to solve an urgent problem. It has performed well for us, but probably hasn't gone through the testing rigours this group might expect. Feel free to use/adapt it as necessary and, if you have the time/inclination, post a more rigorous version for consideration as a formal recipe.
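
To illustrate the idea, here is a rough sketch of the history-table shape under the timestamp scheme (the column names are only illustrative; the attached history_meta_date.py is the authoritative version):

```
import datetime
from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table

metadata = MetaData()

# (id, version_date) replaces (id, version) as the primary key, so two
# writers collide only if they capture the exact same microsecond.
pbases_history = Table(
    'pbases_history', metadata,
    Column('id', Integer, primary_key=True),
    Column('version_date', DateTime, primary_key=True,
           default=datetime.datetime.utcnow),
    Column('uuid', String(50)),
    Column('classname', String(50)),
)
```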

Cheers
history_meta_date.py

HP3

Dec 8, 2014, 7:15:26 PM12/8/14
to sqlal...@googlegroups.com
Thank you all,

It turned out that combining SELECT FOR UPDATE, isolation_level SERIALIZABLE, and pyramid_tm 0.8 produced the expected result. IOW: everything is now working correctly.

The only change we ended up making to history_meta.py was the following:

```

    # attr['version'] = obj.version
    session.query(obj.__class__).with_for_update().filter(obj.__class__.id == obj.id).one()  # S1: lock the row
    version_number = session.query(obj.__class__.version).with_for_update().filter(obj.__class__.id == obj.id).scalar()  # S2: .scalar() unwraps the single column (.one() would return a one-element tuple)
    attr['version'] = version_number

```

Though we haven't tested extensively, we suspect that of the two SELECT FOR UPDATE statements, only S1 is needed.
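
For reference, the SERIALIZABLE part is plain engine configuration; a minimal sketch (the connection URL below is a placeholder, not our real DSN):

```
from sqlalchemy import create_engine

# isolation_level applies to every connection the engine hands out,
# so each pyramid_tm transaction runs under SERIALIZABLE.
engine = create_engine(
    'postgresql+psycopg2://user:pass@localhost/p2db',
    isolation_level='SERIALIZABLE',
)
```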

Thanks again.

HP3

Dec 8, 2014, 7:16:58 PM12/8/14
to sqlal...@googlegroups.com
Clarification:

Originally, when we tried SELECT FOR UPDATE with isolation_level SERIALIZABLE unsuccessfully, we were still on pyramid_tm 0.7. As soon as we upgraded to 0.8, everything started working correctly.