Speed performance

172 views
Skip to first unread message

Jaime Valdez

unread,
Dec 7, 2022, 1:41:11 PM12/7/22
to sqlalchemy
Hi group,

I'm having some issues  trying to do some queries using the create_async_engine, the following code is the fastest one.

import time
import asyncio

from typing import Any, Dict, List

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func, select

from asyncio import run as aiorun

import models

def get_session():
    engine = create_engine("postgresql+psycopg2://postgres:post...@127.0.0.1:5431/db",
        echo=True,
        pool_size=20,
        max_overflow=0,
        pool_pre_ping=True,
    )
    return sessionmaker(bind=engine)

sess = get_session()
session = sess()

_geom = "SRID=4269;POLYGON(( ... ))"

async def get_areas(geometry) -> List[Dict[str, Any]]:
    _res = []
   
    sql = select(
        models.GeoTable, func.ST_Intersection(models.GeoTable.geom, geometry)
    ).where(func.ST_Intersects(models.GeoTable.geom, geometry))

    if rows := session.execute(sql).scalars():
        _res.extend(
            {
                "column": _f.column_value,
            }
            for _f in rows
        )
    return _res

loop = asyncio.get_event_loop()

async def _search():
    tic = time.perf_counter()
   
    (
        result1,
        result2,
    ) = loop.run_until_complete(asyncio.gather(
        get_areas(_geom),
        get_areas(_geom)
    ))    
    print(result1)
    print(result2)

    toc = time.perf_counter()
    print(f"{toc - tic:0.4f} seconds")

aiorun(_search())
loop.close()


This code takes about 1.5 secs to complete. But using this other approach (by using async engine) as the code below:

import time
import asyncio

from typing import Any, Dict, List, AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func, select

from asyncio import run as aiorun

import models

async_engine = create_async_engine(
    "postgresql+asyncpg://postgres:post...@127.0.0.1:5431/db",
)

async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async_session = sessionmaker(bind=async_engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        yield session

_geom = "SRID=4269;POLYGON(( ... ))"

async def get_areas(session, geometry) -> List[Dict[str, Any]]:
    sql = select(
        models.GeoTable, func.ST_Intersection(models.GeoTable.geom, geometry)
    ).where(func.ST_Intersects(models.GeoTable.geom, geometry))

    rows = await session.execute(sql)
    return [
        {
            "column": _f.column_value,
        }
        for _r in rows.all()
    ]

async def _search():
    tic = time.perf_counter()

    async for session in get_async_session():    
        (
            result1,
            result2,
        ) = await asyncio.gather(
            get_areas(session, _geom),
            get_areas(session, _geom),
        )
        print(result1)
        print(result2)

    toc = time.perf_counter()
    print(f"{toc - tic:0.4f} seconds")

aiorun(_search())


This code takes about 354.1 seconds to complete, I wonder if I’m doing something wrong?

Thanks!

Mike Bayer

unread,
Dec 7, 2022, 2:47:15 PM12/7/22
to noreply-spamdigest via sqlalchemy
it's not clear what would cause the time delay, seems like a timeout after a deadlock of some kind.  however first thing is that your async code is written incorrectly, as it is sharing the same AsyncSession among two concurrent async tasks, which will cause random errors at any number of levels (ORM Session, Core connection, asyncpg connection) and is likely to be related to your issue.

for your async.gather(), each task should receive (or create on its own) a separate AsyncSession that's independent of the other, and each AsyncSession should proceed on its own connection/transaction.  You can't do concurrent work within a single transaction even with an asyncpg connection.
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
 
 
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+...@googlegroups.com.

Reply all
Reply to author
Forward
Message has been deleted
0 new messages