Hive connection configurations for EMR transient cluster

Or Sher

Feb 14, 2016, 3:22:56 AM
to Airflow
Hi guys,

We're looking into Airflow to replace our current workflow python scripts.

In our use case we initialize an EMR cluster with Hive, run our processes, and shut down the cluster.
In this approach the Hive connection config changes from run to run, and from what I saw the HiveOperator only works with a predefined connection object.

Is there a way to dynamically define the connection config?
I'd really love to use a built-in operator rather than writing my own. :)

Thanks,
Or.

Or Sher

Mar 1, 2016, 8:42:30 AM
to Airflow
Someone?
Any suggestions on what I can do to overcome this issue?

Or Sher

Mar 6, 2016, 6:49:41 AM
to Airflow
Sorry to bump this thread again, but I'd really love to hear some thoughts before I reinvent the wheel.

Any suggestions on how to tackle this use case?

Maxime Beauchemin

Mar 7, 2016, 10:44:11 PM
to Airflow
The Hive operator is just a wrapper around the Hive CLI (or beeline). You can define connections using environment variables:
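For instance, Airflow resolves a connection id from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (upper-cased) whose value is a connection URI. A hedged sketch; the conn id matches the one used later in this thread, and the URI scheme and host are placeholder assumptions:

```shell
# Sketch of the env-var approach: Airflow looks for AIRFLOW_CONN_<CONN_ID>
# holding a connection URI (scheme = conn type, then login:password@host:port/schema).
# The host here stands in for a dynamically discovered EMR master address.
export AIRFLOW_CONN_NIGHTLY_PROCESS_HIVE="hive-cli://hadoop:@ip-10-0-12-34.ec2.internal:10000/default"
echo "$AIRFLOW_CONN_NIGHTLY_PROCESS_HIVE"
```

Since the variable only needs to exist in the worker's environment, it can be set by whatever bootstrap step starts the cluster.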

You can also import the connection model and use the ORM to create new ones or do whatever:
`from airflow.models import Connection`

Or Sher

Mar 10, 2016, 10:40:22 AM
to Airflow
Great, thanks.

I used the ORM to dynamically create and destroy connections.

In case anyone needs something like this in the future:

from airflow import models, settings
from airflow.operators import PythonOperator, HiveOperator

def create_hive_connection(**context):
    """Register a Hive connection pointing at the freshly started EMR master."""
    session = settings.Session()
    # The master node IP is pushed to XCom by the task that waits for the cluster.
    hive_node_ip = context['task_instance'].xcom_pull(task_ids='wait_for_status_wait')
    conn = models.Connection(
        conn_id='nightly_process_hive', conn_type='hive',
        host=hive_node_ip, port=10000, login='hadoop', password='',
        schema='default',
        extra='{"use_beeline": true, "auth": ""}')
    session.add(conn)
    session.commit()
    session.close()

def delete_hive_connection(**context):
    """Drop the per-run connection after the cluster is shut down."""
    session = settings.Session()
    C = models.Connection
    conn = session.query(C).filter(C.conn_id == 'nightly_process_hive').first()
    session.delete(conn)
    session.commit()
    session.close()

create_hive_connection_op = PythonOperator(
    task_id='create_hive_connection',
    python_callable=create_hive_connection,
    provide_context=True,
    dag=dag)

delete_hive_connection_op = PythonOperator(
    task_id='delete_hive_connection',
    python_callable=delete_hive_connection,
    provide_context=True,
    dag=dag)

hive_create_input_tables = HiveOperator(
    task_id='hive_create_input_tables',
    hive_cli_conn_id='nightly_process_hive',
    hql='hql/dp_create_input_tables.hql',
    dag=dag)
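If it helps anyone adapting this: the only per-run field above is the host, so the Connection kwargs can be factored into a small plain-Python helper (the helper name is ours, not part of Airflow):

```python
import json

def hive_conn_kwargs(host):
    # Hypothetical helper (not an Airflow API): builds the keyword arguments
    # for models.Connection, with the EMR master IP pulled from XCom as the
    # only moving part between runs.
    return dict(
        conn_id='nightly_process_hive', conn_type='hive',
        host=host, port=10000, login='hadoop', password='',
        schema='default',
        extra=json.dumps({"use_beeline": True, "auth": ""}),
    )

kwargs = hive_conn_kwargs('10.0.12.34')
print(kwargs['host'])                               # 10.0.12.34
print(json.loads(kwargs['extra'])['use_beeline'])   # True
```

That keeps the create task down to `models.Connection(**hive_conn_kwargs(ip))` plus the session add/commit.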