Hi all, I'm struggling a bit with best practices for my ETL application.
Each part of the ETL app is completely separate from the others, but I have a mixin for some common functions that each of them needs, like moving a record to error if there's a data integrity problem, or moving a record to manual review if there's insufficient data to move it along to the next stage of the ETL.
The problem I'm having is that I don't understand the correct way to pass an object to a function, update it, and eventually commit it.
I have for example:
import json
from typing import Any

class DataMoverMixin:
    def __init__(self) -> None:
        self.db_session = get_db_session()
        # <insert a number of other things here that all my ETL classes share>

    def move_to_error(self, obj: Any, error_stage: str, traceback: Exception) -> bool:
        # Serialize the failing object so it can be stored alongside the error.
        json_data = json.dumps(obj, cls=AlchemyEncoder)
        e = Error(
            error_stage=error_stage,
            error_message=repr(traceback),
            error_data=json_data,
        )
        obj.status = "error"
        with self.db_session as session:
            session.add(e)
            session.add(obj)
            session.commit()
        return True
from typing import Dict, List

class IngestDataManager(DataMoverMixin):
    def __init__(self):
        super().__init__()
        # <insert some class-specific things here>

    def load_new_data(self, accounts: List[Dict]) -> bool:
        for acc in accounts:
            new_obj = NewObj(**acc)
            with self.db_session as session:
                session.add(new_obj)
                session.commit()
                # now the raw data is loaded, I need to check if it conforms and do some stuff with the newly created id.
                session.refresh(new_obj)
                if not new_obj.important_stuff:
                    self.move_to_error(
                        new_obj,
                        "ingest_integrity_error",
                        f"missing important_stuff for account_id: {new_obj.id}",
                    )
This is the simplest example of what does and doesn't work. I can tell from the errors that I must be following some kind of anti-pattern, but I can't quite figure out what.
This pattern gives me a DetachedInstanceError.
So if I change DataMoverMixin.move_to_error like so:
. . .
        with self.db_session as session:
            session.add(e)
            session.add(obj)
            session.commit()
. . .
I get no error, but the changes to obj are not actually committed to the DB.
The new Error record is committed.
My expectation was that, by attaching the session to the class, any method on the class would reference the same session, and that using the context manager was just good practice for opening and closing it. But that doesn't seem to be the case.
I may well be wrong, but it appears that when you pass a SQLAlchemy object to a function from inside a session context manager, it does not carry the session with it?
And reopening what I think is the same session in a context manager fixes that, but then doesn't allow me to update the object?
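To make the nesting concrete, here is a toy sketch with no database involved. It is not SQLAlchemy: ToySession and Record are hypothetical stand-ins, and the one assumption baked in is that leaving a with block closes the session and drops the objects it was tracking. If the real session behaves like that, then re-entering the same self.db_session inside move_to_error would close it for the outer block in load_new_data as well.

```python
class ToySession:
    """Hypothetical stand-in for a DB session; NOT SQLAlchemy's real class."""

    def __init__(self):
        self.attached = set()

    def add(self, obj):
        self.attached.add(obj)

    def close(self):
        # Assumption: closing drops every object the session was tracking.
        self.attached.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False


class Record:
    """Hypothetical stand-in for a mapped object such as NewObj."""
    status = "new"


shared = ToySession()  # plays the role of self.db_session
record = Record()

with shared as outer:  # like the block in load_new_data
    outer.add(record)
    with shared as inner:  # like the block in move_to_error
        record.status = "error"
        inner.add(record)
    # The inner block has exited, so the one shared session is already closed.
    still_attached = record in outer.attached

same_object = outer is inner
```

Here same_object is True and still_attached is False: both with blocks hand back the very same object, so the inner exit empties it while the outer block is still running. Whether that is really what's behind the DetachedInstanceError above, I can't say for sure.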
I guess I'm just kinda confused, and I'm sure there's a better way to do this.
I've searched around a lot to try and understand this problem, but for whatever reason, nothing has clicked for me about what I'm doing wrong.
Appreciate any help from people.
-andrew