Hi folks,
I'm new to Airflow, rolling out a proof of concept at work. I have the examples running as well as a basic DAG I created. I'm running into issues with a SqlSensor that queries a Vertica database. I have verified that I can query the database from the EC2 instance with vertica_python.
Here are some of the attempts:
1) In Admin -> Connections, I modified the connection for 'vertica_default'. Saving does not seem to store the password, or at least the UI gives no indication of whether a password was saved for the connection. Running `airflow test dag_name task_name date`, I get this error:
Traceback (most recent call last):
  File "/home/airflow/venv/bin/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 316, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/models.py", line 1067, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/venv/lib/python2.7/site-packages/airflow/operators/sensors.py", line 52, in execute
    while not self.poke(context):
  File "/home/airflow/venv/lib/python2.7/site-packages/airflow/operators/sensors.py", line 82, in poke
    records = hook.get_records(self.sql)
AttributeError: 'NoneType' object has no attribute 'get_records'
And this is the code for the SqlSensor portion of the DAG.
t_validate = SqlSensor(
    conn_id='vertica_default',
    sql=get_sql_for_task_id('t_validate'),
    task_id='t_validate',
    dag=dag
)
2) In Admin -> Connections, I created a new connection called 'vertica_custom'. Again, I'm not certain whether the password took. After changing the SqlSensor code to use conn_id='vertica_custom', I get the same error as above.
3) At the command line, I exported the environment variable AIRFLOW_CONN_VERTICA_CUSTOM with the database URI as its value. I verified that the URI is parsed correctly by the method Connection.parse_from_uri. I get the same error as above when testing that DAG and task.
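To make attempt 3 concrete, here is a minimal sketch of the kind of check involved. It is not Airflow's own code: the standard-library urlparse stands in for Connection.parse_from_uri, the helper names env_var_for and split_conn_uri are made up for illustration, and the URI is a placeholder, not real credentials. It assumes the environment variable name is the AIRFLOW_CONN_ prefix followed by the upper-cased conn_id, which matches the variable named above.

```python
import os

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2.7, as used in the traceback above

# Assumed prefix, taken from the variable name AIRFLOW_CONN_VERTICA_CUSTOM.
CONN_ENV_PREFIX = "AIRFLOW_CONN_"


def env_var_for(conn_id):
    """Hypothetical helper: environment variable name for a connection id."""
    return CONN_ENV_PREFIX + conn_id.upper()


def split_conn_uri(uri):
    """Hypothetical helper: split a connection URI into its component fields."""
    parts = urlparse(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": parts.username,
        "password": parts.password,
        "schema": parts.path.lstrip("/"),
    }


# Placeholder URI for illustration only.
EXAMPLE_URI = "vertica://dbuser:secret@vertica.example.com:5433/analytics"

# Set the variable for this process, then read it back and split it.
os.environ[env_var_for("vertica_custom")] = EXAMPLE_URI
fields = split_conn_uri(os.environ["AIRFLOW_CONN_VERTICA_CUSTOM"])
for name in sorted(fields):
    print("%s = %r" % (name, fields[name]))
```

If every field, including the password, comes back as expected here, the URI itself is well formed, which would suggest the problem lies elsewhere than in the URI.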
Some questions:
1) What is the simplest way to test a connection with airflow?
2) How do I verify that the local db is storing my Vertica credentials (including password)?
3) I have installed cryptography and airflow[crypto] but still none of the connections appear as encrypted in admin/connection/.
4) Following a suggestion in another post, I downgraded Flask-Admin from 1.4.0 to 1.2.0. Some buttons in admin/connection/ looked different afterwards, but as far as I could tell it didn't fix anything.
My requirements.txt:
airflow==1.7.0
airflow[celery]==1.7.0
airflow[crypto]==1.7.0
airflow[jdbc]==1.7.0
airflow[s3]==1.7.0
airflow[vertica]==1.7.0
celery==3.1.23
cryptography==1.3.1
Flask-Admin==1.2.0
flower==0.8.4
MySQL-python==1.2.5
vertica-python==0.5.6
I'm currently running airflow-webserver and airflow-scheduler on the EC2 instance.
Any pointers to documentation or other help would be most appreciated.
Thanks,
Dennis