sqlalchemy with airflow and Oracle

Vincent B.

Oct 26, 2016, 9:22:23 AM
to sqlalchemy
Hi,

I am using sqlalchemy 1.0.5, Airflow 1.7.1.3, Python 2.7 and Oracle 12.

I'm pretty much stuck integrating an Oracle connection through SQLAlchemy into an Airflow (Airbnb) script.


Here is my log from Airflow/sqlalchemy.

[2016-10-26 14:51:07,574] {base.py:719} INFO - COMMIT
[2016-10-26 14:51:07,631] {log.py:109} INFO - SELECT USER FROM DUAL
[2016-10-26 14:51:07,631] {log.py:109} INFO - {}
[2016-10-26 14:51:07,633] {log.py:109} INFO - SELECT CAST('test plain returns' AS VARCHAR(60 CHAR)) AS anon_1 FROM DUAL
[2016-10-26 14:51:07,633] {log.py:109} INFO - {}
[2016-10-26 14:51:07,634] {log.py:109} INFO - SELECT CAST('test unicode returns' AS NVARCHAR2(60)) AS anon_1 FROM DUAL
[2016-10-26 14:51:07,634] {log.py:109} INFO - {}
[2016-10-26 14:51:07,638] {log.py:109} INFO - Disconnection detected on checkout:
[2016-10-26 14:51:07,638] {log.py:109} INFO - Invalidate connection <cx_Oracle.Connection to [MyDataBaseName]@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=[MyOracleServerIP])(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=I2B2)))> (reason: DisconnectionError:)
[2016-10-26 14:51:07,678] {log.py:109} INFO - Disconnection detected on checkout:
[2016-10-26 14:51:07,678] {log.py:109} INFO - Invalidate connection <cx_Oracle.Connection to [MyDataBaseName]@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=[MyOracleServerIP])(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=I2B2)))> (reason: DisconnectionError:)
[2016-10-26 14:51:07,714] {log.py:109} INFO - Reconnection attempts exhausted on checkout
[2016-10-26 14:51:07,715] {log.py:109} INFO - Invalidate connection <cx_Oracle.Connection to [MyDataBaseName]@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=[MyOracleServerIP])(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=I2B2)))>
[2016-10-26 14:51:07,716] {models.py:1286} ERROR - This connection is closed
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/debug_py2b2_connect.py", line 41, in test_connect
    connection=i2b2data.engine.connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2085, in connect
    return self._connection_cls(self, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 90, in __init__
    if connection is not None else engine.raw_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2171, in raw_connection
    self.pool.unique_connection, _connection)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2141, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 328, in unique_connection
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 804, in _checkout
    raise exc.InvalidRequestError("This connection is closed")
InvalidRequestError: This connection is closed
[2016-10-26 14:51:07,718] {models.py:1306} INFO - Marking task as FAILED.



Here is the SQLAlchemy __init__ code in my Python class:

import os
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker

def __init__(self, connection_uri, params):
    if connection_uri.startswith("oracle"):
        # Have the Oracle client use UTF-8
        os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.AL32UTF8'
    self.engine = sqlalchemy.create_engine(connection_uri, **params)
    Session = sessionmaker(bind=self.engine, autoflush=True, autocommit=False)
    self.session = Session()
    self.metadata = MetaData()
    self.metadata.bind = self.engine


The "This connection is closed" error is not raised in Airflow "test" mode: I can connect and update my database, and my Python script works perfectly.
However, in Airflow "run" mode it fails.

I also get this Oracle warning:
<msg time='2016-10-17T11:52:47.755+02:00' org_id='xxxxxx' comp_id='rdbms' type='UNKNOWN' level='16' host_id='xxxxxxx' host_addr='xxxxxx' module='python@xxxxxxx' pid='8978'> <txt>Using deprecated SQLNET.ALLOWED_LOGON_VERSION parameter. </txt> </msg>

But it seems unrelated, as I get this warning when I execute my script in both "test" (success) and "run" (failure) mode. We are planning to fix this soon.


I have no clue whether this error is related to the Airflow, Oracle or SQLAlchemy configuration.



Mike Bayer

Oct 26, 2016, 9:45:59 AM
to sqlal...@googlegroups.com

This is a connectivity issue. I have no idea what Airflow is; however, if you're dropping connections, this would be something to email the cx_Oracle list about. The error message looks like you're not able to establish a connection in the first place. You might want to create a plain cx_Oracle test script, at least to make the options and the error clear.


--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
 
http://www.sqlalchemy.org/
 
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscribe@googlegroups.com.
To post to this group, send email to sqlal...@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Vincent B.

Oct 26, 2016, 10:24:03 AM
to sqlalchemy
Thanks for this answer.

As you suggested, I tried connecting to my database through cx_Oracle directly, and it succeeded.


import cx_Oracle

ip = '[MyOracleServerIP]'
port = 1521
service_name = '[MyServiceName]'
# Build the DSN from host, port and service name, then connect directly
dsn = cx_Oracle.makedsn(ip, port, service_name=service_name)
db = cx_Oracle.connect('[login]', '[password]', dsn)
print(db.version)
db.close()

Output : 12.1.0.2.0

It works in both "test" and "run" mode.

mike bayer

Oct 26, 2016, 12:56:53 PM
to sqlal...@googlegroups.com
Oh. Then they are timing out your connections due to inactivity. Set
the "pool_timeout" parameter to a number of seconds less than this timeout.
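
A side note on the parameter names, offered as a hedged sketch rather than a fix for this thread: in SQLAlchemy's pooling options, pool_timeout is the number of seconds to wait for a free connection from the pool, while pool_recycle is the setting that replaces pooled connections older than a given age. The one-hour idle timeout and the name engine_params below are assumptions made up for illustration; the dictionary would be passed the same way the original post passes params.

```python
# Assumption: the server or a firewall drops sessions idle for one hour.
ASSUMED_SERVER_IDLE_TIMEOUT = 3600  # seconds, hypothetical value

engine_params = {
    # Replace pooled connections older than this many seconds at checkout.
    "pool_recycle": ASSUMED_SERVER_IDLE_TIMEOUT // 2,
    # Separate setting: seconds to wait for a free connection from the pool.
    "pool_timeout": 30,
}

# These keyword arguments would then be handed to the engine, e.g.
#   sqlalchemy.create_engine(connection_uri, **engine_params)
```

Keeping pool_recycle well below the idle timeout leaves a margin, so a connection should be replaced before the server side has a chance to drop it.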

Vincent B.

Oct 27, 2016, 3:46:49 AM
to sqlalchemy
Thanks again.

So I tried several pool_timeout values ranging from 0.000001 to 30, and I still get "This connection is closed".

Is there a way to get more details about my SQLAlchemy connection?
I am already using echo=True, echo_pool=True and Python's logging:

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
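
For readers after the same kind of detail, here is a minimal sketch that also raises the level of the connection pool logger. The logger names "sqlalchemy.engine" and "sqlalchemy.pool" are the ones SQLAlchemy documents; the chosen levels are only one reasonable combination, not a recommendation from this thread.

```python
import logging

# Send log records to stderr with the default format.
logging.basicConfig()

# INFO on the engine logger shows the SQL statements being emitted.
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# DEBUG on the pool logger adds connection checkouts and checkins
# on top of the invalidation and recycle events.
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
```

The pool logger is where messages such as "Disconnection detected on checkout" come from, so it is usually the more useful of the two when a connection is being invalidated.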




I also tried to use NullPool before, without success.

mike bayer

Oct 27, 2016, 8:42:28 AM
to sqlal...@googlegroups.com
So the test against plain cx_Oracle is a datapoint sure, but since you
say that connects, we need to look at what your application is doing.
The first thing I see in the stack trace is that the application is
raising a sqlalchemy.exc.DisconnectionError. This is an exception that
SQLAlchemy itself *does not raise* - only the calling application or
library would do this, to indicate to the connection pool that it needs
to reconnect.

As I'm getting the impression this is within code you aren't familiar
with, I went to figure out what airflow is (hey airbnb using SQLAlchemy!
woop) and I can find at least one probable cause which is this code:

https://github.com/apache/incubator-airflow/blob/ff45d8f2218a8da9328161aa66d004c3db3b367e/airflow/utils/db.py#L70

and that code will not work on Oracle. You can't say "SELECT 1" on
Oracle, you need to say "SELECT 1 FROM DUAL".
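
To make the point concrete, here is a small sketch of a ping helper that picks its statement by dialect. This is not Airflow's code: PING_SQL_BY_DIALECT, ping_statement and ping are hypothetical names, only the Oracle statement comes from this thread, and sqlite3 stands in for a real server so the sketch runs anywhere. In SQLAlchemy the dialect name is available as engine.dialect.name.

```python
import sqlite3

# Hypothetical lookup table; only the Oracle entry comes from this thread.
PING_SQL_BY_DIALECT = {
    "oracle": "SELECT 1 FROM DUAL",
}
DEFAULT_PING_SQL = "SELECT 1"


def ping_statement(dialect_name):
    """Return a trivial query that is valid for the given dialect."""
    return PING_SQL_BY_DIALECT.get(dialect_name, DEFAULT_PING_SQL)


def ping(dbapi_connection, dialect_name):
    """Run the ping query on a raw DB-API connection and return its value."""
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute(ping_statement(dialect_name))
        return cursor.fetchone()[0]
    finally:
        cursor.close()


# sqlite3 stands in for a real database server in this demo.
demo_connection = sqlite3.connect(":memory:")
ping_result = ping(demo_connection, "sqlite")
demo_connection.close()
```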

They have copied the example verbatim from the 0.9 version of the docs,
which is unfortunate because that example has this bug, and it also
treats anything caught by its bare "except:" as a disconnect, not just
the errors that actually indicate one. That was back when I was still
writing example code with the perception that people would obviously
"fix up" the code to suit their specific case (e.g. what kinds of
exceptions they'd care about for specific databases, etc).
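
The second problem can be sketched the same way: a checkout ping that reports a disconnect only for errors the caller has named, and lets everything else surface as what it is. The names Disconnected, checkout_ping and outcome are hypothetical, Disconnected is a stand-in for sqlalchemy.exc.DisconnectionError, and the choice of sqlite3.ProgrammingError as "the disconnect error" is a demo assumption that fits sqlite3 only.

```python
import sqlite3


class Disconnected(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.DisconnectionError."""


def checkout_ping(dbapi_connection, ping_sql, disconnect_errors):
    """Ping a raw DB-API connection on checkout.

    Only exceptions listed in disconnect_errors are reported as a lost
    connection; anything else (such as invalid SQL) propagates unchanged.
    """
    try:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(ping_sql)
        finally:
            cursor.close()
    except disconnect_errors as err:
        raise Disconnected(str(err))


def outcome(dbapi_connection, ping_sql):
    """Return the name of the exception the ping raised, or "ok"."""
    try:
        # Demo assumption: sqlite3 raises ProgrammingError when the
        # connection has already been closed.
        checkout_ping(dbapi_connection, ping_sql, (sqlite3.ProgrammingError,))
    except Exception as err:
        return type(err).__name__
    return "ok"


live = sqlite3.connect(":memory:")
healthy = outcome(live, "SELECT 1")
bad_sql = outcome(live, "SELECT 1 FROM DUAL")  # SQLite has no DUAL table
live.close()
closed = outcome(live, "SELECT 1")
```

Here the statement that is invalid for the database shows up as an OperationalError instead of being disguised as a dropped connection, which is the mirror image of what happened with "SELECT 1" on Oracle. Later SQLAlchemy releases (1.2 and up) also offer create_engine(..., pool_pre_ping=True), which is documented to perform this kind of check for you.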

Airflow needs to fix their example to match the modern form which is at
http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic.
I would submit this to them.

Vincent B.

Oct 27, 2016, 8:53:55 AM
to sqlalchemy
Thanks for this very detailed answer!
It will take some time for me to investigate this.

By the way, Airflow (from Airbnb) is a great solution, but it is still very new and a bit buggy.

Vincent B.

Oct 27, 2016, 9:14:29 AM
to sqlalchemy
Editing the source code from "SELECT 1" to "SELECT 1 FROM DUAL" actually fixed everything.

Patching the installed source is a less than satisfactory solution, so I will contact the Airflow developers and ask them to update the pessimistic_connection_handling code to the newer version, as you suggested.

So it was the sqlalchemy integration in airflow that was problematic.

Thanks again. You saved me hours (days) of work.