DynamicTaskMapping over Deferrable Operator not working as expected

Sam H.

Nov 1, 2023, 2:29:54 PM
to cloud-composer-discuss
We're running composer-2.4.1-airflow-2.5.3.

I want to run three deferrable BigQuery jobs through the new dynamic task mapping feature.

I expected these dynamic tasks to run and complete independently of one another, but instead only one gets resumed after the deferral, and the rest seemingly need to wait their turn (5 minutes).

I also see the following errors in the scheduler logs:
```
{
}
```


Sample DAG:
```
import logging
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

GCLOUD_CONN_ID = <some gcloud conn id>

default_args = {

}

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

dag = DAG(
    "test",
    description="Test the dynamic task mapping",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2023, 10, 31),
    catchup=False
)


def generate_bq_configuration(proc: str):
    q = {
        "query": {
            "query": f"CALL `project.dataset.{proc}`();",
            "useLegacySql": False,
            "priority": "INTERACTIVE",
        }
    }
    return q


@task
def stub_report_names():
    return [
        generate_bq_configuration('print_hello_world_1'),
        generate_bq_configuration('print_hello_world_2'),
        generate_bq_configuration('print_hello_world_3'),
    ]


bq_operator = BigQueryInsertJobOperator.partial(
    task_id="bq_operator",
    gcp_conn_id=GCLOUD_CONN_ID,
    deferrable=True,
).expand(configuration=stub_report_names())
```
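
For reference, stripped of the Airflow decorator, `stub_report_names` just returns three configuration dicts, so `.expand()` should create three independent mapped task instances (this snippet is runnable without Airflow):

```python
def generate_bq_configuration(proc: str):
    # Same helper as in the DAG above
    return {
        "query": {
            "query": f"CALL `project.dataset.{proc}`();",
            "useLegacySql": False,
            "priority": "INTERACTIVE",
        }
    }

configs = [generate_bq_configuration(f"print_hello_world_{i}") for i in (1, 2, 3)]
print(len(configs))  # 3 -> three mapped task instances expected
```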

Sam H.

Nov 2, 2023, 9:17:00 AM
to cloud-composer-discuss
In a separate test scenario, I'm seeing the following error from the Celery worker, which might hint at why the task failed. What's going on?

```
{
  "textPayload": "[c0d1cf10-61ab-4514-b15d-9a55bc417a5a] Failed to execute task [Errno 17] File exists: '/home/airflow/gcs/logs/plugin_sam_test/bq_client_test/2023-11-02T02:36:41.438084+00:00/1.log'.",
  "insertId": "1ma42i0feyvbw7",
  "resource": {
    "type": "cloud_composer_environment",
    "labels": {
      "location": "us-central1",
      "project_id": "gradient-dev",
      "environment_name": "airflow-dev-2022-01-04"
    }
  },
  "timestamp": "2023-11-02T02:36:48.032637290Z",
  "severity": "ERROR",
  "labels": {
    "worker_id": "airflow-worker-cv6ln",
    "process": "celery_executor.py:133"
  },
  "logName": "projects/gradient-dev/logs/airflow-worker",
  "receiveTimestamp": "2023-11-02T02:36:53.667616268Z"
}
```
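
For context, `[Errno 17] File exists` is the POSIX `EEXIST` error: an exclusive create on a path that already exists, which suggests a second attempt tried to create a log file that an earlier attempt had already written. A minimal reproduction of just the error class (the path is illustrative, not Composer's real log layout):

```python
import errno
import os
import tempfile

# Simulate the worker's situation: the log file already exists on disk.
log_dir = tempfile.mkdtemp()
path = os.path.join(log_dir, "1.log")
open(path, "w").close()  # a first attempt has already created the file

try:
    # A second exclusive create on the same path raises EEXIST (errno 17).
    os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError as exc:
    print(f"[Errno {exc.errno}] File exists: '{path}'")
```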