SQLAlchemy + multiprocessing problems



Mar 31, 2022, 7:06:17 AM
to sqlalchemy

From time to time, I need to update data in tables, and multiprocessing can
speed up this process. Latest example: I'm trying to update 7M rows in a table.

SQLAlchemy 1.4.31, psycopg2 2.8.6, PostgreSQL

def job_update_rd(data_list):    
    updated = []
    with Session() as session:
        for t, ts in data_list:
            rd = session.query(RawDataTable).filter(and_(
                RawDataTable.timestamp == t, 
                RawDataTable.ts == ts)).one()

            rd.ts = updated_ts[ts]


    return updated

with Pool(10) as p:
    upd_list = p.map(job_update_rd, chunks)

The code is very simple, but it does not work. I get these errors at random:

  • psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
  • sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

But this example works fine:

def other_job(data_list):
    with Session() as s:
         return [s.query(RawDataTable).filter(and_(
           RawDataTable.timestamp == t, 
           RawDataTable.ts == ts)).all() for t, ts in data_list]

with Pool(10) as p:
    res = p.map(other_job, chunks)

Please help me solve this problem.
Some people on our team also use multiprocessing, and they get these errors about once a week.

Mike Bayer

Mar 31, 2022, 8:42:25 AM
to noreply-spamdigest via sqlalchemy
When using multiprocessing, the connection pool in the new process must be replaced with a new one. This is usually accomplished by calling engine.dispose(). However, to keep the pool in the parent process usable as well, replace the connection pool alone without disposing the old one:

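from multiprocessing import Pool

from sqlalchemy import and_, create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(...)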

Session = sessionmaker(bind=engine)

def job_update_rd(data_list):
    updated = []
    with Session() as session:
        for t, ts in data_list:
            rd = session.query(RawDataTable).filter(and_(
                RawDataTable.timestamp == t,
                RawDataTable.ts == ts)).one()

            rd.ts = updated_ts[ts]

        # Persist the updates; closing the session without a commit rolls them back.
        session.commit()

    return updated

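def initializer():
    # Runs once in each worker process: give the child its own pool
    # without closing the connections that belong to the parent.
    engine.pool = engine.pool.recreate()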

with Pool(10, initializer=initializer) as p:
    upd_list = p.map(job_update_rd, chunks)

For many years we've advised calling engine.dispose() here, as documented at https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork . It was recently pointed out that this closes out the parent process's connections, so in SQLAlchemy 1.4.33 there will be a parameter so you can change the above code to engine.dispose(close=False).


Mar 31, 2022, 10:41:49 AM
to sqlalchemy
Works! Thank you!
