Session management for general functions within a class


Andrew Martin

Apr 29, 2022, 12:10:01 AM
to sqlalchemy
Hi all, I'm struggling a bit with best practices for my ETL application.

Each part of the ETL app is completely separate from the others, but I have a mixin for some common functions that each of them needs, like moving a record to error if there's a data integrity problem, or moving a record to manual review if there's insufficient data to move it along to the next stage of the ETL.

The problem I'm having is that I don't understand the correct way to pass an object to a function, update it, and eventually commit it.

I have for example:

class DataMoverMixin:
    def __init__(self) -> None:
        self.db_session = get_db_session()
        # <insert a number of other things here that all my ETL classes share>

    def move_to_error(self, obj: Any, error_stage: str, traceback: Exception) -> bool:
        logger.info("Moving object to error.")
        json_data = json.dumps(obj, cls=AlchemyEncoder)
        e = Error(
            id=obj.id,
            error_stage=error_stage,
            error_message=repr(traceback),
            error_data=json_data,
        )
        obj.status = "error"
        with self.db_session as session:
            session.add(e)
            session.add(obj)
            session.commit()
        logger.info("Successfully moved object to error.")
        return True

class IngestDataManager(DataMoverMixin):
    def __init__(self):
        super().__init__()
        # <insert some class-specific things here>

    def load_new_data(self, accounts: List[Dict]) -> bool:
        for acc in accounts:
            new_obj = NewObj(**acc)
            with self.db_session as session:
                session.add(new_obj)
                session.commit()
                # Now that the raw data is loaded, I need to check if it conforms
                # and do some stuff with the newly created id.
                session.refresh(new_obj)
                if not new_obj.important_stuff:
                    self.move_to_error(
                        new_obj,
                        "ingest_integrity_error",
                        f"missing important_stuff for account_id: {new_obj.id}",
                    )


This is the simplest example of what does and doesn't work. I can tell from the errors that I must be following some anti-pattern, but I can't quite figure out what.

This pattern gives me a DetachedInstanceError.

So if I change Mixin.move_to_error like so:

. . . 
        with self.db_session as session:
            session.refresh(obj)
            obj.status = "error"
            session.add(e)
            session.add(obj)
            session.commit()
. . .

I get no error. But also the changes to the obj are not actually committed to the DB.
The new record for error is committed.
 
My expectation was that, by attaching the session to the class, any method on the class would reference the same session, and that using the context manager was just good practice for opening and closing it. But that doesn't seem to be the case.

I might certainly be wrong, but it appears that when you pass an SQLAlchemy object to a function inside of a session context manager, it does not carry the session with it?

And also reopening what I think is the session in a context manager fixes that but also then doesn't allow me to update the object?

I guess I'm just kinda confused, and I'm sure there's a better way to do this.

I've searched around a lot to try and understand this problem, but for whatever reason, nothing has clicked for me about what I'm doing wrong.

Appreciate any help from people.

-andrew


Simon King

Apr 29, 2022, 6:51:26 AM
to sqlal...@googlegroups.com
It's difficult to debug this without a script that we can run to reproduce the problem. What kind of object is self.db_session? You use it as a context manager without calling it, so I don't think it can be a sessionmaker or a session.

You're nesting calls to the context manager:

# in load_new_data
with self.db_session as outersession:
    # add new_obj to outersession
    # call move_to_error
    with self.db_session as innersession:
        # add new_obj to innersession

Are innersession and outersession supposed to be the same object? If they are different sessions, you're trying to add new_obj to both of them, which is going to be a problem.
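
A minimal sketch of what this nesting does if db_session turns out to be one plain Session object (an assumption at this point in the thread). Record is a hypothetical model and in-memory SQLite stands in for the real database. A Session used as a context manager returns itself on entry and calls close() on exit, so leaving the inner block closes the very session the outer block is still using, and its objects become detached:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Record(Base):  # hypothetical model, not from the original post
    __tablename__ = "record"
    id = Column(Integer, primary_key=True)
    status = Column(String)


engine = create_engine("sqlite://")  # in-memory database, illustration only
Base.metadata.create_all(engine)

db_session = Session(engine)  # one long-lived Session stored on the instance
rec = Record(status="new")

with db_session as outer:
    outer.add(rec)
    outer.commit()
    with db_session as inner:
        # Entering the context manager hands back the same Session object.
        same_object = inner is outer
        inner.refresh(rec)
    # Leaving the inner block called close() on the shared Session,
    # which expunged rec even though the outer block is still open.
    detached_after_inner = inspect(rec).detached
```

Whether this is the exact cause of the DetachedInstanceError in the original code can't be confirmed without a runnable reproduction, but it does show that the inner "with" is not a harmless no-op.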

If it were me, I would explicitly pass the session to the move_to_error method. If you don't like that, you can also use sqlalchemy.orm.object_session to get the session that new_obj already belongs to.
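
As a rough sketch of the first suggestion, assuming simplified stand-in models (Account plays the role of NewObj, and this Error has fewer columns than the real one) and an in-memory SQLite database: the caller opens the session and commits, while move_to_error only stages changes on the session it is handed. The object_session check shows the second option returning that same session.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, object_session

Base = declarative_base()


class Account(Base):  # hypothetical stand-in for NewObj
    __tablename__ = "account"
    id = Column(Integer, primary_key=True)
    status = Column(String, default="new")


class Error(Base):  # hypothetical, simplified version of the Error model
    __tablename__ = "error"
    id = Column(Integer, primary_key=True)
    error_stage = Column(String)
    error_message = Column(String)


def move_to_error(session: Session, obj: Account, error_stage: str, message: str) -> None:
    # The caller owns the session and the commit; this only stages changes.
    obj.status = "error"
    session.add(Error(id=obj.id, error_stage=error_stage, error_message=message))


engine = create_engine("sqlite://")  # in-memory database, illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    acc = Account()
    session.add(acc)
    session.flush()  # assigns acc.id without ending the transaction
    # object_session() returns the session the object already belongs to.
    assert object_session(acc) is session
    move_to_error(session, acc, "ingest_integrity_error", "missing data")
    session.commit()  # one commit covers both the status change and the Error row

# Read back with a fresh session to confirm both changes were persisted.
with Session(engine) as session:
    status = session.get(Account, 1).status
    error_count = session.query(Error).count()
```

Keeping a single "with" block and a single commit in the caller means there is only one place where the session is opened and closed.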

Hope that helps,

Simon


Andrew Martin

Apr 29, 2022, 9:21:48 PM
to sqlalchemy
Hi Simon, thank you for your thoughts. Sorry about the incomplete code. This project has gotten out of control, and I'm tired and a little burned out and have a launch deadline for Monday, and I was hoping this would be enough to uncover some basic stupidity in my approach. As long as I don't care about code being duplicated in lots of places, it all works. But refactoring to clean things up is uncovering some fundamental lack of understanding about how this stuff works.

The session is about as basic as it gets from a utility function.

from sqlalchemy import create_engine
from sqlalchemy.orm import Session


def get_db_session() -> Session:
    # Note: this builds a new engine on every call.
    engine = create_engine(
        f"postgresql://{settings.PG_USER}:{settings.PG_PASS}@{settings.PG_DSN}:{settings.PG_PORT}/{settings.DATABLENDER_DB}"  # noqa: E501
    )
    session = Session(engine)
    return session

So in my Mixin class there's just 

from app.utils import get_db_session

and self.db_session = get_db_session()
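
For comparison, a hedged sketch of a common alternative layout: the engine is created once at import time and a sessionmaker hands out a fresh Session per unit of work, so each "with" block opens and closes its own session. SessionFactory is a hypothetical name, and the in-memory SQLite URL is an assumption standing in for the PostgreSQL URL above.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

# Assumption: SQLite replaces the real PostgreSQL URL for illustration.
engine = create_engine("sqlite://")  # created once, shared by all sessions
SessionFactory = sessionmaker(bind=engine)  # hypothetical name


def get_db_session() -> Session:
    # Each call returns a new Session bound to the shared engine.
    return SessionFactory()


with get_db_session() as first:
    one = first.execute(text("SELECT 1")).scalar()

with get_db_session() as second:
    distinct_sessions = second is not first
    shared_engine = second.bind is first.bind
```

With this layout, code that needs the session of an existing object would receive it as an argument rather than opening another one.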

Everything else is working from that. I'll get a complete working example up tonight or tomorrow. It's gotten complex because some of the logic is distributed and in Airflow DAGs and tasks and stuff. It's not super easy to pull out a class and some models and have something to demonstrate. And I sure as hell didn't want to just dump the whole thing on people here and be like, "Hey can you fix this for me?" lol!
