SQLAlchemy asyncio ThreadPoolExecutor “Too many clients”

1,214 views
Skip to first unread message

Viktor Roytman

unread,
Sep 28, 2016, 4:51:44 PM9/28/16
to sqlalchemy
I wrote a script with this sort of logic in order to insert many records into a PostgreSQL table as they are generated.

#!/usr/bin/env python3
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base


metadata
= sa.MetaData(schema='stackoverflow')
Base = declarative_base(metadata=metadata)


class Example(Base):
    __tablename__
= 'example'
    pk
= sa.Column(sa.Integer, primary_key=True)
    text
= sa.Column(sa.Text)


sa
.event.listen(Base.metadata, 'before_create',
    sa
.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow'))

engine
= sa.create_engine(
   
'postgresql+psycopg2://postgres:password@localhost:5432/stackoverflow'
)
Base.metadata.create_all(engine)
session
= sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    engine
.dispose()
   
with session.begin():
        session
.add(Example(text=value))


async
def infinite_task(loop):
    spawn_task
= partial(loop.run_in_executor, None, task)
   
while True:
        await asyncio
.wait([spawn_task(value) for value in range(10000)])


def main():
    loop
= asyncio.get_event_loop()
   
with pool() as executor:
        loop
.set_default_executor(executor)
        asyncio
.ensure_future(infinite_task(loop))
        loop
.run_forever()
        loop
.close()


if __name__ == '__main__':
    main
()

This code works just fine, creating a pool of as many processes as I have CPU cores, and happily chugging along forever. I wanted to see how threads would compare to processes, but I could not get a working example. Here are the changes I made:

from concurrent.futures import ThreadPoolExecutor as pool

session_maker
= sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine
.dispose()
   
# create new session per thread
    session
= Session()
   
with session.begin():
        session
.add(Example(text=value))
   
# remove session once the work is done
   
Session.remove()

This version runs for a while before a flood of "too many clients" exceptions:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL:  sorry, too many clients already

What's causing the problem?

Mike Bayer

unread,
Sep 28, 2016, 5:11:59 PM9/28/16
to sqlal...@googlegroups.com
> if__name__ =='__main__':
> main()|
>
> |
>
> This code works just fine, creating a pool of as many processes as I
> have CPU cores, and happily chugging along forever. I wanted to see how
> threads would compare to processes, but I could not get a working
> example. Here are the changes I made:
>
> |
>
> |fromconcurrent.futures importThreadPoolExecutoraspool
>
> session_maker =sa.orm.sessionmaker(bind=engine,autocommit=True)
> Session=sa.orm.scoped_session(session_maker)
>
>
> deftask(value):
> engine.dispose()
> # create new session per thread
> session =Session()
> withsession.begin():
> session.add(Example(text=value))
> # remove session once the work is done
> Session.remove()|
>
> |
>
> This version runs for a while before a flood of "too many clients"
> exceptions:
>
> |sqlalchemy.exc.OperationalError:(psycopg2.OperationalError)FATAL:sorry,too
> many clients already|
>
>
> What's causing the problem?

it would appear that either the ThreadExecutorPool is starting more
threads than your Postgresql database has available connections, or your
engine.dispose() is leaving PG connections lying open to be garbage
collected. A single Engine should be all that's necessary in a single
process.




>
> --
> You received this message because you are subscribed to the Google
> Groups "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to sqlalchemy+...@googlegroups.com
> <mailto:sqlalchemy+...@googlegroups.com>.
> To post to this group, send email to sqlal...@googlegroups.com
> <mailto:sqlal...@googlegroups.com>.
> Visit this group at https://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.

Viktor Roytman

unread,
Sep 28, 2016, 9:58:01 PM9/28/16
to sqlalchemy
Yes, the problem was the call to engine.dispose()

Thanks!
Reply all
Reply to author
Forward
0 new messages