I am using APScheduler to schedule jobs in a Django project. Here is the code:
def start():
    """
    Create a background scheduler and register one report job per schedule.

    Calls `generate_report()` to obtain the active sources and the parallel
    lists `idx_list`, `job_schedules` and `triggers`, adds one
    `generate_and_send_report_by_source_name` job for every schedule entry,
    and finally starts the scheduler.

    Errors raised while adding a job or while starting the scheduler are
    logged and not re-raised, so one bad schedule does not prevent the
    remaining jobs from being registered.
    """
    logger.info(" Started creating report")
    executors = {
        'default': ThreadPoolExecutor(1)
    }
    scheduler = BackgroundScheduler(executors=executors, daemon=True)
    sources, idx_list, job_schedules, triggers = generate_report()

    # `sources` has one entry per source, whereas the other three lists have
    # one entry per (source, schedule) pair. Zipping all four together would
    # attach schedules to the wrong source (and silently drop jobs) as soon as
    # a source has zero or several schedules. Each `idx` is the 1-based
    # position of the owning source, so resolve the source through it instead.
    source_list = list(sources)
    for idx, job_schedule, trigger in zip(idx_list, job_schedules, triggers):
        source = source_list[idx - 1]
        job_id = f"daily_report_job_{source.name}_{job_schedule.pk}_{idx}"
        logger.info(" Adding job with ID: %s", job_id)
        try:
            scheduler.add_job(
                generate_and_send_report_by_source_name,
                trigger=trigger,
                id=job_id,
                args=[source],
                replace_existing=True,
            )
        except Exception as e:
            # Best effort: keep registering the remaining jobs.
            logger.error(" Failed to add job with ID %s: %s", job_id, e)

    try:
        # 0 is APScheduler's STATE_STOPPED; only start a scheduler that is
        # not already running.
        if scheduler.state == 0:
            scheduler.start()
    except Exception as e:
        logger.error(" Scheduler failed to start: %s", e)
def generate_report():
    """
    Collect the active sources and build one cron trigger per job schedule.

    Every schedule returned by `source.job_schedule.all()` yields a
    `CronTrigger` built from its `hour` and `minute`; `day_of_week` is passed
    as well when the schedule defines one.

    :return: A 4-tuple `(sources, idx_list, job_schedules, triggers)`.
        `sources` is the value returned by `get_active_sources()`. The other
        three are parallel lists with one entry per (source, schedule) pair:
        `idx_list[i]` is the 1-based position in `sources` of the source that
        owns `job_schedules[i]`, and `triggers[i]` is the trigger built for
        that schedule. When there are no active sources, four empty lists are
        returned.
    """
    sources = get_active_sources()
    if not sources:
        logger.error(" Failed to generate report. No active sources found.")
        # Callers unpack four values, so the empty result must also contain
        # four items; returning only three raised ValueError at the call site.
        return [], [], [], []

    idx_list = []
    job_schedules = []
    triggers = []
    for idx, source in enumerate(sources, start=1):
        for job_schedule in source.job_schedule.all():
            cron_fields = {
                "hour": job_schedule.hour,
                "minute": job_schedule.minute,
            }
            # Only constrain the weekday when the schedule specifies one.
            if job_schedule.day_of_week is not None:
                cron_fields["day_of_week"] = job_schedule.day_of_week
            triggers.append(CronTrigger(**cron_fields))
            idx_list.append(idx)
            job_schedules.append(job_schedule)
    return sources, idx_list, job_schedules, triggers