using @hybrid_property setter in place of table trigger


Gord Thompson

Dec 21, 2020, 1:42:16 PM
to sqlalchemy
Tinkering with a possible solution to this SO question. The following code does work, but I'll admit that it has a whiff of "brute force" about it. Can anybody suggest a more elegant approach?

import sqlalchemy as sa
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship

connection_uri = (
    "mssql+pyodbc://@localhost:49242/myDb?driver=ODBC+Driver+17+for+SQL+Server"
)
engine = sa.create_engine(
    connection_uri,
    future=True,
    echo=False,
)

Base = sa.orm.declarative_base()


class Project(Base):
    __tablename__ = "project"
    id = sa.Column(sa.Integer, primary_key=True)
    title = sa.Column(sa.Unicode(100), nullable=False)
    _archived = sa.Column(sa.Boolean, nullable=False, default=False)

    @hybrid_property
    def archived(self):
        return self._archived

    @archived.setter
    def archived(self, value):
        self._archived = value
        if value:
            with engine.begin() as conn:
                sql = """\
                UPDATE project_note SET archived = :yes 
                WHERE project_id = :proj_id
                    AND archived = :no
                """
                conn.execute(
                    sa.text(sql),
                    {"yes": True, "no": False, "proj_id": self.id},
                )


class ProjectNote(Base):
    __tablename__ = "project_note"
    id = sa.Column(sa.Integer, primary_key=True)
    project_id = sa.Column(sa.Integer, sa.ForeignKey("project.id"))
    project = sa.orm.relationship(Project)
    note_text = sa.Column(sa.Unicode(255), nullable=False)
    archived = sa.Column(sa.Boolean, nullable=False, default=False)


Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)

p1 = Project(title="project 1")
p1n1 = ProjectNote(
    project=p1, note_text="project 1, note 1, archived", archived=True
)
p1n2 = ProjectNote(project=p1, note_text="project 1, note 2, not archived")

with sa.orm.Session(engine, future=True) as session:
    session.add_all([p1, p1n1, p1n2])
    session.commit()
    print(f"p1n2.archived is: {p1n2.archived}")  # p1n2.archived is: False

    p1.archived = True
    session.commit()
    print(f"p1.archived is: {p1.archived}")  # p1.archived is: True
    print(f"p1n2.archived is: {p1n2.archived}")  # p1n2.archived is: True

Mike Bayer

Dec 21, 2020, 2:55:14 PM
to noreply-spamdigest via sqlalchemy
hey Gord -

for any kind of "do DML query X when Y changes", we usually embed those in the flush process using events, most notably the before_insert/after_insert/before_update/after_update events.

Important things we are achieving with the ORM include:

- all DML (inserts, updates, deletes, etc.) occurs within a fixed space called the "flush"

- everything on a particular Session happens on one connection/transaction per database backend at a time.

- ORM mapped classes themselves should ideally know nothing about engines or sessions.

so overall I think the approach you have here combined with what the Stack Overflow user is doing would be best: run the connection-level SQL you have in your setter, but do it inside the after_update() event. That's why that event receives the Connection object and not the Session.
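A minimal sketch of that suggestion, under some assumptions: it uses an in-memory SQLite engine rather than the SQL Server URI from the original post so that it runs anywhere, it maps archived as a plain column because the hybrid setter is no longer needed, and the listener name archive_project_notes is made up for illustration. The UPDATE runs on the Connection the flush is already using, so it stays inside the Session's transaction and the mapped class never touches the engine.

```python
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.orm import Session, declarative_base, relationship

# Assumption: in-memory SQLite stands in for the SQL Server database.
engine = sa.create_engine("sqlite://")

Base = declarative_base()


class Project(Base):
    __tablename__ = "project"
    id = sa.Column(sa.Integer, primary_key=True)
    title = sa.Column(sa.Unicode(100), nullable=False)
    # Plain column: the cascade lives in the event listener below.
    archived = sa.Column(sa.Boolean, nullable=False, default=False)


class ProjectNote(Base):
    __tablename__ = "project_note"
    id = sa.Column(sa.Integer, primary_key=True)
    project_id = sa.Column(sa.Integer, sa.ForeignKey("project.id"))
    project = relationship(Project)
    note_text = sa.Column(sa.Unicode(255), nullable=False)
    archived = sa.Column(sa.Boolean, nullable=False, default=False)


# Hypothetical listener name; after_update hands us the flush's Connection.
@event.listens_for(Project, "after_update")
def archive_project_notes(mapper, connection, target):
    # The event fires for every dirty Project, so only act when the project
    # is archived; the "archived = :no" filter keeps the UPDATE idempotent.
    if target.archived:
        connection.execute(
            sa.text(
                "UPDATE project_note SET archived = :yes "
                "WHERE project_id = :proj_id AND archived = :no"
            ),
            {"yes": True, "no": False, "proj_id": target.id},
        )


Base.metadata.create_all(engine)

p1 = Project(title="project 1")
p1n1 = ProjectNote(
    project=p1, note_text="project 1, note 1, archived", archived=True
)
p1n2 = ProjectNote(project=p1, note_text="project 1, note 2, not archived")

with Session(engine) as session:
    session.add_all([p1, p1n1, p1n2])
    session.commit()
    before = p1n2.archived  # note 2 starts out not archived

    p1.archived = True
    session.commit()  # flush emits UPDATE project, then the listener runs
    p1_archived = p1.archived
    after = p1n2.archived  # reloaded after commit, reflects the listener's UPDATE

print(f"before: {before}, p1: {p1_archived}, after: {after}")
```

One thing to keep in mind with this approach: the note rows are changed by raw SQL, so ProjectNote objects already loaded in the Session only show the new value once they are expired or refreshed. Here the commit expires them, which is why p1n2.archived is re-read from the database.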