do_connect listener called couple of times

Visto 201 veces
Saltar al primer mensaje no leído

Srinu Chp

no leída,
21 mar 2022, 0:51:2521/3/22
a sqlalchemy
Hello Team,

I tried to create a standalone application POC for sqlalchemy to db connection. When I registered do_connect event, I see event is triggered couple of times for one call:
sqlalchemy_connection.py

import os
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy import event

cx_Oracle.init_oracle_client(lib_dir=os.environ.get("HOME") + "/Downloads/instantclient_19_8")
SQLALCHEMY_CONN = "test"
count = 0
engine = None
def connect_db(pwd):
global count
global engine
print(SQLALCHEMY_CONN)
username = "ADMIN"
password = pwd
dsn = "pydidb_high"
engine = create_engine(
f'oracle://{username}:{password}@{dsn}/?encoding=UTF-8&nencoding=UTF-8', max_identifier_length=128)
setup_event_handlers(engine)

def setup_event_handlers(engine):
@event.listens_for(engine, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
print("inside do_connect")
global count
try:
with engine.connect() as conn:
print("inside do_connect try block")
print(conn.scalar("select sysdate from dual"))
count += 2
except Exception as e:
print("inside do_connect except block")
count += 1

def db_connect_test():
print(engine)
with engine.connect() as conn:
print(conn.scalar("select sysdate from dual"))

gevent_sync.py

import gevent
import random
import sqlalchemy_connection

def task(pid):
gevent.sleep(random.randint(0,2)*0.001)
print('Task %s done' % pid)
sqlalchemy_connection.connect_db(**********)
sqlalchemy_connection.db_connect_test()

def synchronous():
for i in range(1,2):
task(i)
# sqlalchemy_connection.connect_db(**********)

def asynchronous():
threads = [gevent.spawn(task, i) for i in range(2)]
gevent.joinall(threads)

print('Synchronous:')
synchronous()
print('count %s ' % sqlalchemy_connection.count)

# print('Asynchronous:')
# asynchronous()
# print('count %s' % sqlalchemy_connection.count)

Output:

Synchronous:
Task 1 done
test
Engine(oracle://ADMIN:***@pydidb_high/?encoding=UTF-8&nencoding=UTF-8)
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect
inside do_connect except block
inside do_connect try block
2022-03-21 04:39:47
inside do_connect try block
2022-03-21 04:39:49
inside do_connect try block
2022-03-21 04:39:51
inside do_connect try block
2022-03-21 04:39:54
inside do_connect try block
2022-03-21 04:39:56
inside do_connect try block
2022-03-21 04:39:59
inside do_connect try block
2022-03-21 04:40:01
inside do_connect try block
2022-03-21 04:40:04
inside do_connect try block
2022-03-21 04:40:09
inside do_connect try block
2022-03-21 04:40:15
inside do_connect try block
2022-03-21 04:40:17
inside do_connect try block
2022-03-21 04:40:19
inside do_connect try block
2022-03-21 04:40:21
inside do_connect try block
2022-03-21 04:40:24
2022-03-21 04:40:26
count 29

highly appreciate any inputs.
Regards,
Pydi

Simon King

no leída,
21 mar 2022, 5:22:0421/3/22
a sqlal...@googlegroups.com
I don't really understand what's going on in your code, but you seem
to be calling engine.connect() inside your "do_connect" event handler.
I would expect that to trigger another "do_connect" event, which in
turn will call engine.connect() again, which will trigger another
"do_connect" event, and so on. I'm surprised the application gets as
far as it does. Maybe the exception handler inside receive_do_connect
is allowing it to stumble on.

Simon
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description.
> ---
> You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/sqlalchemy/08096636-06c4-478f-a54d-0bc8f71db414n%40googlegroups.com.

Srinu Chp

no leída,
21 mar 2022, 12:26:0221/3/22
a sqlalchemy
Hello Simon,

Thank you for prompt response. I really appreciate your help. I am trying to achieve password rotation and we are using secret client to fetch new password. I tried do_connect event and fetch new password from secret client, working as expected but we are facing performance issue as we are every time connecting to secret client(3~5sec for each request). Instead I am trying to achieve if connect fails then fetch from secret client. 

I tried with handle_error event, when i get error check for invalid user/pwd and update session with latest engine. This approach also did not help

Any insights are highly appreciated. Please suggest best approach.

Regards,
Pydi

Simon King

no leída,
21 mar 2022, 12:52:0421/3/22
a sqlal...@googlegroups.com
As suggested here:

https://docs.sqlalchemy.org/en/14/core/engines.html#fully-replacing-the-dbapi-connect-function

In your do_connect handler, rather than calling engine.connect(), you
need to call cx_Oracle.connect(), and return the result. You can wrap
this in an exception handler that detects the "incorrect password"
error to fetch new credentials. Something like this perhaps:

@event.listens_for(engine, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
try:
return cx_Oracle.connect(*cargs, **cparams)
except <appropriate-cx-oracle-error>:
cparams["password"] = get_new_password()
return cx_Oracle.connect(*args, **cparams)

Hope that helps,

Simon
> To view this discussion on the web visit https://groups.google.com/d/msgid/sqlalchemy/a0d30530-fc23-4e70-8965-a7b10c17a65cn%40googlegroups.com.

Srinu Chp

no leída,
21 mar 2022, 13:55:0921/3/22
a sqlalchemy
Hello Simon,

Perfect, working as expected in standalone POC. Thank you quick help

Regards,
Pydi

Srinu Chp

no leída,
21 mar 2022, 16:27:2321/3/22
a sqlalchemy
Hello Simon,

I tried POC approach in my project where we are using Airflow using sqlalchemy to connect to db. Here is the event code:

@event.listens_for(engine, "do_connect")
def receive_do_connect(dialect, conn_rec, cargs, cparams):
global SQL_ALCHEMY_CONN
log.info("receive_do_connect called for user AIRFLOW.")
log.info("user details DB: {}".format(SQL_ALCHEMY_CONN))
# creating new engine to valide using cx_oracle driver
engine_new = create_engine(f'{SQL_ALCHEMY_CONN}/?encoding=UTF-8&nencoding=UTF-8', max_identifier_length=128)
try:
with engine_new.connect() as conn:
log.info(conn.scalar("select sysdate from dual"))
SQL_ALCHEMY_CONN = "testNew_try"
except Exception as e:
# check for invalid user/pwd error
if search('ORA-01017', str(e)):
log.info("receive_do_connect exception occurred during engine connection e: {}".format(e))
...<connect to secret client and fetch new password>
//update with new password
SQL_ALCHEMY_CONN = "testNew_except"
# this log print new value with updated password
log.info("user details DB after update in except block: {}".format(SQL_ALCHEMY_CONN))
cparams['New password']

global SQL_ALCHEMY_CONN value is set during initialization. Once password is rotated I am trying to update the SQL_ALCHEMY_CONN so that next request will not go in except block. Every time logs print old SQL_ALCHEMY_CONN value even value is update in except block. 

second approach:
I tried to set env variable in except block:
os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN']
env variable also refer to old value even after updating in except block.

Can you please suggestion?
Regards,
Pydi

Simon King

no leída,
22 mar 2022, 6:11:3422/3/22
a sqlal...@googlegroups.com
I don't know anything about Airflow. Are you sure that each of these
tasks is running inside the same Python interpreter/process? I see
Airflow can distribute tasks among workers:

https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html

This sounds like a problem that is going to be very specific to your
deployment environment. If you have multiple worker processes, you're
going to need some way to distribute the new password to each of the
workers (eg. a shared cache)

But regardless of that, you're still not following the pattern from
the documentation. I don't understand why you are creating a new
engine inside your do_connect handler. You should be creating a
cx-Oracle connection and returning it. The parameters passed to the
do_connect handler have already been parsed out of the connection
string. So for example, if your connection string includes a
"password=some_password" parameter, then cparams will have a
"password" key with the value "some_password". The same cparams
dictionary will be passed to the do_connect handler each time, so if
you mutate the dictionary (eg. by updating the "password" key), the
next call to the handler will contain the new value.

If each task invocation is creating a new engine using a connection
string that is out of date, then none of that will help you, but that
would be an Airflow problem, not an SQLAlchemy problem.

Simon
> To view this discussion on the web visit https://groups.google.com/d/msgid/sqlalchemy/136a743f-e4fb-4cde-b363-9af7670057b8n%40googlegroups.com.

Srinu Chp

no leída,
22 mar 2022, 12:33:5722/3/22
a sqlalchemy
Hello Simon,

Thank you very much for detail information.

Regards,
Pydi

Srinu Chp

no leída,
23 mar 2022, 0:36:4023/3/22
a sqlalchemy
Hello Simon,

I tried your suggestion as POC:

def setup_event_handlers(engine):
@event.listens_for(engine, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
print("inside do_connect")
print('password %s' % cparams['password'])
try:
print("inside try")
return cx_Oracle.connect(*cargs, **cparams)
except Exception as e:
print("inside catch")
cparams['password'] = "NewPassword"
return cx_Oracle.connect(*cargs, **cparams)

Every time except block is triggered even I set correct password in Except block. As per document once cparams password set it should pass new password for new request. Can you please suggest if I miss anything here?

Regards,
Pydi

Simon King

no leída,
23 mar 2022, 11:29:5723/3/22
a sqlal...@googlegroups.com
I don't have Oracle, but here's a complete runnable example using MySQL (hopefully the formatting will survive this time):

import sqlalchemy as sa
import MySQLdb

# start with the wrong password to force a connection error
engine = sa.create_engine("mysql://user:wrong@db/db")

@sa.event.listens_for(engine, "do_connect")

def receive_do_connect(dialect, conn_rec, cargs, cparams):
    print(f"---- do_connect ----")
    print(f"cargs: {cargs}")
    print(f"cparams: {cparams}")
    try:
        return MySQLdb.connect(**cparams)
    except Exception as e:
        print(f"EXCEPTION: {e}")
        # Store the correct password in cparams for future connection attempts
        cparams["passwd"] = "correct"
        return MySQLdb.connect(**cparams)

conn1 = engine.connect()
print(f"conn1: {conn1}")

conn2 = engine.connect()
print(f"conn2: {conn2}")


And here's the output:

---- do_connect ----
cargs: []
cparams: {'host': 'db', 'db': 'db', 'user': 'user', 'passwd': 'wrong', 'client_flag': 2}
EXCEPTION: (1045, "Access denied for user 'user'@'172.18.0.9' (using password: YES)")
conn1: <sqlalchemy.engine.base.Connection object at 0x7fbd481b93d0>
---- do_connect ----
cargs: []
cparams: {'host': 'db', 'db': 'db', 'user': 'user', 'passwd': 'correct', 'client_flag': 2}
conn2: <sqlalchemy.engine.base.Connection object at 0x7fbd49499f10>


As you can hopefully see, the first connection attempt triggered an exception. The correct password was then stored in cparams, and the next time we called engine.connect(), the correct password was already passed in and no exception was raised.

Simon


Responder a todos
Responder al autor
Reenviar
0 mensajes nuevos