SQLAlchemy + multiprocessing problems


Evgenii

Mar 31, 2022, 7:06:17 AM
to sqlalchemy

Hello!
From time to time I need to update data in tables, and multiprocessing can
speed this up. Latest example: I'm trying to update 7M rows in a table.

SQLAlchemy 1.4.31, psycopg2 2.8.6, PostgreSQL

def job_update_rd(data_list):    
    updated = []
    with Session() as session:
        for t, ts in data_list:
            rd = session.query(RawDataTable).filter(and_(
                RawDataTable.timestamp == t, 
                RawDataTable.ts == ts)).one()

            rd.ts = updated_ts[ts]
            session.commit()

            updated.append(rd)

    return updated

with Pool(10) as p:
    upd_list = p.map(job_update_rd, chunks)

The code is very simple, but it does not work. I get these errors at random:

  • psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
  • sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

But this example works fine:

def other_job(data_list):
    with Session() as s:
        return [s.query(RawDataTable).filter(and_(
            RawDataTable.timestamp == t,
            RawDataTable.ts == ts)).all() for t, ts in data_list]

with Pool(10) as p:
    res = p.map(other_job, chunks)

Please help me solve this problem.
Some people on our team also use multiprocessing and hit these errors about once a week.

Mike Bayer

Mar 31, 2022, 8:42:25 AM
to noreply-spamdigest via sqlalchemy
When using multiprocessing, the connection pool in each new process must be replaced with a new one. This is usually accomplished by calling engine.dispose(). However, to keep the pool in the parent process working as well, you can replace the connection pool alone without disposing of the old one:

engine = create_engine(...)

Session = sessionmaker(bind=engine)

def job_update_rd(data_list):
    updated = []
    with Session() as session:
        for t, ts in data_list:
            rd = session.query(RawDataTable).filter(and_(
                RawDataTable.timestamp == t,
                RawDataTable.ts == ts)).one()

            rd.ts = updated_ts[ts]
            session.commit()

            updated.append(rd)

    return updated


def initializer():
    engine.pool = engine.pool.recreate()

with Pool(10, initializer=initializer) as p:
    upd_list = p.map(job_update_rd, chunks)


For many years we've advised calling engine.dispose() here, as documented at https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork . It was recently pointed out that this also closes out the parent process's connections, so SQLAlchemy 1.4.33 will add a parameter letting you change the above code to engine.dispose(close=False).
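
A minimal sketch of the pool swap itself, using an in-memory SQLite engine for illustration (on 1.4.33+ the same effect is available via engine.dispose(close=False)):

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")

# A connection checked out before the swap keeps working afterwards,
# because the old pool is never disposed.
conn = engine.connect()

old_pool = engine.pool
engine.pool = engine.pool.recreate()  # fresh pool for this process
assert engine.pool is not old_pool

assert conn.execute(text("select 1")).scalar() == 1
conn.close()
```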
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper

Evgenii

Mar 31, 2022, 10:41:49 AM
to sqlalchemy
Works! Thank you!

On Thursday, March 31, 2022 at 3:42:25 PM UTC+3, Mike Bayer wrote: