FIFO order between jobs of different functions

215 views
Skip to first unread message

Yasser González Fernández

unread,
Jul 26, 2009, 1:30:48 AM7/26/09
to gea...@googlegroups.com
Hello everyone,

As Eric Day said in a previous thread titled "order of grabbed jobs": when a
worker asks for a job, the job server (of the C implementation) walks through
each function that the worker registered until it finds one with a job. This
implementation breaks the FIFO behavior between the queues of different
functions, specially (only?) when a worker declares it can do more than one
function because it will not execute jobs for the second function until the
queue for the first function is empty.

In the "order of grabbed jobs" thread, various solutions were discussed:
round-robin through the abilities of the worker, select the job from the
largest queue. I tried to implement a solution proposed by Ernesto Reina based
on adding timestamps to each job and then, when a worker asks for a job, the
job server will assign the older job with the highest priority it can do.

I first tried adding seconds-based timestamps (using time()) to
gearman_server_job_st but this was not enough to disambiguate between jobs
added using the Concurrent Client Interface (gearman_client_add_task,
gearman_client_run_tasks) and then changed to useconds-based timestamps (using
gettimeofday()) and it seems to be working as expected.

I attach a patch to the 0.8 release. (I think) It does not work if any kind of
persistence is used in the queues of the job server. I hope it is
useful. Comments are welcomed.

Huge thanks,

--
Yasser González Fernández

gearmand-0.8-fifo-queues.patch

Eric Day

unread,
Jul 28, 2009, 2:04:00 PM7/28/09
to gea...@googlegroups.com
Hi Yasser,

Thanks for the patch! I'll take a closer look at this and try to
integrate it within the server as an optional behavior. The current
behavior is nice because it is simple, fast, and adequate for many
workloads. Having this time based option will be useful as well, but I
want to make sure it is not required (especially since gettimeofday()
can translate to a system call on some architectures). I'll try to
get this into the 0.10 release.

Thanks!
-Eric

