I have some DAGs set to run on the 1st of every month at 4pm. Ideally, the run on the 1st of December should process all of November's data, but I've noticed it only runs for the period before that: my current/latest run is the one on the 1st of November (data for October). I was expecting the latest run to be the one on the 1st of December, processing November's data. I've noticed the same behaviour with my weekly DAGs.
Can anyone please help with this? Maybe I'm getting the logic wrong.
from datetime import timedelta, datetime
import json
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators import email_operator
from airflow.models import Variable
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    #'start_date': seven_days_ago,
    'start_date': datetime(2018, 9, 1, 16, 0, 0),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}
# Set schedule: run the pipeline at 16:00 on the 1st of every month
schedule_interval = "0 16 1 * *"
# Define DAG: Set ID and assign default args and schedule interval
dag = DAG('test_scripts3', default_args=default_args, schedule_interval=schedule_interval)
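To show what I mean by the expected runs, here is a rough sketch that just lists the schedule points after my start_date. I'm calling croniter directly (the library Airflow uses for cron schedules) purely for illustration; this is not part of the DAG itself:

from datetime import datetime
from croniter import croniter

# Same cron expression and start_date as the DAG above
it = croniter("0 16 1 * *", datetime(2018, 9, 1, 16, 0, 0))

print(it.get_next(datetime))  # 2018-10-01 16:00 -> expected to process September's data
print(it.get_next(datetime))  # 2018-11-01 16:00 -> expected to process October's data
print(it.get_next(datetime))  # 2018-12-01 16:00 -> expected to process November's data (this run never appeared)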