Saving files in /tmp/ in Cloud Composer


Jocelyn

Jul 26, 2018, 7:05:22 PM
to cloud-composer-discuss
Hi,

I have the following workflow in Composer:

Task 1

BigQueryToCloudStorageOperator - exports a BQ table to GCS, say as table1.csv.

Task 2
BashOperator - runs a Python script from GCS (script1.py), which writes its result to /tmp/ for use by Task 3 and also saves it to GCS (result1.csv).

script1.py downloads table1.csv from GCS as a blob and reads it into a pandas dataframe. This works when run from the command line (Cloud Shell) but fails in Composer.

Task 3
BashOperator - runs another Python script (script2.py), which picks up the result of Task 2 and uploads it via an API.
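
(For context, a rough sketch of how the DAG is wired; the operator arguments, bucket names, and script paths below are placeholders, not my real values:)

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.bash_operator import BashOperator

dag = DAG('bq_to_api_example', start_date=datetime(2018, 7, 1), schedule_interval=None)

# Task 1: export the BQ table to GCS as table1.csv
t1 = BigQueryToCloudStorageOperator(
     task_id='bq_table_to_gcs',
     source_project_dataset_table='my_project.my_dataset.table1',
     destination_cloud_storage_uris=['gs://my-bucket/table1.csv'],
     dag=dag)

# Task 2: run script1.py, which reads table1.csv from GCS into pandas,
# writes result1.csv to /tmp/ and copies it to GCS
t2 = BashOperator(
     task_id='run_script1',
     bash_command='python /path/to/script1.py',  # where this script should live is the open question
     dag=dag)

# Task 3: run script2.py, which uploads result1.csv via an API
t3 = BashOperator(
     task_id='run_script2',
     bash_command='python /path/to/script2.py',
     dag=dag)

t1 >> t2 >> t3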

Am I right in saying that Composer does not treat /tmp/ as a shared location, and that files saved there cannot be retrieved by later tasks? I viewed the logs for Task 2 and noted the temporary location; however, Task 3 could not locate the file (its logs showed a different location).


Thanks 
Jocelyn

Norbert Kremer

Jul 27, 2018, 10:40:39 AM
to cloud-composer-discuss
Hello. Your observations are consistent with mine. By running commands like pwd and df inside BashOperator tasks and inspecting the logs, I found that each task runs in its own subdirectory under /tmp, and one task cannot see the files created by another. Fair enough; that's just how it works.

I don't know how good a practice this is, but you might be able to use space under /home/airflow/gcsfuse. gcsfuse makes the GCS bucket available to bash commands as if it were part of the filesystem, and the locations are stable from task to task. This area (/home/airflow/gcsfuse) is set up by Cloud Composer, so it's Composer's bucket and not my bucket; I don't like mixing user files with system-generated files, and I don't know the security implications. Maybe others in this forum could comment on that, but I think it will work for your needs. Or you could mount another bucket of your own making with gcsfuse.

I was motivated to look at this because I wanted to process some files (gunzip, in my case) that were potentially larger than the space available under /tmp. /home/airflow/gcsfuse seems to have lots of space; I have not tested the limit, but df reports 1 PB.
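
For anyone who wants to repeat the check, this is roughly the kind of probe task I ran (the DAG wiring is illustrative, and the mount path may differ in your environment):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('fs_probe', start_date=datetime(2018, 7, 1), schedule_interval=None)

# Print the working directory and available space as seen from inside a task;
# adjust /home/airflow/gcsfuse to whatever mount point your environment exposes.
probe = BashOperator(
     task_id='probe_filesystem',
     bash_command='pwd && df -h /tmp /home/airflow/gcsfuse',
     dag=dag)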

Try it and let us know how it works out.

Trevor Edwards

Jul 27, 2018, 12:28:47 PM
to cloud-composer-discuss
https://cloud.google.com/composer/docs/concepts/cloud-storage#data_synchronization documents the data/ folder, which is meant for staging files like this.

Also, IIRC, even if the data/ folder supports up to 1 PB of data, you may run into issues if you try to work with individual files whose size is greater than the size of your Composer workers' disk.
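
To make that concrete, staging a file between tasks could look something like this (illustrative paths only; /home/airflow/gcs/data on the workers maps to gs://{composer-bucket}/data):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('data_folder_staging', start_date=datetime(2018, 7, 1), schedule_interval=None)

# A file written under /home/airflow/gcs/data by one task is synced to
# gs://<composer-bucket>/data and should be visible to later tasks.
produce = BashOperator(
     task_id='produce_result',
     bash_command='echo "some output" > /home/airflow/gcs/data/result1.csv',
     dag=dag)

consume = BashOperator(
     task_id='consume_result',
     bash_command='cat /home/airflow/gcs/data/result1.csv',
     dag=dag)

produce >> consume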

Norbert Kremer

Jul 27, 2018, 2:27:29 PM
to cloud-composer-discuss
Trevor, thanks for clearing that up.  I tested the data folder and it works for me as expected.  Regarding size limitation, what you say makes sense, as gcsfuse needs some place to buffer the writes to GCS.  I'll test the size limits later.


Jocelyn

Aug 1, 2018, 9:20:43 AM
to cloud-composer-discuss
Thanks for the suggestions. I'm still having a problem reading the file correctly. 

In my old Airflow configuration I would call the file directly from my home path, save the output to '~/airflow/bin/test.py', and then perhaps copy it to Cloud Storage:

t1 = BashOperator(
     task_id='run_script1',
     bash_command='python /home/airflow/scripts/test.py',
     dag=dag)

t2 = BashOperator(
     task_id='upload_script1_output',
     bash_command='gsutil cp ~/airflow/bin/test.py gs://[bucket]/test.py ',
     dag=dag)


I tried saving all my files into the data folder, but when I call them in my tasks, Airflow is not able to resolve the file path, even when I save them to a folder inside the dags folder.

So I have a file 'test.py' saved at gs://[bucket-name]/data/test.py. How do I call/reference this path in my DAG task in Composer? The documentation specifies a Storage path and a Mapped directory path, but when I run this:

t3 = BashOperator(
     task_id='run_script1',
     bash_command='python /home/airflow/gcs/dags/DS_quotes/test_convert_columns.py', 
     dag=dag)

I get:

[2018-08-01 13:06:30,038] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 13:06:30,038] {bash_operator.py:101} INFO - python: can't open file '/home/airflow/gcs/dags/DS_quotes/test_convert_columns.py': [Errno 2] No such file or directory
[2018-08-01 13:06:30,634] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 13:06:30,630] {bash_operator.py:105} INFO - Command exited with return code 2

Jocelyn

Aug 1, 2018, 5:56:26 PM
to cloud-composer-discuss
I have just found this documentation [https://cloud.google.com/sdk/gcloud/reference/beta/composer/environments/storage/data/import] which shows how to import files used by tasks (and task outputs) into the data/ folder in GCS and reference them in a task as /home/airflow/gcs/data/file.py. I tried setting things up this way and I'm getting this error:
[2018-08-01 21:37:32,409] {models.py:1427} INFO - Executing <Task(BigQueryOperator): bq_to_bq> on 2018-07-30 14:30:00
[2018-08-01 21:37:32,409] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dsquotes_upload13 bq_to_bq 2018-07-30T14:30:00 --job_id 1925 --raw -sd DAGS_FOLDER/ds_quotes_upload3.py']
[2018-08-01 21:37:35,187] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 21:37:35,185] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-08-01 21:37:35,322] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 21:37:35,320] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/ds_quotes_upload3.py
[2018-08-01 21:37:36,054] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 21:37:36,050] {models.py:2505} ERROR - /home/airflow/gcs/data/DS_quotes/sql_script/gclid_export.sql
[2018-08-01 21:37:36,061] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-01 21:37:36,064] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 2503, in resolve_template_files
[2018-08-01 21:37:36,065] {base_task_runner.py:98} INFO - Subtask:     setattr(self, attr, env.loader.get_source(env, content)[0])
[2018-08-01 21:37:36,067] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
[2018-08-01 21:37:36,067] {base_task_runner.py:98} INFO - Subtask:     raise TemplateNotFound(template)
[2018-08-01 21:37:36,070] {base_task_runner.py:98} INFO - Subtask: TemplateNotFound: /home/airflow/gcs/data/DS_quotes/sql_script/gclid_export.sql
[2018-08-01 21:37:36,377] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-01 21:37:36,379] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-08-01 21:37:36,385] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-08-01 21:37:36,386] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-08-01 21:37:36,388] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-08-01 21:37:36,388] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-08-01 21:37:36,391] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-08-01 21:37:36,392] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1477, in _run_raw_task
[2018-08-01 21:37:36,392] {base_task_runner.py:98} INFO - Subtask:     self.render_templates()
[2018-08-01 21:37:36,393] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1760, in render_templates
[2018-08-01 21:37:36,393] {base_task_runner.py:98} INFO - Subtask:     rendered_content = rt(attr, content, jinja_context)
[2018-08-01 21:37:36,396] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 2481, in render_template
[2018-08-01 21:37:36,397] {base_task_runner.py:98} INFO - Subtask:     return jinja_env.get_template(content).render(**context)
[2018-08-01 21:37:36,397] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 812, in get_template
[2018-08-01 21:37:36,398] {base_task_runner.py:98} INFO - Subtask:     return self._load_template(name, self.make_globals(globals))
[2018-08-01 21:37:36,401] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 774, in _load_template
[2018-08-01 21:37:36,402] {base_task_runner.py:98} INFO - Subtask:     cache_key = self.loader.get_source(self, name)[1]
[2018-08-01 21:37:36,405] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
[2018-08-01 21:37:36,406] {base_task_runner.py:98} INFO - Subtask:     raise TemplateNotFound(template)
[2018-08-01 21:37:36,406] {base_task_runner.py:98} INFO - Subtask: jinja2.exceptions.TemplateNotFound: /home/airflow/gcs/data/DS_quotes/sql_script/gclid_export.sql
 
Do I need to add additional parameters to my DAG constructor call, i.e. dag = DAG(dag_id, schedule_interval, ...)?
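
For example (I'm just guessing here), would something like a template_searchpath let Jinja find the .sql file outside the dags folder?

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Just a guess on my part: point Jinja at the data/ folder, then reference the
# SQL file relative to that search path instead of by absolute path.
dag = DAG(
     dag_id='dsquotes_upload13',
     schedule_interval=None,
     start_date=datetime(2018, 7, 30),
     template_searchpath=['/home/airflow/gcs/data'])

bq_to_bq = BigQueryOperator(
     task_id='bq_to_bq',
     bql='DS_quotes/sql_script/gclid_export.sql',  # 'sql=' in newer Airflow versions
     destination_dataset_table='my_project.my_dataset.gclid_export',  # placeholder
     write_disposition='WRITE_TRUNCATE',
     dag=dag)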


Thank you.


Feng Lu

Aug 2, 2018, 3:55:26 AM
to AGBEKO Jocelyn, cloud-composer-discuss
Because Composer's Celery setup distributes task execution across a set of workers, task2's output saved on the local filesystem of one worker may not be visible to the worker that runs task3. The two workers may not be running on the same k8s pod/node, so GCS is the right place to go.
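
For example (names here are made up), task2 could push its result to GCS explicitly and task3 could pull it down, rather than relying on the worker-local disk:

from google.cloud import storage

# Illustrative helpers; bucket and blob names are placeholders. Each one runs
# inside its own task, so it does not matter which worker executes which task.
def upload_result(bucket_name='my-bucket',
                  blob_name='results/result1.csv',
                  local_path='/tmp/result1.csv'):
    """Call at the end of task2: push the locally produced file to GCS."""
    client = storage.Client()
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)

def download_result(bucket_name='my-bucket',
                    blob_name='results/result1.csv',
                    local_path='/tmp/result1.csv'):
    """Call at the start of task3: pull the file down to this worker's disk."""
    client = storage.Client()
    client.bucket(bucket_name).blob(blob_name).download_to_filename(local_path)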

Re: the errors you have seen: if you ssh into the worker pod, are you able to run "ls -l /home/airflow/gcs/data/DS_quotes/sql_script/gclid_export.sql" successfully?


Jocelyn

Aug 2, 2018, 4:51:54 AM
to cloud-composer-discuss
Hi Feng,

When I ssh into the worker pod it returns: "ls: cannot access '/home/airflow/gcs/data/DS_quotes/sql_script/gclid_export.sql': No such file or directory"

Jocelyn

Aug 2, 2018, 7:14:40 AM
to cloud-composer-discuss
I have also tried running the DAG again, keeping my task files in the data/ folder and the DAG file in the dags/ folder. My first two tasks run successfully (they are simple BigQuery operator tasks). The task that should pick up the output of the BQ job, read it into a pandas dataframe, and write the result to a BQ table fails. Looking at the logs, Airflow is unable to find the file at its location, /home/airflow/gcs/data/file.py.

This is my task:

t3 = BashOperator(
     task_id='ds_run_conversion_script',
     bash_command='python /home/airflow/gcs/dags/DS/test_convert_columns.py',
     dag=dag)

Error log:
[2018-08-02 10:49:48,021] {models.py:1427} INFO - Executing <Task(BashOperator): ds_run_conversion_script> on 2018-07-30 14:30:00
[2018-08-02 10:49:48,022] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dsquotes_upload21 ds_run_conversion_script 2018-07-30T14:30:00 --job_id 1990 --raw -sd DAGS_FOLDER/DS_quotes_upload.py']
[2018-08-02 10:49:50,576] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:50,575] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-08-02 10:49:50,704] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:50,702] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/DS_quotes_upload.py
[2018-08-02 10:49:51,742] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:51,742] {bash_operator.py:70} INFO - Tmp dir root location: 
[2018-08-02 10:49:51,743] {base_task_runner.py:98} INFO - Subtask:  /tmp
[2018-08-02 10:49:51,749] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:51,746] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpOMTtMk//tmp/airflowtmpOMTtMk/ds_run_conversion_script_Y4o0J
[2018-08-02 10:49:51,750] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:51,748] {bash_operator.py:88} INFO - Running command: python /home/airflow/gcs/data/DS/test_convert_columns.py
[2018-08-02 10:49:51,766] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:51,764] {bash_operator.py:97} INFO - Output:
[2018-08-02 10:49:53,831] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:53,830] {bash_operator.py:101} INFO - python: can't open file '/home/airflow/gcs/data/DS/test_convert_columns.py': [Errno 2] No such file or directory
[2018-08-02 10:49:54,555] {base_task_runner.py:98} INFO - Subtask: [2018-08-02 10:49:54,553] {bash_operator.py:105} INFO - Command exited with return code 2
[2018-08-02 10:49:54,658] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-02 10:49:54,660] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-08-02 10:49:54,661] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-08-02 10:49:54,664] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-08-02 10:49:54,665] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-08-02 10:49:54,666] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-08-02 10:49:54,667] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-08-02 10:49:54,669] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-08-02 10:49:54,670] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-08-02 10:49:54,670] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-08-02 10:49:54,671] {base_task_runner.py:98} INFO - Subtask:     raise AirflowException("Bash command failed")
[2018-08-02 10:49:54,671] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed

Am I missing anything here please? I have a PoC going live tomorrow and my jobs keep failing.

Thanks

Maria Janczak

Aug 2, 2018, 1:20:48 PM
to jocelyn...@swiftcover.com, cloud-compo...@googlegroups.com
Hi Jocelyn,

One thing I notice is that your operator references '/home/airflow/gcs/dags/DS/test_convert_columns.py', while the logs report that '/home/airflow/gcs/data/DS/test_convert_columns.py' is not found. (So the logs are likely from a different attempt.)

Secondly, the bash command is executed from a temporary directory, so you need to specify full paths to your files. That is what
Temporary script location: /tmp/airflowtmpOMTtMk//tmp/airflowtmpOMTtMk/ds_run_conversion_script_Y4o0J
in your logs means. Another, simpler thing you could try is to run the script directly in the operator using the PythonOperator (see the instructions in the Airflow documentation).

Lastly, keep in mind that data written to the data/ folder by your previous tasks is only eventually consistent across worker pods, so there is no guarantee that it will be available before your next task starts.
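
A rough sketch of the PythonOperator approach (convert_columns below is a placeholder for whatever test_convert_columns.py actually does):

from airflow.operators.python_operator import PythonOperator

def convert_columns(**kwargs):
    # placeholder: read the exported CSV, transform it with pandas,
    # and write the result to BigQuery
    pass

t3 = PythonOperator(
     task_id='ds_run_conversion_script',
     python_callable=convert_columns,
     provide_context=True,
     dag=dag)  # `dag` is the DAG object already defined in your DAG file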


f.p...@travelaudience.com

Sep 9, 2018, 10:29:24 AM
to cloud-composer-discuss
Can you explain how to easily save the result of an operator to the GCS bucket reserved for the Composer instance?

Imagine this simple DAG in the dags folder:


How can I write its output to a custom folder, or to the data/ folder, in GCS?

Feng Lu

Sep 9, 2018, 5:35:08 PM
to f.p...@travelaudience.com, cloud-composer-discuss
Please take a look at our doc on GCS directory mapping. For example, if you save operator output data to /home/airflow/gcs/data, it will be auto-synced to gs://{composer-bucket}/data.
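
For example (names here are placeholders), inside a DAG in your dags folder:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('save_operator_output', start_date=datetime(2018, 9, 1), schedule_interval=None)

# Anything written under /home/airflow/gcs/data on a worker is synced by
# Composer to gs://{composer-bucket}/data.
def save_result(**kwargs):
    with open('/home/airflow/gcs/data/my_result.txt', 'w') as f:
        f.write('operator output goes here\n')

save = PythonOperator(
     task_id='save_result',
     python_callable=save_result,
     provide_context=True,
     dag=dag)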
