Issue with template not being replaced


Sergei Iakhnin

Nov 16, 2015, 11:25:55 AM
to Airflow
I have the following workflow code that is trying to pull the return value of a task into a downstream task.


The troublesome excerpt below is part of the function I use as a PythonOperator callable:

```
sample_location = lookup_sample_location("{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}")
result_filename = "/tmp/{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[1] }}_regenotype_" + contig_name + ".vcf"
freebayes_command = "freebayes -r " + contig_name +\
                    " -f " + reference_location +\
                    " -@ " + variants_location +\
                    " -l " + sample_location +\
                    " > " + result_filename
os.system(freebayes_command)
```

When I try to import this workflow I get the following error:

```
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 193, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 121, in <module>
    python_callable = run_freebayes(contig_name),
  File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 74, in run_freebayes
    sample_location = lookup_sample_location("{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}")
  File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 30, in lookup_sample_location
    my_sample_location = session.query(SampleLocation).filter(SampleLocation.donor_index==donor_index).first()
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2469, in first
    ret = list(self[0:1])
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2292, in __getitem__
    return list(res)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2571, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2586, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
DataError: (psycopg2.DataError) invalid input syntax for integer: "{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}"
LINE 3: WHERE sample_locations.donor_index = '{{ task_instance.xcom_...
                                             ^
 [SQL: 'SELECT sample_locations.sample_location_id AS sample_locations_sample_location_id, sample_locations.donor_index AS sample_locations_donor_index, sample_locations.normal_sample_location AS sample_locations_normal_sample_location, sample_locations.tumor_sample_location AS sample_locations_tumor_sample_location, sample_locations.last_updated AS sample_locations_last_updated \nFROM sample_locations \nWHERE sample_locations.donor_index = %(donor_index_1)s \n LIMIT %(param_1)s'] [parameters: {'donor_index_1': "{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}", 'param_1': 1}]
```

Basically it looks like the template code is not being replaced at runtime.

If I eschew the template code for a direct lookup of the task instance I also get an error:

```
def run_freebayes(contig_name, **kwargs):
    ti = kwargs['ti']
    sample_assignment = ti.xcom_pull(task_ids='get_sample_assignment_task')
    donor_index = sample_assignment[0]
    sample_id = sample_assignment[1]
    sample_location = lookup_sample_location(donor_index)
    result_filename = "/tmp/" + sample_id + "_regenotype_" + contig_name + ".vcf"
    freebayes_command = "freebayes -r " + contig_name +\
                        " -f " + reference_location +\
                        " -@ " + variants_location +\
                        " -l " + sample_location +\
                        " > " + result_filename
    os.system(freebayes_command)
```


results in


```
ERROR:root:Failed to import: /opt/airflow/dags/freebayes-regenotype-workflow.py
ERROR:root:'ti'
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 193, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 125, in <module>
    python_callable = run_freebayes(contig_name),
  File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 74, in run_freebayes
    ti = kwargs['ti']
KeyError: 'ti'
```


Thanks for any insights.

Steven Yvinec-Kruyk

Nov 16, 2015, 11:38:33 AM
to Airflow
At first glance it looks like you are using the PythonOperator incorrectly. You are calling the function in your operator declaration instead of passing it. Just pass in the callable itself and, if needed, its arguments. Here is how the operator is declared in the sample code that ships with Airflow:

    task = PythonOperator(
        task_id='sleep_for_'+str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i)/10},
        dag=dag)

Hope this helps.
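
The difference between calling a function and passing it can be sketched without Airflow at all. In this minimal illustration, `ToyOperator` is a hypothetical stand-in (not Airflow's `PythonOperator`) that stores a callable and only invokes it when `execute()` is called, and `run_freebayes` is reduced to a placeholder that records that it ran:

```python
# ToyOperator is a hypothetical stand-in for illustration only; it is not
# Airflow's PythonOperator and mimics just the "store now, call later" idea.
calls = []


def run_freebayes(contig_name):
    # Placeholder body: record the call instead of running freebayes.
    calls.append(contig_name)
    return "ran " + contig_name


class ToyOperator(object):
    def __init__(self, python_callable, op_kwargs=None):
        if not callable(python_callable):
            raise TypeError("python_callable must be callable")
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}

    def execute(self):
        # The callable runs here, at task run time, not at definition time.
        return self.python_callable(**self.op_kwargs)


# Wrong: run_freebayes("chr1") executes immediately, while the file is being
# imported, and its return value (a string) is what gets handed over.
try:
    ToyOperator(python_callable=run_freebayes("chr1"))
    rejected = False
except TypeError:
    rejected = True

# Right: pass the function object and supply its arguments separately.
task = ToyOperator(python_callable=run_freebayes,
                   op_kwargs={"contig_name": "chr2"})
calls_before_execute = list(calls)  # only the premature "chr1" call so far
result = task.execute()
```

This is why the original tracebacks point at the `<module>` level of the DAG file: the function body was running during import, before any task instance existed.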

Sergei Iakhnin

Nov 16, 2015, 4:22:44 PM
to Airflow
Hi Steven, thanks for pointing out this embarrassing error. I was indeed calling my callables instead of passing them. However, fixing this still does not resolve the issue.

The latest version now passes the callables correctly, as far as I can tell, but there is still an error:

    [2015-11-16 21:12:06,858] {models.py:1017} ERROR - 'ti'
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/airflow/models.py", line 977, in run
        result = task_copy.execute(context=context)
      File "/usr/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
      File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 75, in run_freebayes
        ti = kwargs["ti"]
    KeyError: 'ti'
    [2015-11-16 21:12:06,865] {models.py:1053} ERROR - 'ti'

Steven Yvinec-Kruyk

Nov 16, 2015, 9:48:04 PM
to Airflow
Sergei,

Looks like progress. From the error message, the argument "ti" is not being passed to the run_freebayes function, so the key "ti" does not exist in kwargs. Judging by the code below, you are only passing in contig_name.

    genotyping_task = PythonOperator(
        task_id="regenotype_" + contig_name,
        python_callable=run_freebayes,
        op_kwargs={"contig_name": contig_name},
        dag=dag)
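
The failure can be reproduced in plain Python, since `**kwargs` only ever contains what the caller passes. In this small sketch the function body is a simplified placeholder rather than the real workflow code, and `"chr1"` is an arbitrary example value:

```python
def run_freebayes(contig_name, **kwargs):
    # Simplified placeholder body: attempt the same lookup as the real task.
    return kwargs["ti"]


op_kwargs = {"contig_name": "chr1"}

try:
    # contig_name binds to the named parameter, so kwargs ends up empty.
    run_freebayes(**op_kwargs)
    error = None
except KeyError as exc:
    error = exc.args[0]
```

Here `error` ends up holding the missing key name, matching the `KeyError: 'ti'` in the log.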

Maxime Beauchemin

Nov 17, 2015, 12:14:31 AM
to Airflow
Try setting `provide_context=True` on the PythonOperator call.
http://pythonhosted.org/airflow/code.html#airflow.operators.PythonOperator
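
Roughly what `provide_context=True` changes can be sketched in plain Python: the operator merges the task's runtime context into the keyword arguments before calling the function. This is an illustration under assumptions, not Airflow's actual code. `call_task` and `FakeTaskInstance` are hypothetical stand-ins, the XCom value is made up, and the context here holds only a `ti` entry, whereas the real context carries more keys.

```python
class FakeTaskInstance(object):
    """Hypothetical stand-in that returns a canned XCom value."""

    def xcom_pull(self, task_ids):
        # Pretend the upstream task returned (donor_index, sample_id).
        return (42, "sample_A")


def call_task(python_callable, op_kwargs, context, provide_context=False):
    # Hypothetical helper mimicking the idea, not Airflow's implementation.
    kwargs = dict(op_kwargs)
    if provide_context:
        kwargs.update(context)  # adds 'ti' (and, in Airflow, other keys)
    return python_callable(**kwargs)


def run_freebayes(contig_name, **kwargs):
    ti = kwargs["ti"]
    donor_index, sample_id = ti.xcom_pull(
        task_ids="get_sample_assignment_task")
    return "/tmp/" + sample_id + "_regenotype_" + contig_name + ".vcf"


context = {"ti": FakeTaskInstance()}
op_kwargs = {"contig_name": "chr1"}

result_filename = call_task(run_freebayes, op_kwargs, context,
                            provide_context=True)

# In the DAG file itself, the corresponding change is one extra argument:
#   PythonOperator(task_id=..., python_callable=run_freebayes,
#                  op_kwargs={"contig_name": contig_name},
#                  provide_context=True, dag=dag)
```

With `provide_context` left at `False`, the same call raises `KeyError: 'ti'`, which is the behavior reported earlier in the thread.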

Sergei Iakhnin

Nov 17, 2015, 5:55:43 AM
to Airflow
Thanks for your help, Maxime and Steven. It was Maxime's suggestion that finally fixed it for me.

Maxime, I was assuming that tasks were context-aware by default. It may be worth stating in the docs that this is off by default, and also mentioning in the docs on XComs that it needs to be set. This wasn't intuitive, at least for me.

Thanks again,

Sergei.