As Eric Day said in a previous thread titled "order of grabbed jobs": when a
worker asks for a job, the job server (of the C implementation) walks through
each function that the worker has registered until it finds one with a job.
This implementation breaks the FIFO behavior between the queues of different
functions, especially (perhaps only?) when a worker declares it can do more
than one function, because it will not execute jobs for the second function
until the queue for the first function is empty.
In the "order of grabbed jobs" thread, various solutions were discussed:
round-robin through the abilities of the worker, or selecting the job from the
largest queue. I tried to implement a solution proposed by Ernesto Reina based
on adding a timestamp to each job so that, when a worker asks for a job, the
job server assigns the oldest job with the highest priority the worker can do.
I first tried adding seconds-based timestamps (using time()) to
gearman_server_job_st, but this was not enough to disambiguate between jobs
added through the Concurrent Client Interface (gearman_client_add_task,
gearman_client_run_tasks), so I changed to microsecond-based timestamps (using
gettimeofday()), and it seems to be working as expected.
I attach a patch against the 0.8 release. I believe it does not work if any
kind of persistence is used for the queues of the job server. I hope it is
useful. Comments are welcome.
Huge thanks,
--
Yasser González Fernández
Thanks for the patch! I'll take a closer look at this and try to
integrate it within the server as an optional behavior. The current
behavior is nice because it is simple, fast, and adequate for many
workloads. Having this time-based option will be useful as well, but I
want to make sure it is not required (especially since gettimeofday()
can translate to a system call on some architectures). I'll try to
get this into the 0.10 release.
Thanks!
-Eric
> diff -ur gearmand-0.8-orig/libgearman/server_job.c gearmand-0.8/libgearman/server_job.c
> --- gearmand-0.8-orig/libgearman/server_job.c 2009-06-28 01:22:12.000000000 -0400
> +++ gearmand-0.8/libgearman/server_job.c 2009-07-25 13:14:02.000000000 -0400
> @@ -110,6 +110,8 @@
> return NULL;
> }
>
> + gettimeofday(&server_job->insert_time, NULL);
> +
> server_job->priority= priority;
>
> server_job->function= server_function;
> @@ -218,6 +220,8 @@
> else
> server_job->options= 0;
>
> + server_job->insert_time.tv_sec = 0;
> + server_job->insert_time.tv_usec = 0;
> server_job->priority= 0;
> server_job->job_handle_key= 0;
> server_job->unique_key= 0;
> @@ -298,63 +302,123 @@
> gearman_server_job_st *
> gearman_server_job_peek(gearman_server_con_st *server_con)
> {
> - gearman_server_worker_st *server_worker;
> + gearman_server_worker_st *server_worker, *tmp_worker;
> + gearman_server_job_st *server_job, *tmp_job;
> gearman_job_priority_t priority;
>
> - for (server_worker= server_con->worker_list; server_worker != NULL;
> - server_worker= server_worker->con_next)
> + server_worker= NULL;
> + server_job= NULL;
> +
> + for (tmp_worker= server_con->worker_list; tmp_worker != NULL;
> + tmp_worker= tmp_worker->con_next)
> {
> - if (server_worker->function->job_count != 0)
> + if (tmp_worker->function->job_count == 0)
> + continue;
> +
> + if (server_job == NULL)
> {
> for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> {
> - if (server_worker->function->job_list[priority] != NULL)
> - {
> - if (server_worker->function->job_list[priority]->options &
> - GEARMAN_SERVER_JOB_IGNORE)
> - {
> - /* This is only happens when a client disconnects from a foreground
> - job. We do this because we don't want to run the job anymore. */
> - server_worker->function->job_list[priority]->options&=
> - ~GEARMAN_SERVER_JOB_IGNORE;
> - gearman_server_job_free(gearman_server_job_take(server_con));
> - return gearman_server_job_peek(server_con);
> - }
> - return server_worker->function->job_list[priority];
> - }
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> + server_worker= tmp_worker;
> + server_job= tmp_worker->function->job_list[priority];
> + }
> + else
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority <= server_job->priority; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> +
> + if (tmp_worker->function->job_list[priority] == NULL)
> + continue;
> +
> + tmp_job= tmp_worker->function->job_list[priority];
> + if (tmp_job->priority < server_job->priority ||
> + tmp_job->insert_time.tv_sec < server_job->insert_time.tv_sec ||
> + (tmp_job->insert_time.tv_sec == server_job->insert_time.tv_sec &&
> + tmp_job->insert_time.tv_usec < server_job->insert_time.tv_usec))
> + {
> + server_worker= tmp_worker;
> + server_job= tmp_job;
> }
> }
> }
>
> - return NULL;
> + if (server_job == NULL)
> + return NULL;
> +
> + if (server_job->options & GEARMAN_SERVER_JOB_IGNORE)
> + {
> + /* This is only happens when a client disconnects from a foreground
> + job. We do this because we don't want to run the job anymore. */
> + server_job->options&= ~GEARMAN_SERVER_JOB_IGNORE;
> + gearman_server_job_free(gearman_server_job_take(server_con));
> + return gearman_server_job_peek(server_con);
> + }
> +
> + return server_job;
> }
>
> gearman_server_job_st *
> gearman_server_job_take(gearman_server_con_st *server_con)
> {
> - gearman_server_worker_st *server_worker;
> - gearman_server_job_st *server_job;
> + gearman_server_worker_st *server_worker, *tmp_worker;
> + gearman_server_job_st *server_job, *tmp_job;
> gearman_job_priority_t priority;
>
> - for (server_worker= server_con->worker_list; server_worker != NULL;
> - server_worker= server_worker->con_next)
> + server_worker= NULL;
> + server_job= NULL;
> +
> + for (tmp_worker= server_con->worker_list; tmp_worker != NULL;
> + tmp_worker= tmp_worker->con_next)
> {
> - if (server_worker->function->job_count != 0)
> - break;
> - }
> + if (tmp_worker->function->job_count == 0)
> + continue;
>
> - if (server_worker == NULL)
> - return NULL;
> + if (server_job == NULL)
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> + server_worker= tmp_worker;
> + server_job= tmp_worker->function->job_list[priority];
> + }
> + else
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority <= server_job->priority; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
>
> - for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> - priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> - {
> - if (server_worker->function->job_list[priority] != NULL)
> - break;
> + if (tmp_worker->function->job_list[priority] == NULL)
> + continue;
> +
> + tmp_job= tmp_worker->function->job_list[priority];
> + if (tmp_job->priority < server_job->priority ||
> + tmp_job->insert_time.tv_sec < server_job->insert_time.tv_sec ||
> + (tmp_job->insert_time.tv_sec == server_job->insert_time.tv_sec &&
> + tmp_job->insert_time.tv_usec < server_job->insert_time.tv_usec))
> + {
> + server_worker= tmp_worker;
> + server_job= tmp_job;
> + }
> + }
> }
>
> - server_job= server_worker->function->job_list[priority];
> + if (server_job == NULL)
> + return NULL;
> +
> server_job->function->job_list[priority]= server_job->function_next;
> if (server_job->function->job_end[priority] == server_job)
> server_job->function->job_end[priority]= NULL;
> diff -ur gearmand-0.8-orig/libgearman/structs.h gearmand-0.8/libgearman/structs.h
> --- gearmand-0.8-orig/libgearman/structs.h 2009-06-23 12:51:53.000000000 -0400
> +++ gearmand-0.8/libgearman/structs.h 2009-07-25 13:05:53.000000000 -0400
> @@ -397,6 +397,7 @@
> {
> gearman_server_job_options_t options;
> gearman_job_priority_t priority;
> + struct timeval insert_time;
> uint32_t job_handle_key;
> uint32_t unique_key;
> uint32_t client_count;
The Perl server prevents starvation with a round-robin relative to the
individual worker connection. If a worker can do 3 job types, the first
time it requests a new job it will *start* looking at the first queue. The
next time it requests a job it will *start* looking at the second queue.
This is actually pretty even. Since the job search index is relative to
the worker connection, the job server itself isn't necessarily
prioritizing where the jobs come from. Since the connections all rotate
where they first look for jobs, short queues tend to stay depleted and
large queues get chewed on as much as they can.
This algorithm has been pretty fair to us so far - when you start to
prioritize jobs you have to be careful to not starve the smaller queues.
Keeping all queues busy at all times is an easy answer to that.
-Dormando
On Tue, 28 Jul 2009, Eric Day wrote:
> [...]
>
> On Sun, Jul 26, 2009 at 12:30:48AM -0500, Yasser González Fernández wrote:
> > [...]
I found that I needed the FIFO ordering as well. I tried to apply
Yasser's patch to the 0.9 release but was unable to. The exact
implementation confused me a bit, so I ended up refactoring it, but
it is basically Yasser's patch.
I have uploaded the changes to lp:~coryb/gearmand/time-order
I also added a -T or --time-order option to enable the time-ordered
FIFO. Without it enabled, the gettimeofday() overhead you were
concerned about does not happen. So the default behavior is the same
as it is today (with 2 extra conditional checks), but --time-order
will give FIFO order based on gettimeofday().
These changes will break binary compatibility since we had to add
to gearman_server_st and gearman_server_job_st. It's sort of hard
to see what the "public interface" is since all the .h files are
shipped and included via gearman.h, so I have no idea how you deal
with binary-incompatible changes. Perhaps you can think of a way to
make these changes binary compatible?
Let me know if there are any questions/concerns.
Thanks!
-Cory
This effectively randomizes which queue jobs come out of at any particular
instant, while guaranteeing that all queues are crawled quickly. It's
like a crazy cross pattern. (Otherwise you'd just iterate on the server
level.)
I have tested the FIFO ordering you both implemented a bit, and it
works perfectly well (as I expected). Thank you both, and keep
contributing.
--
Ernesto Rodriguez Reina
This was one of the algorithms we discussed previously, and the one
I am leaning towards the most since it has very little overhead. The
time-based one would also be fair for some loads, and I think is a
good option to have. The current one is pretty dumb and just finds the
first job it can (with no guarantee of order). I'll probably
make the round-robin behavior the default.
The plan in the C server is to have flags to trigger the various
queue behaviors to help match the load.
-Eric
I'll try to work this patch into the next release. I also will be
adding the round-robin behavior Dormando talks about.
For binary compatibility, we should not see any issues. Only gearmand
uses the gearmand*_st and gearman_server*_st structs right now. They
are provided as headers because gearmand itself is fully embeddable, and
you could build your own custom gearmand using the gearman_server*
interface. They are just a few abstraction layers to help with
code reuse and extensibility. For anyone thinking of using these
interfaces, please consider extending the implementation we already
have first. :)
-Eric