We're running composer-2.4.1-airflow-2.5.3. I want to run three deferrable BigQuery jobs through the new 'dynamic task mapping' feature.
I expected these dynamic tasks to run and complete independently of one another, but instead only one gets resumed after the deferral, and the rest seemingly need to wait their turn (about 5 minutes each).
I also see the following errors in the scheduler logs:
```
{
insertId: "1mty8gdfbgzw9p"
labels: {2}
logName: "projects/gradient-dev/logs/airflow-scheduler"
receiveTimestamp: "2023-11-01T17:39:02.703732840Z"
resource: {2}
severity: "ERROR"
textPayload: "Executor reports task instance <TaskInstance: plugin_sam_test.bq_operator_deferrable_nolimits manual__2023-11-01T17:38:41.695403Z map_index=2 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?"
timestamp: "2023-11-01T17:38:57.846554593Z"
}
```
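For reference, this is how I'm checking when each mapped instance actually resumes. A minimal sketch that reads the stock `TaskInstance` rows straight from the metadata database (run from a Python shell where the Airflow environment is importable; the dag_id/task_id are taken from the log above):
```
from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Compare queued/start/end times per map_index to see whether the
# mapped instances resume together or one after another.
with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "plugin_sam_test",
            TaskInstance.task_id == "bq_operator_deferrable_nolimits",
        )
        .order_by(TaskInstance.map_index)
        .all()
    )
    for ti in tis:
        print(ti.map_index, ti.state, ti.queued_dttm, ti.start_date, ti.end_date)
```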
Sample DAG:
```
import logging
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
GCLOUD_CONN_ID = "<some gcloud conn id>"  # placeholder
default_args = {
}
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
dag = DAG(
    "test",
    description="Test the dynamic task mapping",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2023, 10, 31),
    catchup=False,
)

def generate_bq_configuration(proc: str):
    q = {
        "query": {
            "query": f"CALL `project.dataset.{proc}`();",
            "useLegacySql": False,
            "priority": "INTERACTIVE",
        }
    }
    return q

@task(dag=dag)
def stub_report_names():
    return [
        generate_bq_configuration('print_hello_world_1'),
        generate_bq_configuration('print_hello_world_2'),
        generate_bq_configuration('print_hello_world_3'),
    ]

# dag=dag so the mapped task is actually attached to the DAG above
bq_operator = BigQueryInsertJobOperator.partial(
    task_id="bq_operator",
    gcp_conn_id=GCLOUD_CONN_ID,
    deferrable=True,
    dag=dag,
).expand(configuration=stub_report_names())
```
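To check whether this is specific to the BigQuery trigger or happens with any deferrable operator under mapping, the same shape can be tried with the stock `TimeDeltaSensorAsync` instead. A minimal sketch (the DAG id and the 30-second deltas are arbitrary choices of mine):
```
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    "test_defer_mapping_no_bq",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2023, 10, 31),
    catchup=False,
) as dag:
    # Three mapped deferrable sensors; if these also resume one-by-one
    # after deferral, the BigQuery trigger is not the culprit.
    wait = TimeDeltaSensorAsync.partial(task_id="wait").expand(
        delta=[timedelta(seconds=30)] * 3
    )
```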