3. However, once the query has completed, the data seen in the object appears to be the value read from the previous query, not the SELECT .. FOR UPDATE one.
In the test program, a database object is created with val="abc". Two threads both read the row under a lock, append X and write it back again. So the final answer should be abcXX, but in fact it's abcX.
- this has to be run on a proper database (I am using mysql). sqlite doesn't support SELECT .. FOR UPDATE.
- I have some workarounds. If instead of reading a new object I do db.refresh(v, lockmode="update") then all is fine. However I understood that the lockmode="string" interface is being deprecated.
Brian.
from __future__ import absolute_import, division, print_function, unicode_literals
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from contextlib import contextmanager
from six.moves.queue import Queue, Empty
from threading import Thread
DEFAULT_DB_URI = 'mysql+pymysql://root@localhost/testdb'
Base = declarative_base()
class Foo(Base):
__tablename__ = "foo"
id = Column(Integer, primary_key=True)
val = Column(String(255))
engine = create_engine(DEFAULT_DB_URI, echo=True)
try: Base.metadata.drop_all(engine)
except: pass
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
@contextmanager
def private_session():
s = Session()
try:
yield s
finally:
s.rollback()
s.close()
def runner(ref, omsg, imsg):
with private_session() as db:
print("<<<< Read object")
v = db.query(Foo).filter_by(id=ref).one()
print("---- Discard session")
db.rollback()
print(">>>> Get object's id")
print("!!!! Reload object with FOR UPDATE")
# db.expire(v)
v = db.query(Foo).filter_by(id=id).with_for_update().one()
# Alt: db.refresh(v, lockmode='update')
print("==== v.val=%r" % v.val)
omsg.put("started")
imsg.get()
v.val += "X"
db.commit()
with private_session() as db:
f = Foo(id=1, val="abc")
db.add(f)
db.commit()
o1 = Queue()
i1 = Queue()
o2 = Queue()
i2 = Queue()
t1 = Thread(target=runner, kwargs={"ref":1, "omsg": o1, "imsg": i1})
t2 = Thread(target=runner, kwargs={"ref":1, "omsg": o2, "imsg": i2})
t1.start()
assert o1.get(True, 1) == "started"
# Next thread should block on SELECT FOR UPDATE
t2.start()
try:
o2.get(True, 1)
raise RuntimeError("This thread should be blocked on SELECT FOR UPDATE")
except Empty:
pass
# Let first thread complete
i1.put("go")
# Now second thread is unblocked
assert o2.get(True, 1) == "started"
i2.put("go")
t1.join(2)
assert not t1.isAlive()
t2.join(2)
assert not t2.isAlive()
# Check final state
print("*** FINISHED ***")
print("*** RESULTS ***")
print("val=%r" % f.val)
Base.metadata.drop_all(engine)