dynamically altering worker tasks in python-gearman


Jonathan

Mar 18, 2011, 6:01:45 PM
to gearman
I'm running into a problem when I try to alter the task list available
to a worker while a task is in progress. A quick example is shown
below; it will probably explain the problem better than I can. Also, in
case people have other ideas about how to do this: I'm trying to get
a certain degree of control over individual workers so that I can turn
off certain functionality while data sets are reloaded, and then turn
it back on. In my case, I would unregister my main processing task
on 4 workers (several other workers remain available concurrently),
update the datasets, and then turn off the reload task. When that
batch is done, I would move on to the next batch of 4 workers. When all
workers are updated, I can turn the reload task back on for all
workers. This way the service stays in continuous use without any
downtime.
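The rolling procedure described above can be written down as a plain schedule before wiring it to any workers. This is a minimal, library-independent sketch: the task names `process` and `reload`, the worker IDs, and the function `rolling_reload_plan` are hypothetical, and it assumes each batch's main task is re-registered before moving on to the next batch (the post implies this but does not spell it out). It only produces the plan; it does not talk to gearmand.

```python
# Hypothetical task names standing in for the real main and reload tasks.
MAIN_TASK = "process"
RELOAD_TASK = "reload"


def rolling_reload_plan(workers, batch_size=4):
    """Return an ordered list of (action, worker_ids) steps for a rolling reload.

    Only one batch is out of service at a time, so the remaining workers
    keep serving MAIN_TASK throughout.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    steps = []
    for start in range(0, len(workers), batch_size):
        batch = list(workers[start:start + batch_size])
        steps.append(("unregister " + MAIN_TASK, batch))    # take batch out of service
        steps.append(("reload datasets", batch))            # refresh its data
        steps.append(("unregister " + RELOAD_TASK, batch))  # don't reload it twice
        steps.append(("register " + MAIN_TASK, batch))      # assumption: back in service
    # Once every batch is done, re-enable the reload task everywhere.
    steps.append(("register " + RELOAD_TASK, list(workers)))
    return steps


for action, batch in rolling_reload_plan(["w%d" % i for i in range(1, 9)]):
    print("%s %s" % (action, batch))
```

With 8 workers and batches of 4, at most 4 workers are ever out of service for the main task, which is what keeps the service available during the reload.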

WORKER
-----------------------------------------------------------------------------
import gearman

def reversify(gmWorker, gmJob):
    return gmJob.data[::-1]

def strcount(gmWorker, gmJob):
    gmWorker.unregister_task('reversify')  ## this is the problem line
    return str(len(gmJob.data))

worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
while True:
    worker.work()


CLIENT
-----------------------------------------------------------------------------
import gearman
client = gearman.GearmanClient(['localhost:4730'])
a = client.submit_job('reversify', 'spam and eggs')
print(a.result)
# prints: sgge dna maps

a = client.submit_job('strcount', 'spam and eggs')
# ... (hangs here)

At this point, when I run the call to strcount (which calls
gmWorker.unregister_task('reversify') and does successfully remove
the 'reversify' task from the worker), I get a hang. The strcount()
function returns, and at that point something goes wrong, but I don't
know where. No errors are output anywhere, although an ERROR is
recorded in the gearmand log. The relevant section from that log is
included below. Any ideas?

GEARMAND LOG
-----------------------------------------------------------------------------
INFO Entering main event loop
INFO Accepted connection from 127.0.0.1:50438
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
INFO [ 0] 127.0.0.1:50438 Connected
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received RESET_ABILITIES
DEBUG [ 0] 127.0.0.1:50438 Received CAN_DO
DEBUG [ 0] 127.0.0.1:50438 Received CAN_DO
DEBUG [ 0] 127.0.0.1:50438 Received PRE_SLEEP
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
INFO Accepted connection from 127.0.0.1:60175
CRAZY [ 0] 127.0.0.1:60175 Watching POLLIN
INFO [ 0] 127.0.0.1:60175 Connected
CRAZY [ 0] 127.0.0.1:60175 Ready POLLIN
DEBUG [ 0] 127.0.0.1:60175 Received SUBMIT_JOB
CRAZY [ 0] 127.0.0.1:60175 Watching POLLIN
DEBUG [ 0] 127.0.0.1:60175 Sent JOB_CREATED
DEBUG [ 0] 127.0.0.1:50438 Sent NOOP
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received GRAB_JOB_UNIQ
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
DEBUG [ 0] 127.0.0.1:50438 Sent JOB_ASSIGN_UNIQ
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received WORK_COMPLETE
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
DEBUG [ 0] 127.0.0.1:60175 Sent WORK_COMPLETE
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received PRE_SLEEP
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
CRAZY [ 0] 127.0.0.1:60175 Ready POLLIN
DEBUG [ 0] 127.0.0.1:60175 Received SUBMIT_JOB
CRAZY [ 0] 127.0.0.1:60175 Watching POLLIN
DEBUG [ 0] 127.0.0.1:60175 Sent JOB_CREATED
DEBUG [ 0] 127.0.0.1:50438 Sent NOOP
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received GRAB_JOB_UNIQ
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
DEBUG [ 0] 127.0.0.1:50438 Sent JOB_ASSIGN_UNIQ
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN
DEBUG [ 0] 127.0.0.1:50438 Received RESET_ABILITIES
DEBUG [ 0] 127.0.0.1:50438 Received CAN_DO
DEBUG [ 0] 127.0.0.1:50438 Received WORK_COMPLETE
CRAZY [ 0] 127.0.0.1:50438 Watching POLLIN
DEBUG [ 0] 127.0.0.1:50438 Sent ERROR
CRAZY [ 0] 127.0.0.1:50438 Ready POLLIN


Thanks for any insight!

Jonathan

Mar 22, 2011, 3:06:07 PM
to gearman
Just as a follow-up to this question, in case anyone else stumbles
into this problem: you can override the GearmanWorker.on_job_complete
method, which lets you alter the worker's abilities after the job
result has been sent. Create a class variable that a task can set as
a flag, and then check the flag in on_job_complete before returning.

For example:

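import gearman

def reversify(gmWorker, gmJob):
    return gmJob.data[::-1]

class myWorker(gearman.GearmanWorker):
    # Set by a task: 1 = register 'reversify', -1 = unregister it, 0 = no change
    myflag = 0

    def on_job_complete(self, current_job, job_result):
        # Report the result to gearmand first, then change abilities
        self.send_job_complete(current_job, job_result)
        if myWorker.myflag == 1:
            self.register_task('reversify', reversify)
        elif myWorker.myflag == -1:
            self.unregister_task('reversify')
        myWorker.myflag = 0
        return True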
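The single flag above handles one task at a time. A minimal sketch of the same idea generalized to a queue of pending changes follows; `DeferredAbilityChanges`, `request_register`, `request_unregister`, `apply`, and the `RecordingWorker` stand-in are all hypothetical names, not part of python-gearman. The only assumption about the worker is that it has `register_task(name, callback)` and `unregister_task(name)`, the two methods used in the code above.

```python
class DeferredAbilityChanges(object):
    """Queue register/unregister requests made while a job is running and
    apply them later, after the job's result has been sent."""

    def __init__(self):
        self._pending = []

    def request_register(self, task, callback):
        self._pending.append(("register", task, callback))

    def request_unregister(self, task):
        self._pending.append(("unregister", task, None))

    def apply(self, worker):
        """Apply queued changes in request order, clear the queue, and
        return how many changes were applied."""
        pending, self._pending = self._pending, []
        for action, task, callback in pending:
            if action == "register":
                worker.register_task(task, callback)
            else:
                worker.unregister_task(task)
        return len(pending)


# Demo with a stand-in worker that only records calls (hypothetical; not gearman).
class RecordingWorker(object):
    def __init__(self):
        self.calls = []

    def register_task(self, task, callback):
        self.calls.append(("register", task))

    def unregister_task(self, task):
        self.calls.append(("unregister", task))


changes = DeferredAbilityChanges()
changes.request_unregister("reversify")  # e.g. from inside strcount()
demo = RecordingWorker()
applied = changes.apply(demo)            # e.g. from on_job_complete()
```

In a python-gearman worker, the task would call `changes.request_unregister(...)` instead of `gmWorker.unregister_task(...)`, and the overridden `on_job_complete` would call `changes.apply(self)` right after `self.send_job_complete(current_job, job_result)`, just as the flag version does.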