debugging database connections


Dennis O'Brien

Apr 13, 2016, 7:07:20 PM
to Airflow
Hi folks,

I'm new to Airflow, rolling out a proof of concept at work.  I have the examples running as well as a basic DAG I created.  I'm running into issues with a SqlSensor that queries a Vertica database.  I have verified that I can query the database from the EC2 instance with vertica_python.

Here are some of the attempts:
1) In Admin -> Connections, modified the connection for 'vertica_default'.  Saving does not seem to persist the password, or at least the UI gives no indication of whether a password was saved for the connection.  Running `airflow test dag_name task_name date` I get this error:
Traceback (most recent call last):
  File "/home/airflow/venv/bin/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 316, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/models.py", line 1067, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/venv/lib/python2.7/site-packages/airflow/operators/sensors.py", line 52, in execute
    while not self.poke(context):
  File "/home/airflow/venv/lib/python2.7/site-packages/airflow/operators/sensors.py", line 82, in poke
    records = hook.get_records(self.sql)
AttributeError: 'NoneType' object has no attribute 'get_records'


And this is the code for the SqlSensor portion of the DAG.

t_validate = SqlSensor(
    conn_id='vertica_default',
    sql=get_sql_for_task_id('t_validate'),
    task_id='t_validate',
    dag=dag
)

2) In Admin -> Connections, created a new connection called 'vertica_custom'.  Again, I'm not certain whether the password was saved.  Modifying the SqlSensor code to use conn_id='vertica_custom', I get the same error as above.

3) At the command line, exported the variable AIRFLOW_CONN_VERTICA_CUSTOM with the database URI as its value.  I verified that the URI is parsed correctly by Connection.parse_from_uri.  Same error as above when testing that dag and task.
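For reference, the export and the parse check looked roughly like this (hostname, username, and password are placeholders, not my real values):

# at the shell:
#   export AIRFLOW_CONN_VERTICA_CUSTOM='vertica://<DB_USERNAME>:<DB_PASSWORD>@<DB_HOSTNAME>:5433/db'

# then in a Python shell, confirm the URI parses the way I expect:
from airflow.models import Connection

c = Connection(conn_id='vertica_custom')
c.parse_from_uri('vertica://<DB_USERNAME>:<DB_PASSWORD>@<DB_HOSTNAME>:5433/db')
print(c.conn_type, c.host, c.login, c.port, c.schema)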

Some questions and observations:
1) What is the simplest way to test a connection with airflow?
2) How do I verify that the local db is storing my Vertica credentials (including password)?
3) I have installed cryptography and airflow[crypto] but still none of the connections appear as encrypted in admin/connection/.
4) Following a suggestion from another post, I downgraded Flask-Admin from 1.4.0 to 1.2.0.  While some buttons appeared differently in admin/connection/, it didn't fix anything that I could tell.

My requirements.txt:
airflow==1.7.0
airflow[celery]==1.7.0
airflow[crypto]==1.7.0
airflow[jdbc]==1.7.0
airflow[s3]==1.7.0
airflow[vertica]==1.7.0
celery==3.1.23
cryptography==1.3.1
Flask-Admin==1.2.0
flower==0.8.4
MySQL-python==1.2.5
vertica-python==0.5.6

I'm currently running airflow-webserver and airflow-scheduler on the EC2 instance.

Any pointers to documentation or other help would be most appreciated.

thanks,
Dennis

Dennis O'Brien

Apr 13, 2016, 7:25:19 PM
to Airflow
I was able to verify that the local db has the correct credentials saved.

>>> import sqlite3
>>> conn = sqlite3.connect('airflow/airflow.db')
>>> c = conn.cursor()
>>> for line in c.execute('select * from connection').fetchall():
...     print(line)
...

...
(14, u'vertica_default', u'vertica', u'<DB_HOSTNAME>', u'db', u'<DB_USERNAME>', u'<DB_PASSWORD>', 5433, None, 0, 0)
...

So the problem is probably in my setup or usage.

Dennis O'Brien

Apr 13, 2016, 8:16:43 PM
to Airflow
Regarding encryption of the db credentials, I realized I needed to provide a fernet_key in airflow.cfg.  Once I did that, ran 'airflow resetdb', and added the connection definition, the UI showed 'Is Encrypted' active for this connection.
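In case anyone else needs it, the key itself comes straight from the cryptography package; I generated one and pasted the value into fernet_key under [core] in airflow.cfg:

from cryptography.fernet import Fernet

# prints a value suitable for fernet_key in airflow.cfg ([core] section)
print(Fernet.generate_key())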

About debugging, I found some clues in this issue:  https://github.com/airbnb/airflow/issues/1040
If I instantiate a VerticaHook directly I get an object I can query.  But if I use BaseHook.get_connection('vertica_default').get_hook() (as SqlSensor.poke does) I get None.

>>> from airflow.contrib.hooks import VerticaHook
>>> h = VerticaHook('vertica_default')
>>> h.get_records("""select 1""")
[2016-04-14 00:12:30,453] {base_hook.py:53} INFO - Using connection to: localhost
[2016-04-14 00:12:30,504] {base_hook.py:53} INFO - Using connection to: localhost
[[1]]
>>>
>>> from airflow.hooks import BaseHook
>>> h = BaseHook.get_connection('vertica_default').get_hook()
[2016-04-14 00:13:10,581] {base_hook.py:53} INFO - Using connection to: localhost
>>> print(h)
None

Is there something I need to do to "activate" the Vertica hook?

thanks,
Dennis


Dennis O'Brien

Apr 13, 2016, 9:02:38 PM
to Airflow
It looks like this was a bug in 1.7.0 but has already been addressed.

1.7.0:

Fixed in 1.7.1rc1:

I'll try pip installing that tag and see how it goes.
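For anyone else who finds this thread: my reading is that Connection.get_hook() dispatches on conn_type with an if/elif chain and falls through (returning None) for any type it doesn't recognize, and the 1.7.0 chain has no 'vertica' branch.  Roughly (a paraphrased sketch, not the actual source):

def get_hook(self):
    # paraphrased sketch of airflow.models.Connection.get_hook
    if self.conn_type == 'mysql':
        return hooks.MySqlHook(mysql_conn_id=self.conn_id)
    elif self.conn_type == 'postgres':
        return hooks.PostgresHook(postgres_conn_id=self.conn_id)
    # ... more branches for other conn_types ...
    # no 'vertica' branch in 1.7.0, so nothing is returned and
    # SqlSensor.poke ends up calling get_records() on None

Presumably the 1.7.1rc1 change adds the missing branch for 'vertica'.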