> diff -ur gearmand-0.8-orig/libgearman/server_job.c gearmand-0.8/libgearman/server_job.c
> --- gearmand-0.8-orig/libgearman/server_job.c 2009-06-28 01:22:12.000000000 -0400
> +++ gearmand-0.8/libgearman/server_job.c 2009-07-25 13:14:02.000000000 -0400
> @@ -110,6 +110,8 @@
> return NULL;
> }
>
> + gettimeofday(&server_job->insert_time, NULL);
> +
> server_job->priority= priority;
>
> server_job->function= server_function;
> @@ -218,6 +220,8 @@
> else
> server_job->options= 0;
>
> + server_job->insert_time.tv_sec = 0;
> + server_job->insert_time.tv_usec = 0;
> server_job->priority= 0;
> server_job->job_handle_key= 0;
> server_job->unique_key= 0;
> @@ -298,63 +302,123 @@
> gearman_server_job_st *
> gearman_server_job_peek(gearman_server_con_st *server_con)
> {
> - gearman_server_worker_st *server_worker;
> + gearman_server_worker_st *server_worker, *tmp_worker;
> + gearman_server_job_st *server_job, *tmp_job;
> gearman_job_priority_t priority;
>
> - for (server_worker= server_con->worker_list; server_worker != NULL;
> - server_worker= server_worker->con_next)
> + server_worker= NULL;
> + server_job= NULL;
> +
> + for (tmp_worker= server_con->worker_list; tmp_worker != NULL;
> + tmp_worker= tmp_worker->con_next)
> {
> - if (server_worker->function->job_count != 0)
> + if (tmp_worker->function->job_count == 0)
> + continue;
> +
> + if (server_job == NULL)
> {
> for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> {
> - if (server_worker->function->job_list[priority] != NULL)
> - {
> - if (server_worker->function->job_list[priority]->options &
> - GEARMAN_SERVER_JOB_IGNORE)
> - {
> - /* This is only happens when a client disconnects from a foreground
> - job. We do this because we don't want to run the job anymore. */
> - server_worker->function->job_list[priority]->options&=
> - ~GEARMAN_SERVER_JOB_IGNORE;
> - gearman_server_job_free(gearman_server_job_take(server_con));
> - return gearman_server_job_peek(server_con);
> - }
> - return server_worker->function->job_list[priority];
> - }
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> + server_worker= tmp_worker;
> + server_job= tmp_worker->function->job_list[priority];
> + }
> + else
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority <= server_job->priority; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> +
> + if (tmp_worker->function->job_list[priority] == NULL)
> + continue;
> +
> + tmp_job= tmp_worker->function->job_list[priority];
> + if (tmp_job->priority < server_job->priority ||
> + tmp_job->insert_time.tv_sec < server_job->insert_time.tv_sec ||
> + (tmp_job->insert_time.tv_sec == server_job->insert_time.tv_sec &&
> + tmp_job->insert_time.tv_usec < server_job->insert_time.tv_usec))
> + {
> + server_worker= tmp_worker;
> + server_job= tmp_job;
> }
> }
> }
>
> - return NULL;
> + if (server_job == NULL)
> + return NULL;
> +
> + if (server_job->options & GEARMAN_SERVER_JOB_IGNORE)
> + {
> + /* This is only happens when a client disconnects from a foreground
> + job. We do this because we don't want to run the job anymore. */
> + server_job->options&= ~GEARMAN_SERVER_JOB_IGNORE;
> + gearman_server_job_free(gearman_server_job_take(server_con));
> + return gearman_server_job_peek(server_con);
> + }
> +
> + return server_job;
> }
>
> gearman_server_job_st *
> gearman_server_job_take(gearman_server_con_st *server_con)
> {
> - gearman_server_worker_st *server_worker;
> - gearman_server_job_st *server_job;
> + gearman_server_worker_st *server_worker, *tmp_worker;
> + gearman_server_job_st *server_job, *tmp_job;
> gearman_job_priority_t priority;
>
> - for (server_worker= server_con->worker_list; server_worker != NULL;
> - server_worker= server_worker->con_next)
> + server_worker= NULL;
> + server_job= NULL;
> +
> + for (tmp_worker= server_con->worker_list; tmp_worker != NULL;
> + tmp_worker= tmp_worker->con_next)
> {
> - if (server_worker->function->job_count != 0)
> - break;
> - }
> + if (tmp_worker->function->job_count == 0)
> + continue;
>
> - if (server_worker == NULL)
> - return NULL;
> + if (server_job == NULL)
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
> + server_worker= tmp_worker;
> + server_job= tmp_worker->function->job_list[priority];
> + }
> + else
> + {
> + for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> + priority <= server_job->priority; priority++)
> + {
> + if (tmp_worker->function->job_list[priority] != NULL)
> + break;
> + }
>
> - for (priority= GEARMAN_JOB_PRIORITY_HIGH;
> - priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
> - {
> - if (server_worker->function->job_list[priority] != NULL)
> - break;
> + if (tmp_worker->function->job_list[priority] == NULL)
> + continue;
> +
> + tmp_job= tmp_worker->function->job_list[priority];
> + if (tmp_job->priority < server_job->priority ||
> + tmp_job->insert_time.tv_sec < server_job->insert_time.tv_sec ||
> + (tmp_job->insert_time.tv_sec == server_job->insert_time.tv_sec &&
> + tmp_job->insert_time.tv_usec < server_job->insert_time.tv_usec))
> + {
> + server_worker= tmp_worker;
> + server_job= tmp_job;
> + }
> + }
> }
>
> - server_job= server_worker->function->job_list[priority];
> + if (server_job == NULL)
> + return NULL;
> +
> server_job->function->job_list[priority]= server_job->function_next;
> if (server_job->function->job_end[priority] == server_job)
> server_job->function->job_end[priority]= NULL;
> diff -ur gearmand-0.8-orig/libgearman/structs.h gearmand-0.8/libgearman/structs.h
> --- gearmand-0.8-orig/libgearman/structs.h 2009-06-23 12:51:53.000000000 -0400
> +++ gearmand-0.8/libgearman/structs.h 2009-07-25 13:05:53.000000000 -0400
> @@ -397,6 +397,7 @@
> {
> gearman_server_job_options_t options;
> gearman_job_priority_t priority;
> + struct timeval insert_time;
> uint32_t job_handle_key;
> uint32_t unique_key;
> uint32_t client_count;

dormando

unread,
Aug 3, 2009, 4:23:42 AM8/3/09
to gea...@googlegroups.com
Hey,

The perl server prevents starvation by a round robin relative to the
individual worker connection. If a worker can do 3 job types, the first
time it requets a new job it'll *start* looking at the first queue. The
next time it requests a job it'll *start* looking at the second queue.

This is actually pretty even. Since the job search index is relative to
the worker connection, the job server itself isn't necessarily
prioritizing where the jobs come from. Since they all iterate through
where they first look for jobs, short queues tend to stay depleted and
large queues get chewed on as much as they can.

This algorithm has been pretty fair to us so far - when you start to
prioritize jobs you have to be careful to not starve the smaller queues.
Keeping all queues busy at all times is an easy answer to that.

-Dormando

On Tue, 28 Jul 2009, Eric Day wrote:

>
> Hi Yasser,
>
> Thanks for the patch! I'll take a closer look at this and try to
> integrate it within the server as an optional behavior. The current
> behavior is nice because it is simple, fast, and adequate for many
> workloads. Having this time based option will be useful as well, but I
> want to make sure it is not required (especially since gettimeofday()
> can translate to a system call on some architectures). I'll try to
> get this into the 0.10 release.
>
> Thanks!
> -Eric
>

> On Sun, Jul 26, 2009 at 12:30:48AM -0500, Yasser Gonz?lez Fern?ndez wrote:
> > Hello everyone,
> >
> > As Eric Day said in a previous thread titled "order of grabbed jobs": when a
> > worker asks for a job, the job server (of the C implementation) walks through
> > each function that the worker registered until it finds one with a job. This
> > implementation breaks the FIFO behavior between the queues of different
> > functions, specially (only?) when a worker declares it can do more than one
> > function because it will not execute jobs for the second function until the
> > queue for the first function is empty.
> >
> > In the "order of grabbed jobs" thread, various solutions were discussed:
> > round-robin through the abilities of the worker, select the job from the
> > largest queue. I tried to implement a solution proposed by Ernesto Reina based
> > on adding timestamps to each job and then, when a worker asks for a job, the
> > job server will assign the older job with the highest priority it can do.
> >
> > I first tried adding seconds-based timestamps (using time()) to
> > gearman_server_job_st but this was not enough to disambiguate between jobs
> > added using the Concurrent Client Interface (gearman_client_add_task,
> > gearman_client_run_tasks) and then changed to useconds-based timestamps (using
> > gettimeofday()) and it seems to be working as expected.
> >
> > I attach a patch to the 0.8 release. (I think) It does not work if any kind of
> > persistence is used in the queues of the job server. I hope it is
> > useful. Comments are welcomed.
> >
> > Huge thanks,
> >
> > --

> > Yasser Gonz?lez Fern?ndez

Cory Bennett

unread,
Aug 3, 2009, 4:26:29 AM8/3/09
to gea...@googlegroups.com
Hey all,

I found that I needed the FIFO ordering as well. I tried to apply
Yasser's patch to the 0.9 release but was unable to. The exact
implementation confused me a bit, so I ended up refactoring it, but
it is basically Yasser's patch.

I have uploaded the changes to lp:~coryb/gearmand/time-order

I also added a -T or --time-order option to enable the time ordering
FIFO. Without this enabled the gettimeofday overhead you were
concerned with will not happen. So the default behavior is the same
as it is today (with 2 extra conditional checks), but the the
--time-order will get the FIFO order based on gettimeofday.

These changes will break binary compatibility since we had to
add to the gearman_server_st and gearman_server_job_st. Sort of
hard to see what the "public interface" is since all the .h files
are shipped and included via gearman.h, so no idea how you guys
deal with binary incompatible changes. Perhaps you can think
of a way to make these changes that would be binary compatible?

Let me know if there are any questions/concerns.

Thanks!
-Cory

dormando

unread,
Aug 3, 2009, 4:38:35 AM8/3/09
to gea...@googlegroups.com
Hmm, to clarify a little:

This effectively randomizes which queue jobs come out of at any particular
instant, along with guaranteeing that all queues are crawled quickly. It's
like a crazy crosspattern. (otherwise you'd just iterate on the server
level).

Ernesto Rodriguez Reina

unread,
Aug 3, 2009, 1:11:27 PM8/3/09
to gea...@googlegroups.com
Hi Yasser and Cory,

I have tested a bit the FIFO ordering you both implemented and it
works perfectely well (as I spect). Thank you both, and keep
contributing.


--
Ernesto Rodriguez Reina

Eric Day

unread,
Aug 3, 2009, 1:49:36 PM8/3/09
to gea...@googlegroups.com
Hi Dormando!

This was one of the algorithms we discussed previously, and the one
I am leaning towards the most since it has very little overhead. The
time-based one would also be fair for some loads, and I think is a
good option to have. The current one is pretty dumb and just finds the
first job it can (no guarantee to the order of that). I'll probably
make the round-robin behavior the default.

The plan in the C server is to have flags to trigger the various
queue behaviors to help match the load.

-Eric

Eric Day

unread,
Aug 3, 2009, 2:41:05 PM8/3/09
to gea...@googlegroups.com
Thanks Cory!

I'll try to work this patch into the next release. I also will be
adding the round-robin behavior Dormando talks about.

For binary compatibility, we should not see any issues. Only gearmand
uses the gearmand*_st and gearman_server*_st structs right now. They
are provided as headers because gearmand itself if fully embeddable and
you could build your own, custom gearmand using the gearman_server*
interface. They are just a few abstraction layers to help with
code-reuse and extensibility. For anyone thinking of using these
interfaces, please consider extending the implementation we already
have first. :)

