Great, thanks.
Used the ORM to dynamically create and destroy connections.
def create_hive_connection(**context):
    """Register a transient Hive connection in the Airflow metadata DB.

    Pulls the Hive node IP from the XCom pushed by the
    ``wait_for_status_wait`` task, then persists a Connection row with
    conn_id ``nightly_process_hive`` via the ORM.

    Args:
        context: Airflow task context (requires ``task_instance`` for
            the XCom pull).
    """
    session = settings.Session()
    try:
        hivenodeip = context['task_instance'].xcom_pull(
            task_ids='wait_for_status_wait')
        # Local import keeps the DAG-file parse cheap.
        from airflow import models
        conn = models.Connection(
            conn_id='nightly_process_hive', conn_type='hive',
            host=hivenodeip, port=10000, login='hadoop', password='',
            schema='default',
            extra='{ "use_beeline": true , "auth" : ""}')
        session.add(conn)
        session.commit()
    finally:
        # Always release the ORM session, even if the commit fails.
        session.close()
def delete_hive_connection(**context):
    """Remove the transient ``nightly_process_hive`` connection.

    Deletes the Connection row created by ``create_hive_connection``.
    A missing row is tolerated (no-op) so re-runs after a partial
    failure do not crash on ``session.delete(None)``.

    Args:
        context: Airflow task context (unused, accepted for
            ``provide_context=True`` compatibility).
    """
    session = settings.Session()
    try:
        from airflow import models
        C = models.Connection
        conn = session.query(C).filter(
            C.conn_id == 'nightly_process_hive').first()
        # Guard: first() returns None when the row is already gone.
        if conn is not None:
            session.delete(conn)
            session.commit()
    finally:
        # Always release the ORM session.
        session.close()
# Task that materializes the dynamic Hive connection before it is used.
create_hive_connection_op = PythonOperator(
    task_id='create_hive_connection',
    python_callable=create_hive_connection,
    provide_context=True,
    dag=dag,
)
# Task that tears the dynamic Hive connection back down after use.
delete_hive_connection_op = PythonOperator(
    task_id='delete_hive_connection',
    python_callable=delete_hive_connection,
    provide_context=True,
    dag=dag,
)
# Runs the table-creation HQL against the dynamically registered
# nightly_process_hive connection.
hive_create_input_tables = HiveOperator(
    task_id='hive_create_input_tables',
    hive_cli_conn_id="nightly_process_hive",
    hql="hql/dp_create_input_tables.hql",
    dag=dag,
)