implementing support for multiple schedulers


Justin Francis

Oct 11, 2015, 10:17:24 AM
to APScheduler
Hi, 

I'm new to APScheduler. For fault-tolerance (not scalability) reasons, I am considering adding support to APScheduler for multiple scheduler instances running off the same job store. I am not necessarily planning to do this in a general way that could be contributed back to the project (for example, I only need it to work for my chosen job store).

From looking at the code, it appears that all I would really need to do is replace BaseScheduler._jobstores_lock (though not the other locks) with a lock that works across multiple processes or machines.

Is my understanding correct? Or is it more complicated?

Thanks,

Justin

Alex Grönholm

Oct 11, 2015, 10:21:31 AM
to apsch...@googlegroups.com
If you replace the lock with a distributed lock, I guess that would do it.
Looking forward to hearing back from this.
--
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Justin Francis

Oct 11, 2015, 10:30:01 AM
to apsch...@googlegroups.com
Thanks for the quick reply. I'll give it a shot and let you know the results. I was going to try this with the MongoDB job store first.



--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler...@googlegroups.com.

Justin Francis

Oct 12, 2015, 12:22:58 AM
to APScheduler
= Results of Overriding _jobstores_lock =

So this does work. I implemented a lock on top of MongoDB, replaced the base scheduler's _jobstores_lock with it, and two processes co-operated to run a periodic task at regular intervals. However, this solution has a number of drawbacks:

* The _jobstores_lock is used both for modifications to the jobs inside the jobstores and for modifications to the scheduler's jobstores list itself. The former requires the centralized lock, while the latter requires only a local lock.
* The _jobstores_lock is shared by all jobstores, so modifying any one jobstore blocks modifications to all the others.
* The _jobstores_lock becomes "special", different from the other locks, and would need its own factory method, _create_jobstore_lock(), distinct from _create_lock(), so that it can be overridden separately.
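The third point could look roughly like the toy below. It is not APScheduler code: `ToyBaseScheduler`, `DistributedLockScheduler` and the `_create_jobstore_lock()` hook are hypothetical names for the proposal. The one idea borrowed from APScheduler 3.x is a `_create_lock()` factory method that subclasses can override to supply their own lock type.

```python
import threading


class ToyBaseScheduler:
    """Toy stand-in for a scheduler base class (not APScheduler's BaseScheduler)."""

    def __init__(self):
        self._executors_lock = self._create_lock()
        # Hypothetical hook: the jobstore lock gets its own factory method.
        self._jobstores_lock = self._create_jobstore_lock()

    def _create_lock(self):
        return threading.RLock()

    def _create_jobstore_lock(self):
        # By default the jobstore lock is just another local lock...
        return self._create_lock()


class DistributedLockScheduler(ToyBaseScheduler):
    def __init__(self, lock_factory):
        # Must be set before the base __init__ calls the hook.
        self._lock_factory = lock_factory
        super().__init__()

    def _create_jobstore_lock(self):
        # ...but a subclass can swap in a cross-process lock without touching the others.
        return self._lock_factory()
```

In the setup described in this thread, `lock_factory` could be something like `functools.partial(MongoLock, host='mongodb://localhost')`, while the executors lock stays an ordinary local RLock.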

On the other hand, it is kind of nice that you can drop in any lock that works across machines (backed by Redis, Mongo, or even a network-based solution) and it just works, regardless of which jobstore is being used.

= A Proper Solution =

A more elegant solution, I think, would be to tie the implementation of the lock to each individual jobstore. Some jobstores (SQL, Mongo, Redis) readily support a centralized lock, while others (Memory) do not. And indeed, if the jobstore cannot be shared across machines (like Memory), there is no value in a lock that can be. For me this really points to the lock being part of the jobstore, used to control concurrent modifications to that jobstore.

So if I were submitting this as a patch, I would propose something like this (e.g. for _process_jobs):

        with self._jobstores_lock:
            with jobstore.lock():
                for job in jobstore.get_due_jobs(now):
                    ...
                    jobstore.update_job(job)
 
Reading the code, the methods that would need to lock a jobstore before reading or modifying jobs are "modify_job", "resume_job" and "_process_jobs". To ensure no new exceptions are thrown (for example, if _process_jobs tries to update a job that has just been deleted), "remove_job", "remove_all_jobs" and "_real_add_job" should also acquire the jobstore lock.
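A toy version of the per-jobstore proposal, under assumptions: `ToyMemoryJobStore`, its `lock()` method and `process_jobs` are hypothetical and only loosely modeled on the method names above (`get_due_jobs`, `update_job`, `remove_job`). A memory store returns a plain local lock, since it can never be shared between processes; a shared store would return a distributed lock from the same method.

```python
import threading


class ToyMemoryJobStore:
    """Hypothetical in-memory store; jobs map id -> next run time (a float)."""

    def __init__(self):
        self._jobs = {}
        self._lock = threading.RLock()  # local lock: this store is never shared across processes

    def lock(self):
        # A shared store (SQL, Mongo, Redis) would return a distributed lock here instead.
        return self._lock

    def add_job(self, job_id, next_run_time):
        with self.lock():
            self._jobs[job_id] = next_run_time

    def get_due_jobs(self, now):
        return [(job_id, t) for job_id, t in self._jobs.items() if t <= now]

    def update_job(self, job_id, next_run_time):
        if job_id not in self._jobs:
            raise KeyError(job_id)  # the kind of race the extra locking is meant to prevent
        self._jobs[job_id] = next_run_time

    def remove_job(self, job_id):
        with self.lock():
            del self._jobs[job_id]


def process_jobs(jobstores, jobstores_lock, now, interval):
    """Run due jobs, nesting the locks as in the proposal above."""
    ran = []
    with jobstores_lock:  # local lock: guards the scheduler's dict of jobstores
        for store in jobstores.values():
            with store.lock():  # per-store lock: local or distributed, depending on the store
                for job_id, run_time in store.get_due_jobs(now):
                    ran.append(job_id)
                    store.update_job(job_id, run_time + interval)
    return ran
```

Because `remove_job` takes the same store lock, it cannot delete a job in the middle of `process_jobs` reading and updating it.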

= Do you even want this? =

Before I go ahead and try to implement this patch, though, I should ask whether this is even a desired feature. I personally believe it would be a great addition, but there is obviously going to be a maintenance cost. We would have to implement it for every jobstore it makes sense for. In addition, any future modification to the scheduler would need to consider whether or not to acquire the jobstore lock. It took me a few hours just to go over the code and come up with the list of methods above, and even now I'm not totally sure they all need the lock, or that I have not missed some race condition somewhere.

Bottom line: it works, and it is easy to patch into the library for anyone who wants or needs it. You just have to implement a lock on top of your favorite backend. If the maintainers want me to, I am happy to formalize the solution and submit a pull request for review along the lines discussed above.

Thanks, 

Justin


Alex Grönholm

Oct 12, 2015, 5:22:11 AM
to apsch...@googlegroups.com
Some schedulers might have their own lock implementations (e.g. GeventScheduler). How will this mesh with the idea of store-specific locks?

Justin Francis

Oct 12, 2015, 9:41:26 AM
to apsch...@googlegroups.com
TL;DR: I guess my answer would be that most users of gevent monkey-patch blocking system calls, including RLocks and time.sleep (which I used in my MongoDB lock to wait on the centralized lock), so the effect would be nil. However, for Tornado schedulers this could be a problem, because they may block for long periods of time waiting on another process; but I think this is a different issue, discussed below.

Long answer: 

It's actually even worse than just gevent, because even for schedulers without specific lock implementations, the possibility of blocking on another process will cause problems with the current code. For example, if the Tornado scheduler used a jobstore with a blocking lock, another process holding that lock would make the scheduler lock up while waiting on that process.

I would argue, however, that the base scheduler already has a built-in assumption that it may block, because jobstores make blocking network calls. If a jobstore is backed by Mongo, SQL, Redis, etc., and that server or the network is slow, the base scheduler will block a Tornado server (or any other non-threaded scheduler) on, say, an update_job() call. In that sense, adding a blocking lock to the jobstore only makes this problem more obvious. This implies that the Tornado and other event-loop schedulers should be rewritten to use a real thread to avoid these pitfalls.

Which leaves us with just the greenlet-based schedulers, which usually monkey-patch network and other blocking calls that would otherwise block things like Tornado. In that case, though, RLock, sockets and time.sleep (which I use in my MongoDB lock to wait until the lock is freed) have all been patched and become cooperative: they yield to other greenlets instead of blocking the whole process.
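The suggestion that event-loop schedulers hand blocking jobstore work to a real thread can be sketched with plain asyncio. `blocking_update_job` is a hypothetical stand-in for a jobstore call stuck waiting on a lock held by another process; `run_in_executor` runs it in the default thread pool so the loop keeps servicing other tasks meanwhile. This is a generic asyncio pattern, not how APScheduler's event-loop schedulers are actually implemented.

```python
import asyncio
import time


def blocking_update_job():
    """Hypothetical stand-in for a jobstore call stuck waiting on another process's lock."""
    time.sleep(0.2)
    return 'updated'


async def heartbeat(stop, ticks):
    """Records each time the event loop gets to run this coroutine."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    stop = asyncio.Event()
    ticks = []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default thread pool; the loop keeps serving other tasks.
    result = await loop.run_in_executor(None, blocking_update_job)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Calling `blocking_update_job()` directly inside `main()` instead would stall the loop for the whole 0.2 seconds, and the heartbeat would not get to run while the call blocks: the Tornado situation described above.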




Avi Weit

Feb 16, 2016, 4:47:47 AM
to APScheduler
Hi,

I am interested in your solution. Can you please share your centralized lock code, the one that syncs between the schedulers and the datastore?

Thanks a lot,
- Avi


On Monday, October 12, 2015 at 16:41:26 UTC+3, Justin Francis wrote:

Alex Grönholm

Feb 16, 2016, 5:13:01 AM
to apsch...@googlegroups.com
I am planning on building a task scheduler/queue for my new Asphalt framework. One of the unique characteristics of Asphalt is that it works well with both asynchronous and blocking code. An upcoming component for this framework, named "asphalt-tasks", will be a full-fledged distributed task queue with scheduler capabilities, much like Celery. It will combine the best features of both APScheduler and Celery and will likely pave the way for the next major version of APScheduler.

In case you're interested, you can watch this space and get notified when code is pushed to it.

Justin Francis

Feb 16, 2016, 3:24:51 PM
to APScheduler
Hi Avi,

It's pretty straightforward. I created a MongoLock class:

import time

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError


class MongoLock(object):
    def __init__(self, name='default_lock', client=None, database='apscheduler', collection='locks',
            reinit=False, sleep_interval=0.001, **connect_args):
        self.client = client or MongoClient(**connect_args)
        self.collection = self.client[database][collection]
        self.name = name
        self.sleep_interval = sleep_interval
        if reinit:
            self.release()

    def acquire(self, blocking=True, timeout=None):
        t0 = time.time()

        while True:
            acquired = self._acquire()

            if acquired or not blocking:
                return acquired
            elif timeout is not None and time.time() - t0 > timeout:
                raise ValueError('Timed out waiting for lock')
            else:
                time.sleep(self.sleep_interval)

    def _acquire(self):
        # _id is unique in every MongoDB collection, so only one process's insert can succeed
        try:
            self.collection.insert_one({'_id': self.name})
            return True
        except DuplicateKeyError:
            return False

    def release(self):
        self.collection.delete_one({'_id': self.name})

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_value, tb):
        self.release()
        return False  # never suppress exceptions

Then, to use it, create your scheduler and everything else, but before starting the scheduler, overwrite its _jobstores_lock attribute with a MongoLock instance. Here is an example test script I wrote:

import time

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# MongoLock is the class defined above (assumed to live in the same file)

jobstores = {
    'default': MongoDBJobStore(host='mongodb://localhost'),
}

executors = {
    'default': ThreadPoolExecutor(20),
}

job_defaults = {
    'coalesce': True,
    'max_instances': 1
}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
# Replace the scheduler's local lock with the shared one before starting it
scheduler._jobstores_lock = MongoLock(host='mongodb://localhost')

def myfunc():
    print('starting job')
    time.sleep(1)
    print('done job')


# replace_existing=True lets every process add the same job id without a ConflictingIdError
scheduler.add_job(myfunc, 'interval', seconds=2, id='myjob', replace_existing=True)

scheduler.start()

If you run two of these at the same time, you will see that only one of them executes each job iteration.
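Why only one process runs each iteration can be shown without MongoDB. In this simulation (all names hypothetical, threads standing in for processes, a dict standing in for the job store, a `threading.Lock` standing in for MongoLock), each "scheduler" checks under the shared lock whether the job is due and pushes its next run time forward before releasing the lock, so the other scheduler sees the updated time and skips that iteration. This is roughly the claim-then-run idea behind doing the due-job check and the update under `_jobstores_lock`, not APScheduler's actual code.

```python
import threading
import time


def simulate(duration=0.5, interval=0.05):
    store = {'next_run_time': 0.0}  # shared "job store" holding one interval job
    lock = threading.Lock()         # stands in for the shared MongoLock
    runs = []                       # (scheduler name, scheduled run time)
    start = time.monotonic()

    def scheduler_loop(name):
        while time.monotonic() - start < duration:
            now = time.monotonic() - start
            claimed = None
            with lock:
                if store['next_run_time'] <= now:
                    claimed = store['next_run_time']
                    store['next_run_time'] = claimed + interval  # claim this iteration
            if claimed is not None:
                runs.append((name, claimed))  # "run" the job outside the lock
            time.sleep(0.005)

    threads = [threading.Thread(target=scheduler_loop, args=(n,)) for n in ('A', 'B')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return runs
```

Each scheduled time appears once in the result, although both "schedulers" may take turns running the job.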

Hope that helps. 

Justin

Alex Grönholm

Feb 16, 2016, 3:28:38 PM
to apsch...@googlegroups.com
There is a library named "redlock" that accomplishes the same with Redis.

Justin Francis

Feb 17, 2016, 2:58:04 PM
to APScheduler
Avi, there are some unsolved problems with the lock implementation I posted. Namely, the lock is not re-entrant, which will definitely cause problems for APScheduler.

I found an implementation of a MongoDB lock that seems to cover all the bases: https://pypi.python.org/pypi/mongolock/1.3.4

Or here is a more complete MongoLock implementation:

import time
import uuid
from threading import RLock

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError


class MongoLock(object):
    def __init__(self, name='default_lock', client=None, database='apscheduler', collection='locks',
            reinit=False, sleep_interval=0.001, expire=None, **connect_args):
        """Simulates a standard RLock object implemented on top of a Mongo DB, so it can be shared by many processes.

        In order to use across processes, the `name` of the lock created must match in all processes. The client code
        can either pass a pymongo MongoClient `client` instance, or the `connect_args` required to instantiate one.
        Locks will be stored in the given `database` and `collection`.

        If `expire` is given, a lock held for longer than `expire` seconds is considered abandoned and may be taken
        over by another instance. This prevents a process that crashed while holding the lock from holding it forever.
        The `expire` effectively becomes the maximum time the lock can be held. Expiry compares time.time() values,
        so the machines' clocks must be reasonably in sync.

        Note that only a single instance of this lock is re-entrant. Two instances of this class will not
        be re-entrant with each other even if they have the same `name`.
        """
        self.client = client or MongoClient(**connect_args)
        self.collection = self.client[database][collection]
        self.name = name
        self.sleep_interval = sleep_interval
        self.memlock = RLock()
        # Stored as a string: recent pymongo versions refuse to encode uuid.UUID without extra configuration
        self.instanceid = str(uuid.uuid1())
        self.expire = expire
        self.depth = 0  # re-entrant acquisition count; only changed while holding memlock

        if reinit:
            # Forcibly clear a lock left behind by e.g. a crashed process (memlock is not held yet)
            self.collection.delete_one({'_id': self.name})

    def acquire(self, blocking=True, timeout=None):
        t0 = time.time()

        while True:
            acquired = False

            if self.memlock.acquire(False):  # don't bother with DB lock if we don't even have our mem lock
                acquired = self._acquire_dblock()

                if acquired:
                    self.depth += 1
                else:  # either we have both, or we have neither lock
                    self.memlock.release()

            if acquired or not blocking:
                return acquired
            elif timeout is not None and time.time() - t0 > timeout:
                raise ValueError('Timed out waiting for lock')
            else:
                time.sleep(self.sleep_interval)

    def _acquire_dblock(self):
        try:
            self.collection.insert_one({'_id': self.name, 'owner': self.instanceid, 'acquired': time.time()})
            return True
        except DuplicateKeyError:
            lock = self.collection.find_one({'_id': self.name})

            if lock is None:  # released between our insert and find_one; retry on the next loop
                return False
            if lock['owner'] == self.instanceid:
                return True  # re-entrant
            if self.expire and time.time() - lock['acquired'] > self.expire:
                # Stale lock: remove it only if it is still the exact document we just inspected
                self.collection.delete_one({'_id': self.name, 'owner': lock['owner'], 'acquired': lock['acquired']})
            return False

    def release(self):
        # Like RLock.release(), this must be called by the thread that holds the lock
        if self.depth <= 0:
            raise RuntimeError('cannot release un-acquired lock')
        self.depth -= 1
        if self.depth == 0:  # only the outermost release gives up the DB lock
            self.collection.delete_one({'_id': self.name, 'owner': self.instanceid})
        self.memlock.release()

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_value, tb):
        self.release()
        return False  # never suppress exceptions
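Why re-entrancy matters can be shown with the standard library alone. `NonReentrantLock` is a hypothetical in-process stand-in for the first MongoLock: like the insert-based version, it only knows that the lock is taken, not by whom, so a second acquire from the same thread is refused (and a blocking acquire would retry forever). A `threading.RLock`, which the second MongoLock pairs with an owner id, lets the holding thread acquire again.

```python
import threading


class NonReentrantLock:
    """Hypothetical analogue of the first MongoLock: it records *that* the lock is held, not *who* holds it."""

    def __init__(self):
        self._held = False
        self._guard = threading.Lock()  # makes check-and-set atomic, like the unique _id insert

    def try_acquire(self):
        with self._guard:
            if self._held:
                return False  # a blocking acquire would keep retrying this forever
            self._held = True
            return True

    def release(self):
        with self._guard:
            self._held = False


def nested_acquire(try_acquire):
    """Take the same lock twice from one thread, as nested code paths holding the lock would."""
    return try_acquire(), try_acquire()
```

With the non-reentrant lock the second call fails, which in blocking mode means a thread deadlocks on a lock it already holds.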

Avi Weit

Feb 18, 2016, 4:15:08 AM
to APScheduler
 
Thanks a lot, I will have a look.




On Wednesday, February 17, 2016 at 21:58:04 UTC+2, Justin Francis wrote:

Avi Weit

Feb 19, 2016, 9:09:23 AM
to APScheduler

Hi,

 

I am using three background APSchedulers (each running on a separate node) and MongoDB (on a dedicated node).

The MongoDB centralized lock seems to work fine (I am currently using the first snippet). However, I noticed that the schedulers are not in sync with the 'real world': at first, all the schedulers see "no jobs", so _process_jobs is not called. _process_jobs then gets called only on the scheduler that actually creates the jobs, while the others remain idle and do not poll the job store. So if I kill that scheduler process, the jobs are not triggered by any of the others that are still running.

Thanks,
- Avi


On Thursday, February 18, 2016 at 11:15:08 UTC+2, Avi Weit wrote:

Alex Grönholm

Feb 19, 2016, 9:10:27 AM
to apsch...@googlegroups.com
Yeah, there's that problem -- the schedulers need to be notified of new jobs somehow or they will "oversleep".
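A minimal sketch of one workaround for the "oversleep" problem, under stated assumptions: cap how long an idle scheduler sleeps, so it re-reads the shared store at least every `max_wait` seconds even if nobody told it about a new job, and let `wakeup()` cut the wait short when a notification does arrive. `PollingLoop`, `max_wait` and the dict standing in for the shared job store are all hypothetical; APScheduler 3.x schedulers do expose a `wakeup()` method, but this is not their main loop.

```python
import threading
import time


class PollingLoop:
    """Hypothetical scheduler loop: sleep until the next due job, but never longer than max_wait."""

    def __init__(self, store, max_wait):
        # Shared dict: job id -> next run time. time.monotonic() is used here;
        # schedulers on separate machines would need wall-clock times instead.
        self.store = store
        self.max_wait = max_wait
        self.ran = []
        self._event = threading.Event()
        self._stopped = False

    def wakeup(self):
        """Called by whoever knows there is new work; ends the current wait early."""
        self._event.set()

    def stop(self):
        self._stopped = True
        self._event.set()

    def run(self):
        while not self._stopped:
            self._event.clear()
            now = time.monotonic()
            for job_id, run_time in list(self.store.items()):
                if run_time <= now:
                    self.ran.append(job_id)
                    self.store.pop(job_id, None)  # one-off jobs keep the sketch short
            waits = [t - now for t in list(self.store.values())]
            # The cap is what lets an idle loop notice jobs added by other processes.
            self._event.wait(max(0.0, min(waits + [self.max_wait])))
```

The trade-off is extra load on the job store: a smaller `max_wait` notices new jobs sooner but queries the store more often.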

Justin Francis

Feb 19, 2016, 9:17:58 AM
to apsch...@googlegroups.com
I only really use APScheduler across multiple processes to run repeated tasks: all the processes schedule the same job, so they all know about it.

It sounds like you are using apscheduler like a job queue shared between processes.

@Alex, looking at the code, it seems it would be quite a lot of work to get multiple schedulers to poll the jobstore for new jobs, right? Most of the logic seems to be in _real_add_job, which would have to be run with the appropriate arguments on each scheduler (not to mention coordinating updates to jobs as well).


Alex Grönholm

unread,
Feb 19, 2016, 9:18:56 AM2/19/16
to apsch...@googlegroups.com
If this was easy, I would've already done it myself years ago :)

Avi Weit

Feb 21, 2016, 7:31:25 AM
to APScheduler

I was thinking of using a load balancer in front of these scheduler services (which use the centralized Mongo store lock), so that the LB routes requests in a round-robin fashion. What I would like to achieve is that, after routing 'job_add' and 'job_update' requests, the LB broadcasts a 'wakeup' request to all services in the pool, to sync all schedulers with what is stored in the DB.

I looked into HAProxy (an open-source LB implementation) but could not find a way to have it broadcast request X after forwarding request Y.

Another pair of eyes may help, and any other suggestions are welcome.
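If the load balancer cannot broadcast, the service that handles the add or update could do the fan-out itself. A stdlib-only sketch, assuming each scheduler node exposes a hypothetical `POST /wakeup` endpoint that calls its scheduler's `wakeup()`; the endpoint path, the node URLs, `make_node` and `broadcast_wakeup` are all illustrative. Unreachable nodes are returned to the caller rather than retried.

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def make_node(on_wakeup):
    """Start a stand-in scheduler node on a free local port; on_wakeup runs for each POST /wakeup."""

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            if self.path == '/wakeup':
                on_wakeup()  # a real node would call scheduler.wakeup() here
                self.send_response(204)
            else:
                self.send_response(404)
            self.end_headers()

        def log_message(self, format, *args):
            pass  # keep the demo quiet

    server = ThreadingHTTPServer(('127.0.0.1', 0), Handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def broadcast_wakeup(urls, timeout=2.0):
    """POST /wakeup to every node after a job was added or updated; return the URLs that failed."""
    # Internal nodes: ignore any HTTP(S)_PROXY environment settings.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    failed = []
    for url in urls:
        request = urllib.request.Request(url + '/wakeup', data=b'', method='POST')
        try:
            with opener.open(request, timeout=timeout):
                pass
        except OSError:  # connection errors, timeouts and HTTP error statuses
            failed.append(url)
    return failed
```

The write path would call `broadcast_wakeup(all_node_urls)` right after storing the job, so every scheduler re-reads the store instead of waiting for a due time it never learned about.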


Thanks,

- Avi



On Friday, February 19, 2016 at 16:18:56 UTC+2, Alex Grönholm wrote:
...

Justin Francis

Feb 21, 2016, 11:59:06 AM
to apsch...@googlegroups.com
It sounds to me like your infrastructure is fairly large, and that you are building a scheduler service or micro-service of some kind. I think this use case would be better served by a "service" like Celery than by a "library" like APScheduler. I tend to use APScheduler when I need a basic scheduler that fits into my existing infrastructure.

That being said, to build a highly available scheduler on top of APScheduler, I would have your load balancer send all requests to a single host, and only if that host is unavailable send them to the second machine, sort of like a master-slave setup. Then I would detect on the second machine when I have "become the master" and re-create the scheduler and jobstore there, which effectively picks up all the stored tasks from the DB.
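One common way to "detect that I have become the master" is a lease: the active node keeps renewing a record that expires after a few seconds, and a standby takes over only once that record has expired. The sketch below is a hypothetical in-memory version (`LeaseStore`, `try_acquire` and `tick` are illustrative names, and the clock is injectable so the behaviour can be checked deterministically). In practice the record would live in the shared database, for example as a document with a unique `_id` much like the MongoLock earlier in this thread, and the standby would create its scheduler only once `try_acquire` succeeds.

```python
import threading


class LeaseStore:
    """Hypothetical shared lease record: at most one holder until the lease expires."""

    def __init__(self, ttl, clock):
        self.ttl = ttl
        self.clock = clock             # callable returning the current time in seconds
        self._holder = None
        self._expires = 0.0
        self._lock = threading.Lock()  # stands in for an atomic update in the database

    def try_acquire(self, node):
        """Take or renew the lease; True means `node` is (still) the master."""
        with self._lock:
            now = self.clock()
            if self._holder in (None, node) or now >= self._expires:
                self._holder = node
                self._expires = now + self.ttl
                return True
            return False


def tick(lease, node, is_master, start_scheduler, stop_scheduler):
    """One round of a node's loop (run every few seconds); returns the new is_master flag."""
    if lease.try_acquire(node):
        if not is_master:
            start_scheduler()  # "became the master": create the scheduler and jobstore now
        return True
    if is_master:
        stop_scheduler()       # lost the lease, e.g. after a long pause: stop scheduling
    return False
```

A lease does not remove every race: a master that pauses for longer than the TTL can briefly overlap with its successor, so the TTL should comfortably exceed the renewal interval.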

Alex may have other suggestions. 




Alex Grönholm

unread,
Feb 22, 2016, 5:38:12 AM2/22/16
to apsch...@googlegroups.com
Anything involving clustering and HA is a huge can of worms. As far as APScheduler goes, I'm not even going to try the required mental workout until Asphalt-tasks is done. By that time I will have either given up (not likely) or figured out how exactly this is done properly.

Avi Weit

Feb 24, 2016, 9:26:13 AM
to APScheduler

Hi,

 

Can you please let us know when asphalt-tasks will be ready? How similar will its APIs be to APScheduler's?
- Avi

On Monday, February 22, 2016 at 12:38:12 UTC+2, Alex Grönholm wrote:
...

Alex Grönholm

unread,
Feb 24, 2016, 9:59:57 AM2/24/16
to apsch...@googlegroups.com
There is no fixed timetable for asphalt-tasks. I have to get three other components to a usable state first: asphalt-wamp (just lacking documentation and updated upstream libraries), asphalt-templating (needs plenty of work), asphalt-web (needs even more work). So I'm thinking, Q3/Q4 this year is my best guess for when I seriously get to work on asphalt-tasks. You can star the project space on Github so you'll be notified when something happens there. The only way to make this go faster is to help me out with those other projects.

There will be substantial changes to the API, since it will run only on top of an asyncio-compatible event loop. Configuration will no longer support .ini-style files; instead, it will rely solely on dict-based configuration structures. You can get an idea of how it will look by looking at the code of other Asphalt component projects. I will also be writing my own process pool implementation instead of using concurrent.futures for that. There will be an option to set a timeout for tasks running in worker processes (the process will simply be terminated if it takes too long). Job callables will be able to reschedule or modify themselves through the context object.

None of this is set in stone yet of course -- these are just some ideas I've come up with.

Boaz Ackerman

Aug 20, 2016, 8:37:14 AM
to APScheduler
@Justin Francis

Sorry for a blast from the past.
You mentioned "detect on the second machine when I have "become the master"

Couldn't quite figure out how can I achieve that 