Cloud Composer doesn't achieve expected concurrency


Harry Prior

Nov 2, 2022, 12:59:38 PM
to cloud-composer-discuss
I am attempting to use Cloud Composer as part of our data pipeline. This involves pulling some data from an Oracle database into BigQuery; comparing this data with another BigQuery dataset to get a set of 'changes'; and then, based on these changes, pulling more data from the Oracle database into BigQuery. The aim is to run lots of IO-bound tasks concurrently in a relatively simple DAG. Roughly, the DAG looks like this:

`get_data` --> `compare_changes` --> `pull_changes[]`

In some nodes, such as `get_data` and `compare_changes`, most of the 'work' is done outside Cloud Composer - for example, sending a query to BigQuery. The `pull_changes` node does some trivial IO itself: using TaskFlow and dynamic task mapping, the 'changes' output from the previous node is expanded so that a `pull_change` task is generated for each change. In each of these tasks, a database is queried and the output is stored locally as a CSV file. This CSV file is then uploaded to GCS, from where it will later be loaded into BigQuery.
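
For reference, here is a minimal sketch of the shape of such a DAG (the task bodies are placeholders standing in for the real Oracle/BigQuery logic; only the structure and the `.expand()` call matter):

```python
import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2022, 11, 1), catchup=False)
def oracle_to_bq():
    @task
    def get_data():
        # placeholder: pull from Oracle and land the data in BigQuery
        pass

    @task
    def compare_changes():
        # placeholder: diff the new load against the existing BigQuery dataset
        return [{"id": 1}, {"id": 2}]  # one dict per detected change

    @task
    def pull_change(change: dict):
        # placeholder: query Oracle for this change, write a CSV, upload to GCS
        print(f"pulling change {change}")

    changes = compare_changes()
    get_data() >> changes
    # dynamic task mapping: one mapped task instance per change
    pull_change.expand(change=changes)


oracle_to_bq()
```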

I was hoping to generate potentially thousands of `pull_changes` tasks, which would run as concurrently as the database connection limit allows. But so far the DAG is painfully slow.

Even with a powerful environment (64 workers with 8 vCPUs, 16 GB memory, and 5 GB storage each; 2 schedulers with 4 vCPUs, 4 GB memory, and 5 GB storage each; and a large environment size), the number of concurrently 'running' tasks never reaches the number of slots available in the pool. Tasks remain in the queued state for far longer than expected. I have also noticed that many tasks take a while to be marked as successful, even after their 'work' has completed.

The environment parameters (`worker_concurrency`, `parallelism`, `max_active_tasks_per_dag`) have been set high enough that they should not be limiting factors. The environment monitoring dashboard also doesn't indicate any performance issues with the environment.
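
For context, these are Airflow config overrides; in Composer they can be applied via `gcloud` (the environment name, location, and specific values below are illustrative assumptions, not recommendations):

```sh
# Hypothetical environment name/location; values are examples only.
gcloud composer environments update my-composer-env \
  --location us-central1 \
  --update-airflow-configs \
    core-parallelism=1024,core-max_active_tasks_per_dag=1024,celery-worker_concurrency=16
```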

Is this behaviour typical of Airflow? What can I do to speed up my DAG and achieve better concurrency? Or is my use case inappropriate for Cloud Composer?

Stephan Meyn

Nov 2, 2022, 5:03:54 PM
to Harry Prior, cloud-composer-discuss
In Composer, the cycle time to schedule and start a task is comparatively slow. You are OK with a few dozen tasks, but if you go into the hundreds it will take time for the scheduler (and the executor, if you are using Airflow 2) to spin up all the tasks.

If you do want to run a large number of tasks, I would recommend invoking Cloud Batch.
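
As a rough illustration of that suggestion, a sketch using the google-cloud-batch Python client (the image, arguments, and counts are placeholder assumptions; Cloud Batch then fans out the task instances itself rather than Airflow scheduling each one):

```python
from google.cloud import batch_v1


def submit_batch_job(project_id: str, region: str, job_name: str) -> batch_v1.Job:
    """Submit a simple container job to Cloud Batch (sketch)."""
    client = batch_v1.BatchServiceClient()

    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container(
        image_uri="gcr.io/my-project/pull-change:latest",  # placeholder image
        commands=["--change-id", "42"],  # placeholder arguments
    )

    task = batch_v1.TaskSpec(runnables=[runnable])
    # e.g. 1000 task instances, at most 100 running at once
    group = batch_v1.TaskGroup(task_spec=task, task_count=1000, parallelism=100)

    job = batch_v1.Job(
        task_groups=[group],
        logs_policy=batch_v1.LogsPolicy(
            destination=batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
        ),
    )

    return client.create_job(
        parent=f"projects/{project_id}/locations/{region}",
        job=job,
        job_id=job_name,
    )
```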

--

Stephan Meyn

steph...@google.com

Strategic Cloud Engineer

+61 414 599 624


Filip Knapik

Nov 3, 2022, 6:53:36 AM
to Stephan Meyn, Harry Prior, cloud-composer-discuss
Harry,

A lot depends on how many concurrent tasks you want to run in an environment. There should be no issues with running hundreds of concurrent tasks (with proper configuration), but thousands may prove to be challenging, mostly due to how Airflow workers communicate with the database and the bottlenecks this can generate there.
I will follow up with you separately to better understand the configuration of your environment, the settings you have applied, and your desired scale/performance.
Regards,
Filip


Raul-Ronald Galea

Oct 31, 2023, 2:53:02 PM
to cloud-composer-discuss
Hello all,

Having established that Airflow does not really scale beyond a few hundred concurrent tasks - what would instead be the right solution for managing high numbers of tasks, potentially tens of thousands of concurrent ones?

My concrete use case can be described as follows:

There is an external service that offers 2 endpoints:
- start_job()
- query_job_status()

The jobs need to be orchestrated in a DAG-like manner, where features such as those provided by Airflow would come really handy (defining dependencies, retry logic, monitoring, etc.).

Airflow actually has the triggerer service, which supports polling-like operations very efficiently (a single asyncio event loop), but even just starting the jobs via Airflow does not scale... The triggerer would also be resilient to worker restarts/failures, so that tasks are not wrongly marked as failed in Airflow while the real jobs are still running - that's a second crucial requirement, scalability aside.
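
For concreteness, a minimal sketch of what this would look like as a deferrable operator with a custom trigger (the HTTP calls, URL, and module path standing in for the external `start_job()`/`query_job_status()` service are assumptions):

```python
import asyncio
from typing import Any, AsyncIterator

import aiohttp
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class JobStatusTrigger(BaseTrigger):
    """Polls the external query_job_status() endpoint on the triggerer's event loop."""

    def __init__(self, job_id: str, poll_interval: float = 30.0):
        super().__init__()
        self.job_id = job_id
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # module path is an assumption; point it at wherever this class lives
        return (
            "my_plugin.triggers.JobStatusTrigger",
            {"job_id": self.job_id, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        async with aiohttp.ClientSession() as session:
            while True:
                # placeholder URL for the external query_job_status() endpoint
                async with session.get(
                    f"https://example.com/jobs/{self.job_id}/status"
                ) as resp:
                    status = (await resp.json())["status"]
                if status in ("SUCCEEDED", "FAILED"):
                    yield TriggerEvent({"job_id": self.job_id, "status": status})
                    return
                await asyncio.sleep(self.poll_interval)


class StartAndWaitForJobOperator(BaseOperator):
    """Calls start_job(), then defers (freeing the worker slot) until completion."""

    def execute(self, context):
        job_id = self._start_job()  # placeholder call to the external start_job()
        self.defer(
            trigger=JobStatusTrigger(job_id=job_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        if event["status"] != "SUCCEEDED":
            raise RuntimeError(f"Job {event['job_id']} finished as {event['status']}")
        return event["job_id"]

    def _start_job(self) -> str:
        raise NotImplementedError  # wire up the real start_job() call here
```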

Does GCP have a solution that would fit my use case? Or is my particular use case perhaps too uncommon/unusual?

Many thanks

Stephan Meyn

Nov 5, 2023, 9:37:45 PM
to cloud-composer-discuss
At this scale, with thousands of concurrent tasks, you need to consider more frugal options to avoid immense resource consumption. Things that come to mind:

  • GCP Workflows:
    • faster
    • still provides a DAG
    • has a management surface
    • however, has a limit of 3,000 active workflow executions per project
  • Cloud Tasks:
    • can invoke Workflows
    • can invoke Cloud Functions/Cloud Run, GKE, any HTTP target (see the sketch after this list)
    • can limit both rate and max concurrent invocations (to avoid overloading Workflows, for instance)
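
As a rough illustration of the Cloud Tasks option, a sketch using the google-cloud-tasks Python client (the project, queue, and target URL are placeholder assumptions):

```python
from google.cloud import tasks_v2


def enqueue_job_start(project: str, location: str, queue: str, job_id: str):
    """Enqueue an HTTP task that asks some worker endpoint to start one job."""
    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)

    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.POST,
            url="https://example.com/start_job",  # placeholder HTTP target
            headers={"Content-Type": "application/json"},
            body=f'{{"job_id": "{job_id}"}}'.encode(),
        )
    )
    # The queue's own rate and max-concurrency settings throttle delivery.
    return client.create_task(parent=parent, task=task)
```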
