How to pass an XCom return value to an operator as an argument

Rajashekar Sunkesula

Jul 5, 2019, 6:34:49 AM
to cloud-composer-discuss

Hi All,


I am trying to use the return value of the getClusterCreateParameters function as an argument when creating a Dataproc cluster, but it throws an error.

def getClusterCreateParameters(**context):
    return CLUSTER_NUM_WORKERS

start_cluster = DataprocClusterCreateOperator(
    task_id='start_cluster',
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    #num_workers="{{ task_instance.xcom_pull('get_batch_list_task', key='return_value')[0]}}",
    # Here I want to use the return value of the getClusterCreateParameters function:
    num_workers="{{ task_instance.xcom_pull('get_batch_list_task', key='return_value') }}",
)

Thanks,
Rajashekar.

Szasza Palmer

Jul 5, 2019, 6:56:12 AM
to cloud-composer-discuss
Hi Rajashekar,

As per https://github.com/apache/airflow/blob/master/airflow/contrib/operators/dataproc_operator.py#L184 the num_workers parameter is not templated, therefore you can't achieve it as-is.

The cleanest way of achieving what you would like to do is to create your own DataprocClusterCreateOperator by extending the existing one, and then you can do the below:

start_cluster = CustomDataprocClusterCreateOperator(
    task_id='start_cluster',
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    provide_context=True,  # <== This is needed, otherwise the DAG context is not passed into the task instance
    num_workers="{{ ti.xcom_pull(task_ids='get_batch_list_task', key='return_value') }}",  # <== please observe that `ti` is used instead of `task_instance`, and that the
)

There are other, more hacky ways of getting the intended thing done, for example wrapping the DataprocClusterCreateOperator into a PythonOperator and manually kicking off the cluster creation, but I'd stick with the above, and if it works, it can be contributed back into the Airflow code itself.
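To illustrate the subclassing idea without needing an Airflow installation, here is a minimal sketch of the mechanism. Everything in it is a hypothetical stand-in (FakeClusterCreateOperator, TemplatedClusterCreateOperator, render and run are not Airflow APIs, and the tiny render function only imitates Jinja). It assumes the behaviour Airflow follows: only the attributes named in `template_fields` are rendered, and rendering happens just before `execute()`, so any conversion to int has to wait until then.

```python
import re


def render(template, context):
    """Tiny stand-in for Jinja: replaces {{ name }} with str(context[name])."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(context[m.group(1)]), template)


class FakeClusterCreateOperator:
    """Hypothetical stand-in for an operator whose num_workers is NOT templated."""

    # Only attributes named here are rendered before execute().
    template_fields = ("cluster_name",)

    def __init__(self, cluster_name, num_workers):
        # Store the raw values; templates are still unrendered at this point.
        self.cluster_name = cluster_name
        self.num_workers = num_workers

    def render_templates(self, context):
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                setattr(self, name, render(value, context))

    def execute(self, context):
        return {"cluster_name": self.cluster_name,
                "num_workers": self.num_workers}


class TemplatedClusterCreateOperator(FakeClusterCreateOperator):
    """Subclass that adds num_workers to the templated fields."""

    template_fields = FakeClusterCreateOperator.template_fields + ("num_workers",)

    def execute(self, context):
        # Convert only here, after rendering has replaced the template.
        self.num_workers = int(self.num_workers)
        return super().execute(context)


def run(operator, context):
    """Imitates the scheduler: render templated fields, then execute."""
    operator.render_templates(context)
    return operator.execute(context)


context = {"workers": 4, "env": "dev"}  # hypothetical runtime values
plain = run(FakeClusterCreateOperator("cluster-{{ env }}", "{{ workers }}"), context)
templated = run(TemplatedClusterCreateOperator("cluster-{{ env }}", "{{ workers }}"), context)
print(plain)      # num_workers is still the raw template string
print(templated)  # num_workers has been rendered and converted to int
```

In a real custom operator the same two changes would apply: extend `template_fields` on the subclass and move any `int(...)` conversion out of `__init__` and into `execute()`.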

Cheers,
Szasza

Szasza Palmer

Jul 5, 2019, 10:17:48 AM
to cloud-composer-discuss
As a first attempt, I would go with something like the attached file, which can be deployed either as a local Python library ( https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-local ) or as an Airflow plugin ( https://cloud.google.com/composer/docs/concepts/plugins ).

I have not tested the attached file due to environment limitations, so please try it on a development environment first to verify that it works.

Cheers,
Szasza 
CustomDataprocClusterCreateOperator.py

Rajashekar Sunkesula

Jul 6, 2019, 8:26:54 AM
to Szasza Palmer, cloud-composer-discuss
Hi Szasza,

I tried what you suggested, but it still gives an error:

invalid literal for int() with base 10: "{{ti.xcom_pull('get_cluster_create_parameters', key='cluster_worker_size')[0]}}"

File "/usr/local/lib/airflow/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
File "/home/airflow/gcs/plugins/operators.py", line 162, in __init__
    num_workers = int(num_workers)
ValueError: invalid literal for int() with base 10: "{{ti.xcom_pull('get_cluster_create_parameters', key='cluster_worker_size')[0]}}"

operators.py

Szasza Palmer

Jul 6, 2019, 8:40:10 AM
to cloud-composer-discuss
Hi Rajashekar,

The error indicates that templating doesn't work for the field. At first glance at your code, I can see that you didn't use the class I sent as an example but tried to re-implement (copy) the DataprocClusterCreateOperator from Airflow.

Unfortunately, the copy was done incorrectly: nearly all the variables are missing from the docblock, which results in non-working templating in this case.

Once the above is fixed, please keep in mind that the xcom value is usually a string, so one shouldn't use the [0] index accessor on it.
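A small plain-Python illustration of both points, with no Airflow involved. The value "12" is a made-up example of what the template might render to, and the template string is only there to reproduce the error from your traceback:

```python
# Hypothetical rendered value: what the template could become if the
# upstream task had returned 12.
rendered = "12"

first_char = rendered[0]  # indexing a string yields its first character
workers = int(rendered)   # converting the whole string yields the number

# An unrendered template string cannot be converted; this is the same
# ValueError as in the traceback, raised because int() ran in __init__
# before any rendering had happened.
unrendered = "{{ ti.xcom_pull(task_ids='get_batch_list_task') }}"
try:
    int(unrendered)
    conversion_failed = False
except ValueError:
    conversion_failed = True

print(first_char, workers, conversion_failed)
```

So with [0] a worker count of 12 would silently become "1", and calling int() at construction time fails regardless of what the task returned.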

Cheers,
Szasza