sample_location = lookup_sample_location("{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}")
result_filename = "/tmp/{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[1] }}_regenotype_" + contig_name + ".vcf"
freebayes_command = "freebayes -r " + contig_name +\
" -f " + reference_location +\
" -@ " + variants_location +\
" -l " + sample_location +\
" > " + result_filename
os.system(freebayes_command)
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 193, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 121, in <module>
python_callable = run_freebayes(contig_name),
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 74, in run_freebayes
sample_location = lookup_sample_location("{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}")
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 30, in lookup_sample_location
my_sample_location = session.query(SampleLocation).filter(SampleLocation.donor_index==donor_index).first()
File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2469, in first
ret = list(self[0:1])
File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2292, in __getitem__
return list(res)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2571, in __iter__
return self._execute_and_instances(context)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2586, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
return meth(self, multiparams, params)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
compiled_sql, distilled_params
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
context)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
exc_info
File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
context)
File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
cursor.execute(statement, parameters)
DataError: (psycopg2.DataError) invalid input syntax for integer: "{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}"
LINE 3: WHERE sample_locations.donor_index = '{{ task_instance.xcom_...
^
[SQL: 'SELECT sample_locations.sample_location_id AS sample_locations_sample_location_id, sample_locations.donor_index AS sample_locations_donor_index, sample_locations.normal_sample_location AS sample_locations_normal_sample_location, sample_locations.tumor_sample_location AS sample_locations_tumor_sample_location, sample_locations.last_updated AS sample_locations_last_updated \nFROM sample_locations \nWHERE sample_locations.donor_index = %(donor_index_1)s \n LIMIT %(param_1)s'] [parameters: {'donor_index_1': "{{ task_instance.xcom_pull(task_ids='get_sample_assignment_task')[0] }}", 'param_1': 1}]
def run_freebayes(contig_name, **kwargs):
ti = kwargs['ti']
sample_assignment = ti.xcom_pull(task_ids='get_sample_assignment_task')
donor_index = sample_assignment[0]
sample_id = sample_assignment[1]
sample_location = lookup_sample_location(donor_index)
result_filename = "/tmp/" + sample_id + "_regenotype_" + contig_name + ".vcf"
freebayes_command = "freebayes -r " + contig_name +\
" -f " + reference_location +\
" -@ " + variants_location +\
" -l " + sample_location +\
" > " + result_filename
os.system(freebayes_command)
```
results in
```
ERROR:root:Failed to import: /opt/airflow/dags/freebayes-regenotype-workflow.py
ERROR:root:'ti'
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 193, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 125, in <module>
python_callable = run_freebayes(contig_name),
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 74, in run_freebayes
ti = kwargs['ti']
KeyError: 'ti'
```
Thanks for any insights.
task = PythonOperator( task_id='sleep_for_'+str(i), python_callable=my_sleeping_function, op_kwargs={'random_base': float(i)/10}, dag=dag)
2015-11-16 21:12:06,858] {models.py:1017} ERROR - 'ti'
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 977, in run
result = task_copy.execute(context=context)
File "/usr/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/freebayes-regenotype-workflow.py", line 75, in run_freebayes
ti = kwargs["ti"]
KeyError: 'ti'
[2015-11-16 21:12:06,865] {models.py:1053} ERROR - 'ti'
| genotyping_task = PythonOperator( | |
| task_id = "regenotype_" + contig_name, | |
| python_callable = run_freebayes, | |
| op_kwargs={"contig_name": contig_name}, | |
| dag = dag) |