-Eric

Cory Bennett

unread,
Dec 11, 2009, 5:35:23 PM12/11/09
to gea...@googlegroups.com
Hey Eric,

Looking at the .11 release, It seems these time-ordering patches never made
it to trunk? Did the cross-queue-FIFO request get dropped intentionally, or
was it lost? Perhaps I just missed an alternative solution to this?

Just looking forward to get off my patched .9 release so I can use thew new
fixes and features :)

Thanks,
-Cory

Eric Day

unread,
Dec 13, 2009, 12:37:09 PM12/13/09
to gea...@googlegroups.com
Ahh, no it did not make it in. It was not submitted via the usual merge
request path (launchpad branch) so it slipped though the cracks. Also,
since this should be an optional behavior, I don't want to apply the
patch as is, and instead properly wrap it around a user flag.

I'll try to get it in the next release. :)

-Eric
> >> >> Yasser Gonz�lez Fern�ndez

Cory Bennett

unread,
Dec 15, 2009, 12:06:06 PM12/15/09
to gea...@googlegroups.com
Okay thanks. Sorry, I didnt know I was supposed to make a merge
request via launchpad.

Perhaps I am missing what you mean by "optional behavior", but the
patch I submitted via the lp branch (lp:~coryb/gearmand/time-order)
required one to specify the --time-order option to geamand to enable
the FIFO behavior.

I can recreate that patch against the current trunk if that would help.

Thanks!
-Cory
>> >> >> Yasser González Fernández

Ernesto Rodriguez Reina

unread,
Dec 15, 2009, 12:49:30 PM12/15/09
to gea...@googlegroups.com
Hi, I think it would be great!
--
Ernesto Rodriguez Reina

Eric Day

unread,
Dec 16, 2009, 5:53:48 PM12/16/09
to gea...@googlegroups.com
Ahh, ok. I didn't know you had the branch up there already. The
merge request makes it more obvious. When preparing for a release,
we tend to try and drain the merge request queue. :)

I just pulled your branch and will try to get it in soon.

-Eric
> >> >> >> Yasser Gonz�lez Fern�ndez
Reply all
Reply to author
Forward
0 new messages