--
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler...@googlegroups.com.
Thanks for the quick reply. I'll give it a shot and let you know the results. I was going to try this with MongoDB job store first.
On Sun, Oct 11, 2015 at 10:21 AM, Alex Grönholm wrote:
If you replace the lock with a distributed lock, I guess that would do it.
Looking forward to hearing back from this.
11.10.2015, 17:17, Justin Francis kirjoitti:
Hi,--
I'm new to APScheduler. For fault-tolerance reasons (though not scalability reasons) I am considering trying to add support for multiple scheduler instances running off the same job store to APScheduler. I am not necessarily planning on doing this in a general way such that it can be contributed back to the project (for example, I only need it to work for my chosen job store).
From looking at the code, it appears that all I would really need to do would be to override the implementation of the BaseScheduler._jobstores_lock (though not the other locks) to something that is able to work across multiple processes or machines.
Is my understanding correct? Or is it more complicated?
Thanks,
Justin
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler+unsubscribe@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler...@googlegroups.com.
--
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler...@googlegroups.com.
?I am interested in your solution. Can you please share your centralized lock code
class MongoLock(object): def __init__(self, name='default_lock', client=None, database='apscheduler', collection='locks', reinit=False, sleep_interval=0.001, **connect_args): self.client = client or MongoClient(**connect_args) self.collection = self.client[database][collection] self.name = name self.sleep_interval = sleep_interval import traceback traceback.print_stack() if reinit: self.release()
def acquire(self, blocking=True, timeout=None): acquired = False t0 = time.time()
while not acquired: acquired = self._acquire()
if acquired or not blocking: return acquired elif timeout and time.time() - t0 > timeout: raise ValueError('Timedout waiting for lock') else: time.sleep(self.sleep_interval)
def _acquire(self): try: self.collection.insert({'_id': self.name}) return True except DuplicateKeyError: return False
def release(self): self.collection.remove({'_id': self.name})
def __enter__(self): return self.acquire()
def __exit__(self, type, value, tb): return self.release()from apscheduler.schedulers.background import BlockingSchedulerfrom apscheduler.jobstores.mongodb import MongoDBJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorimport timeimport logging
jobstores = { 'default': MongoDBJobStore(host='mongodb://localhost'),}
executors = { 'default': ThreadPoolExecutor(20),}
job_defaults = { 'coalesce': True, 'max_instances': 1}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)scheduler._jobstores_lock = MongoLock(host='mongodb://localhost')
def myfunc(): print 'starting job' time.sleep(1) print 'done job'
scheduler.add_job(myfunc, 'interval', seconds=2, id='myjob', replace_existing=True)
scheduler.start()
class MongoLock(object): def __init__(self, name='default_lock', client=None, database='apscheduler', collection='locks', reinit=False, sleep_interval=0.001, expire=None, **connect_args): """Simulates a standard RLock object implemented on top of a Mongo DB, so it can be shared by many processes.
In order to use across processes, the `name` of the lock created must match in all processes. The client code can either pass a pymongo MongoClient `client` instance, or the `connect_args` required to instantiate one. Locks will be stored in the given `database` and `collection`.
If `expire` is given, then an instance that acquires a lock will implicitly release the lock after `expire` seconds. This is intended to be used to prevent a process that crashed while holding the lock from holding the lock forever. The `expire` effectively becomes the maximum time the lock can be held.
Note that only a single instance of this lock is re-entrant. Two instances of this class will not be re-entrant with each other even if they have the same `name`. """ self.client = client or MongoClient(**connect_args) self.collection = self.client[database][collection] self.name = name self.sleep_interval = sleep_interval self.memlock = RLock() self.instanceid = uuid.uuid1() self.expire = expire
if reinit: self.release()
def acquire(self, blocking=True, timeout=None): acquired = False t0 = time.time()
while not acquired: mem_acquired = self.memlock.acquire(False)
if mem_acquired: # don't bother with DB lock if we don't even have our mem lock acquired = self._acquire_dblock()
if not acquired: # either we have both, or we have neither lock self.memlock.release()
if acquired or not blocking: return acquired elif timeout and time.time() - t0 > timeout: raise ValueError('Timedout waiting for lock') else: time.sleep(self.sleep_interval)
def _acquire_dblock(self): try: self.collection.insert({'_id': self.name, 'owner': self.instanceid, 'acquired': time.time()}) return True except DuplicateKeyError: lock = self.collection.find_one({'_id': self.name})
if self.expire and time.time() - lock['acquired'] > self.expire: self.collection.remove({'_id': self.name})
if lock and lock['owner'] == self.instanceid: return True # re-entrant else: self.memlock.release()
def __enter__(self): return self.acquire()
def __exit__(self, type, value, tb): return self.release()
.Thanks alot, I will have a look
Hi,
--
You received this message because you are subscribed to a topic in the Google Groups "APScheduler" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/apscheduler/Gjc_JQMPePc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to apscheduler...@googlegroups.com.
I was thinking to use a load-balancer on top of these scheduler services (that use the centralized mongo store lock) so that LB routes the requests
in a round-robin fashion. What I would like to achieve is that the LB after routing 'job_add' and 'job_update' requests, will broadcast to all services in the
pool, a 'wakeup' request – to synch all schedulers with what being stored in the db.
I looked into HAProxy (opensource LB implementation) but could not find a way to have it broadcasting request X after forwarding request Y.
Another pair of eyes may help, or any other suggestions are welcome.
Thanks,
- Avi
...
--
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
Hi,
...
--