Hello,
I'm working on a project that uses APScheduler, and I have a unit test that fails with the following stack:
Exception in thread APScheduler:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "/blah/tests/lib/python2.7/site-packages/apscheduler/scheduler.py", line 579, in _main_loop
next_wakeup_time = self._process_jobs(now)
File "/blah/tests/lib/python2.7/site-packages/apscheduler/scheduler.py", line 545, in _process_jobs
self._threadpool.submit(self._run_job, job, run_times)
File "/blah/tests/lib/python2.7/site-packages/apscheduler/threadpool.py", line 102, in submit
raise RuntimeError('Cannot schedule new tasks after shutdown')
RuntimeError: Cannot schedule new tasks after shutdown
Stripped of the extraneous stuff (well, extraneous in theory), the unit test looks like this:
def null_function():
return
scheduler = some_constructor() # Basically makes a thin apscheduler.scheduler.Scheduler wrapper class and adds a MongoDB jobstore. Also sets misfire_grace_time to 5 minutes.
scheduled_date = datetime.datetime.fromtimestamp(time.time() + 3)
scheduler.modified_add_date_job(null_function, scheduled_date) # Adds the job to the scheduler, using the MongoDB jobstore.
time.sleep(1)
scheduler.shutdown()
time.sleep(6)
scheduler.start() # This causes the aforementioned exception.
time.sleep(1)
assert scheduler.running # This assertion fails too. On inspection: scheduler._thread.isAlive() is False (but scheduler._thread exists and scheduler._stopped is False).
The point of the test is to make sure that if the scheduler has to be shut down, it can recover: When it restarts, it can get the jobs from the Mongo store, and run all the jobs that it missed while it was down.
Anyone have suggestions on this? Any help would be much appreciated. Thanks!
-- Slater