implementing a FIFO buffer using memcached

ivanhoe

Aug 10, 2008, 2:15:10 PM
to memcached
I need to cache a list of notifications in a system using multiple web
servers and databases, and I was thinking of implementing it in
memcached. The list will work as a FIFO buffer, with a single
process reading from the beginning of the buffer and multiple servers
pushing data to the end of it.

What is the best approach to do this using memcached from Perl or
PHP? Or maybe you can recommend some solution better suited for
this than memcached?

Brian Moon

Aug 10, 2008, 2:24:10 PM
to memc...@googlegroups.com

I use a MySQL memory table for this:

CREATE TABLE `job_queue` (
`item_id` int(10) unsigned NOT NULL default '0',
`item_type` varchar(50) NOT NULL default '',
`priority` tinyint(4) NOT NULL default '10',
`insert_time` timestamp NOT NULL default CURRENT_TIMESTAMP,
`process_id` varchar(255) NOT NULL default '',
PRIMARY KEY (`item_id`,`item_type`),
KEY `queue_pickup` (`process_id`,`item_id`,`item_type`),
KEY `queue_select` (`priority`,`insert_time`)
) ENGINE=MEMORY

We use triggers on key tables. When something changes in those tables,
the changed items are inserted into the above queue. Process servers then pick up the
items and work on them.
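The post does not say how process servers pick items up. One plausible pattern, given the `process_id` column and the `queue_pickup` index, is sketched below as an assumption: a worker stamps its own id on a batch of unclaimed rows in one UPDATE, then reads back only the rows carrying that id. SQLite's in-memory database stands in for the MySQL MEMORY engine so the sketch runs anywhere; `enqueue`, `claim` and `finish` are hypothetical names, and treating a lower `priority` value as more urgent is also a guess.

```python
import sqlite3

# SQLite stands in for MySQL's MEMORY engine so the sketch runs anywhere;
# the columns mirror job_queue above (types simplified for SQLite).
SCHEMA = """
CREATE TABLE job_queue (
    item_id     INTEGER NOT NULL DEFAULT 0,
    item_type   TEXT    NOT NULL DEFAULT '',
    priority    INTEGER NOT NULL DEFAULT 10,
    insert_time TEXT    NOT NULL DEFAULT CURRENT_TIMESTAMP,
    process_id  TEXT    NOT NULL DEFAULT '',
    PRIMARY KEY (item_id, item_type)
)
"""


def enqueue(conn, item_id, item_type, priority=10):
    # Hypothetical producer (in the post, triggers do this). The primary key
    # means an item that is already waiting in the queue is not added twice.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO job_queue (item_id, item_type, priority) "
            "VALUES (?, ?, ?)",
            (item_id, item_type, priority),
        )


def claim(conn, process_id, limit=10):
    # Assumed claim step: stamp this worker's id on up to `limit` unclaimed
    # rows in a single UPDATE (lowest priority value first, then oldest),
    # then read back only the rows that carry this worker's id.
    with conn:
        conn.execute(
            "UPDATE job_queue SET process_id = ? WHERE rowid IN ("
            " SELECT rowid FROM job_queue WHERE process_id = ''"
            " ORDER BY priority, insert_time LIMIT ?)",
            (process_id, limit),
        )
    return conn.execute(
        "SELECT item_id, item_type FROM job_queue WHERE process_id = ? "
        "ORDER BY priority, insert_time",
        (process_id,),
    ).fetchall()


def finish(conn, process_id):
    # Remove the rows this worker has finished processing.
    with conn:
        conn.execute("DELETE FROM job_queue WHERE process_id = ?", (process_id,))
```

Because each row is stamped by exactly one UPDATE, two workers should not end up with the same item; a MySQL version would more likely use `UPDATE ... ORDER BY ... LIMIT` directly instead of the subquery.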

If you need lots of throughput, this may not be for you. I only do thousands
of items per minute with this, not per second. There are a few queue daemons
out there, but I can't find any links to them right now for some reason.
I think someone on this list uses/maintains one of them.

--

Brian Moon
Senior Web Engineer
------------------------------
When you care enough to spend the very least.
http://dealnews.com/

Jehiah Czebotar

Aug 10, 2008, 2:28:22 PM
to memc...@googlegroups.com
On Sun, Aug 10, 2008 at 2:15 PM, ivanhoe <ivanh...@gmail.com> wrote:
> What is the best approach to do this using memcached from perl or
> php ?

You can use the increment/decrement functionality to keep track of
positions in your queue.

I use something like this in Python, but you should be able to adapt
it to your needs (this is just some rough pseudocode off the top of my
head). As several people on this list love to point out, remember that
you are not guaranteed that the items will be there when you go to get
them out of the queue, because memcached can lose them for multiple
reasons. It's a cache, not a persistent data store, so make sure you
handle that. Also, you might want to make your reader lag the front of
the queue by an item or two, or be careful about the effects of
timing differences between incr() and putting the item in the queue.

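import memcache  # python-memcached client

mc = memcache.Client(['127.0.0.1:11211'])  # assumed local memcached server

queue = "myqueue"            # counter: position of the last item written
placeholder = "myqueue.%d"   # key template for each queued item
reader = "myqueue.reader"    # counter: position of the last item read

## I'm initializing these here, but your code would need to handle
## re-setting them when an incr() fails (incr() returns None if the key is gone).
mc.set(queue, '0')
mc.set(reader, '0')

## to add an item to the queue
def push(value):
    position = mc.incr(queue)
    mc.set(placeholder % position, value)

## then to read out the next item (None if nothing is there yet)
def pop():
    next_pos = mc.incr(reader)
    if next_pos > int(mc.get(queue) or 0):
        ## you are trying to read an item that isn't there yet:
        ## decrement the reader, or sleep a second if your queue gets items very often,
        ## or check mc.get(reader) before you try to increment it.
        mc.decr(reader)
        return None
    return mc.get(placeholder % next_pos)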
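To try the two-counter scheme without a running server, here is a sketch built on a small dict-backed stand-in. `FakeMemcache` and `CounterQueue` are hypothetical names written only for this example; the stand-in mimics memcached in that `incr()` on a missing key fails (python-memcached returns `None`). It assumes a single reader, as in the original question, and its policy of skipping a position whose item is missing is also an assumption: that is exactly the loss the post warns about.

```python
class FakeMemcache:
    """Hypothetical dict-backed stand-in for a memcached client."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
        return True

    def delete(self, key):
        return self.data.pop(key, None) is not None

    def incr(self, key, delta=1):
        # Like memcached, incr fails on a missing key (python-memcached returns None).
        if key not in self.data:
            return None
        self.data[key] = str(int(self.data[key]) + delta)
        return int(self.data[key])


class CounterQueue:
    """Single-reader FIFO built from a write counter and a read counter."""

    def __init__(self, mc, name):
        self.mc = mc
        self.head = name               # position of the last item written
        self.tail = name + ".reader"   # position of the last item read
        self.item = name + ".%d"       # key template for each item
        mc.set(self.head, "0")
        mc.set(self.tail, "0")

    def push(self, value):
        pos = self.mc.incr(self.head)
        if pos is None:
            # Counter was evicted; a real system must decide how to re-seed it.
            raise RuntimeError("queue counter missing")
        self.mc.set(self.item % pos, value)
        return pos

    def pop(self):
        """Return the oldest available value, or None if nothing is left.

        Positions whose item is missing (evicted, or not written yet) are skipped.
        """
        while True:
            written = int(self.mc.get(self.head) or 0)
            read = int(self.mc.get(self.tail) or 0)
            if read >= written:
                return None
            pos = self.mc.incr(self.tail)
            if pos is None:
                return None  # reader counter evicted; re-seeding is up to the caller
            value = self.mc.get(self.item % pos)
            if value is not None:
                self.mc.delete(self.item % pos)
                return value
```

With a real server, a reader that races ahead of a producer (counter already incremented, item not yet set) would skip that item too, so a production version might retry briefly before skipping, or lag the head as suggested above.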

Isart Montane

Aug 10, 2008, 2:30:56 PM
to memc...@googlegroups.com
Hi Ivanhoe,

There's already a queue storage engine for MySQL. Maybe it will work for you:

http://q4m.31tools.com/

Isart Montane

dormando

Aug 10, 2008, 3:08:47 PM
to memcached
Consider something like Gearman or TheSchwartz... or beanstalkd or
Starling or whatever. They're designed more specifically for
this.

-Dormando

Clint Webb

Aug 11, 2008, 1:55:47 AM
to memc...@googlegroups.com
I use memcached for writing my sitelog. My sitelog contains upper-level log information, such as "User #23452 saved new settings..." and "New user created. Username=... Fullname=... Email=...". I use this log not really to catch programming errors, but to have a chronological log of the higher-level operations. It's very useful when a user complains and says "I didn't change my settings": I can easily look at the log and say, yes, you did. Etc.

I use memcached for this because I don't want to tie up my database with it. Also, the log ends up in a file, which is easy to search, archive and manipulate. I wanted something that can be written to safely from multiple processes and from multiple servers (that's why I don't log directly to a file).

So here is my simple implementation of a FIFO queue using memcached.

Most of my site is in Perl.

Here is the function used to WRITE to the sitelog.

Note that $cfgLogDomain and $cfgLogServer are defined elsewhere. I use the same log cache for multiple websites and services, so the domain just identifies them. Also note that the function embeds the time and IP address in the message. I can't really remember why I store the time as a number in the cache and then convert it into a readable date-time when writing to the file.

sub SiteLog {
  my ($text) = @_;

  my ($tt)  = time;
  &ConnectLogCache;
  if ($logcache) {
    # Increment the current counter.
    my ($key) = 0 + $logcache->incr("$cfgLogDomain:CurrentKey");
    if ($key <= 0) {
      $key = 1;
      $logcache->set("$cfgLogDomain:CurrentKey", '1');
    }

    # store the log based on the counter, giving it a 3 week expiry so that
    # it will clear if we dont remove the entries within that time.
    my ($ip) = $ENV{'REMOTE_ADDR'};
    if (length $IP > 0) { $ip = $IP; }
    my $complete = "$tt,$ip, $text";
    $logcache->set("$cfgLogDomain:$key", $complete, 86400*21);
  }
}

# Connect to the log cache.
sub ConnectLogCache {
        unless (defined $logcache) {
                $logcache = new Cache::Memcached {
                                'servers' => $cfgLogServer,
                                'debug' => 0,
                                'compress_threshold' => 1024
                };
                if ($logcache) { $logcache->enable_compress(1); }
        }
}

To use it, simply call SiteLog("Text to log...");
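The writer side can be sketched in Python for comparison. Everything here is hypothetical: `site_log` and the dict-backed `FakeMemcache` (which accepts but ignores expiry) are illustration only, and the `time`/`add` keyword shapes follow the python-memcached client. One deliberate swap from the Perl version: in the Perl code, if two processes both see `incr` fail, both set the counter to 1 and the second write to position 1 can overwrite the first. The sketch instead uses memcached's `add()`, which stores a key only if it does not already exist, so only one writer can re-create the counter.

```python
import time

THREE_WEEKS = 86400 * 21  # same expiry as the Perl version


class FakeMemcache:
    """Hypothetical dict-backed stand-in; expiry times are accepted but ignored."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value, time=0):
        self.data[key] = value
        return True

    def add(self, key, value, time=0):
        # Like memcached's add: store only if the key is absent.
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def incr(self, key, delta=1):
        if key not in self.data:
            return None
        self.data[key] = str(int(self.data[key]) + delta)
        return int(self.data[key])


def site_log(mc, domain, text, ip="", now=None):
    """Queue one "time,ip, text" entry; returns the position it was stored under."""
    counter = "%s:CurrentKey" % domain
    key = mc.incr(counter)
    while key is None:
        # Counter missing (never set, or evicted): try to create it at 1.
        # If another writer got there first, add() fails and we incr() again.
        if mc.add(counter, "1"):
            key = 1
        else:
            key = mc.incr(counter)
    tt = int(now if now is not None else time.time())
    mc.set("%s:%d" % (domain, key), "%d,%s, %s" % (tt, ip, text), time=THREE_WEEKS)
    return key
```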


Now, the following is the Perl script that runs as a daemon when the server starts up. It READS the entries from the cache and actually writes them to the sitelog file.


[cjw@srv3 utils]$ cat sitelog.pl
#!/usr/bin/perl

# Sitelog script that pulls sitelog entries out of the cache set aside for site logging.  Because we use a domain (or namespace), we can share the same cache with anything else that doesn't conflict.

use Cache::Memcached;
use Getopt::Long;


my $domain   = "Log";
my $server   = "localhost";
my $port     = 11217;
my $filename = "sitelog.txt";
my $sleep    = 5;

$result = GetOptions ("domain=s"   => \$domain,
                      "server=s"   => \$server,
                      "port=i"     => \$port,
                      "filename=s" => \$filename,
                      "sleep=i"    => \$sleep);

my $limit    = 5;

my $signals = 0;
sub catch_signal {
        my $signame = shift;
    $signals++;
}

$SIG{INT} = \&catch_signal;
# SIGKILL cannot be caught, so installing a handler for it would have no effect.
$SIG{HUP} = \&catch_signal;
$SIG{TERM} = \&catch_signal;

# Create the cache connection.
unless (defined $logcache) {
        $logcache = new Cache::Memcached {
                'servers' => [ "$server:$port" ],
                'debug' => 0,
                'compress_threshold' => 1024
        };
        if ($logcache) { $logcache->enable_compress(1); }
}

unless ($logcache) {
        print "We don't seem to have a connection to the cache.\n";
        exit 1;
}
else {

        # First we open the logfile, so that we can write an initial message.  This also tests that we are able to.  Once we have written an intro text, we will close it.  The actual log writing will happen in the main while loop.
        unless (open(SITELOG, ">>$filename")) {
                print "Couldn't open logfile for append: $filename\n";
                exit 5;
        }
        else {
                my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime(time);
                printf SITELOG "%4d-%02d-%02d %02d:%02d:%02d,, SiteLog starting: $domain\n", $year+1900,$mon+1,$mday,$hour,$min,$sec;
                close SITELOG;

                while ( $signals == 0 ) {

                        # Get current key
                        my ($key) = $logcache->get("$domain:CurrentKey");
                        unless ($key) {
                                if (open(SITELOG, ">>$filename")) {
                                        my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime(time);
                                        printf SITELOG "%4d-%02d-%02d %02d:%02d:%02d,, Sitelog does not have CurrentKey set: $domain\n", $year+1900, $mon+1, $mday, $hour, $min, $sec;
                                        close SITELOG;
                                }
                                sleep 30;
                        }
                        else {

                                # get process key
                                my ($process) = $logcache->get("$domain:ProcessKey");
                                unless ($process) {

                                        # we don't have a process key, but if we got this far,
                                        # then we do have a current key.  Which means there
                                        # must be something in there to process.  For now, we
                                        # will set ProcessKey to 0; the loop below increments
                                        # it to 1 before reading the first entry.
                                        $process = 0;
                                        $logcache->set("$domain:ProcessKey", "0");
                                }

                                if ($process > $key) {
                                                if (open(SITELOG, ">>$filename")) {
                                                                print SITELOG "Process value was invalid.  Resyncing.\n";
                                                                close SITELOG;
                                                }

                                                $logcache->set("$domain:ProcessKey", "0");
                                                $process = 0;
                                }

                                if ($process < $key) {
                                        # open log file.  We won't worry about renaming the file if it gets too
                                        # big, an external process will archive the file every night, renaming
                                        # it.  This process will end up starting a new file if it no longer exists.
                                        if (open(SITELOG, ">>$filename")) {


                                                # while process key is less than current key,
                                                while ($process < $key) {

                                                        # increment process key.
                                                        $process = $logcache->incr("$domain:ProcessKey");

                                                        my ($done) = 0;
                                                        my ($count) = 0;
                                                        while ($done == 0 && $count < $limit) {

                                                                # get text from key
                                                                my ($text) = $logcache->get("$domain:$process");
                                                                unless ($text) {
                                                                        # the text doesn't exist for this key yet.
                                                                        # It is possible for this to happen because
                                                                        # the source increments the counter, and
                                                                        # then adds the text.  We could be checking
                                                                        # for the text before it has been entered.  We
                                                                        # will sleep for 1 second, and then loop again.

                                                                        # We will write a single '-' on a line to the file so that if we get a bunch of them,
                                                                        # we know something went wrong.
                                                                        print SITELOG "-\n";
                                                                        sleep 1;
                                                                }
                                                                else {
                                                                        # delete key.  we could let it expire, but I feel better cleaning it up.
                                                                        $logcache->delete("$domain:$process");

                                                                        # write to log file.  
                                                                        # This bit gets the time value from the entry, and then puts the rest back together again. 
                                                                        # This can be done better probably.
                                                                        my ($tt,@rest) = split(/,/, $text);
                                                                        my ($line) = join ',', @rest;
                                                                        ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime($tt);

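                                                                        # pass $line as an argument rather than part of the format, so a '%' in the text can't break printf
                                                                        printf SITELOG "%4d-%02d-%02d %02d:%02d:%02d,%s\n", $year+1900, $mon+1, $mday, $hour, $min, $sec, $line;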

                                                                        # print "Processed: $process - $line\n";
                                                                        $done = 1;
                                                                }
                                                                $count ++;
                                                        }
                                                }

                                                # close log file.
                                                close SITELOG;
                                        }
                                        else {

                                                # if we couldn't write to the file, should we really exit, or should we wait and try again?

                                                print "Unable to append to log file: $filename\n";
                                                exit 2;
                                        }
                                }
                        }

                        if ($signals == 0) {
                                sleep $sleep;
                        }
                }

                if (open(SITELOG, ">>$filename")) {
                        my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime(time);
                        printf SITELOG "%4d-%02d-%02d %02d:%02d:%02d,, SiteLog stopping: $domain\n", $year+1900,$mon+1,$mday,$hour,$min,$sec;
                        close SITELOG;
                }
        }
}

exit;
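The core of the daemon's loop (compare ProcessKey with CurrentKey, incr ProcessKey, retry a missing entry up to `$limit` times, then turn the stored epoch time into a local date-time) can be sketched in Python. This is a hypothetical translation, not the author's code: `FakeMemcache` is a dict-backed stand-in, entries go to a list instead of sitelog.txt, and the one-second retry sleep is an injectable `wait` function so the sketch can be exercised without waiting.

```python
import time


class FakeMemcache:
    """Hypothetical dict-backed stand-in for the cache (get/set/incr/delete)."""

    def __init__(self, data=None):
        self.data = dict(data or {})

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
        return True

    def incr(self, key, delta=1):
        if key not in self.data:
            return None
        self.data[key] = str(int(self.data[key]) + delta)
        return int(self.data[key])

    def delete(self, key):
        return self.data.pop(key, None) is not None


def format_entry(text):
    """'1218434147,10.0.0.1, msg' -> 'YYYY-MM-DD HH:MM:SS,10.0.0.1, msg' (local time)."""
    tt, rest = text.split(",", 1)  # only the first comma separates the timestamp
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(tt)))
    return "%s,%s" % (stamp, rest)


def drain(mc, domain, out, limit=5, wait=lambda: time.sleep(1)):
    """Move every entry between ProcessKey and CurrentKey into the list `out`."""
    current = int(mc.get("%s:CurrentKey" % domain) or 0)
    if not current:
        return
    process = int(mc.get("%s:ProcessKey" % domain) or 0)
    if process > current:  # counters out of sync: start over, as the Perl does
        process = 0
    mc.set("%s:ProcessKey" % domain, str(process))
    while process < current:
        process = mc.incr("%s:ProcessKey" % domain)
        if process is None:
            return  # ProcessKey evicted mid-run; the next cycle re-seeds it
        for _ in range(limit):
            text = mc.get("%s:%d" % (domain, process))
            if text is not None:
                mc.delete("%s:%d" % (domain, process))
                out.append(format_entry(text))
                break
            out.append("-")  # entry not there (yet): mark the miss and retry
            wait()
```

Splitting on the first comma only keeps any commas inside the message intact, which is what the Perl `split`/`join` pair achieves.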



I use a startup script to start and stop it when the server starts and stops.

[cjw@srv3 utils]$ cat /etc/rc.d/rhokz-sitelog
#!/bin/bash

. /etc/rc.conf
. /etc/rc.d/functions

PIDFILE=/var/run/rhokz-sitelog
APP_PATH=/data/www/domains/rhokz.com/utils/
NAME=rhokz-sitelog
LOG_DOMAIN=rhokz
LOG_PATH=/data/www/domains/rhokz.com/logs/logs/rhokz-sitelog.txt

start_instance() {
  if [ -e "$PIDFILE" ] ; then
    stat_busy "$NAME may already be running."
  else
    stat_busy "Starting $NAME"
    cd $APP_PATH
    ./sitelog.pl --domain=$LOG_DOMAIN --filename=$LOG_PATH &
    PID=$!
    echo $PID>$PIDFILE
  fi
}

stop_instance() {
  if [ -e "$PIDFILE" ] ; then
    PID=`cat $PIDFILE 2>/dev/null`

    stat_busy "Stopping $NAME"
    [ ! -z "$PID" ]  && kill $PID &> /dev/null
    rm $PIDFILE
  else
    stat_busy "$NAME is not running"
  fi
}

case "$1" in
  start)
    start_instance
    add_daemon $NAME
    stat_done
    ;;


  stop)
    stop_instance
    rm_daemon $NAME
    stat_done
    ;;

  restart)
    $0 stop
    sleep 1
    $0 start
    ;;

  status)
    if [ -e "$PIDFILE" ] ; then
      stat_busy "$NAME is running."

        ## Todo: We should actually check to make sure an appserver is running on that PID.

    else
      stat_busy "$NAME is not running."
    fi
    stat_done
    ;;

  *)
    echo "usage: $0 {start|stop|restart|status}"
esac


That's it. It's not very pretty, but it works well enough that I haven't touched it in a long time.

I also have a cron job somewhere that archives the sitelog periodically. My busy sites get archived daily, but the obscure ones are archived once a month, I think. The sitelog.pl script opens and closes the file on each cycle, so moving and archiving the log is easy.
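Why reopening the file on every cycle makes archiving easy can be shown with a short Python sketch (`append_entries` and `demo` are hypothetical helpers; the cron job's move is simulated with `os.replace`). A writer that opens in append mode each time simply creates a fresh file after the old one has been moved away, whereas a long-lived handle keeps writing into the moved file. The stale-handle behaviour shown is POSIX-specific; on Windows the rename of an open file would fail instead.

```python
import os
import tempfile


def append_entries(path, lines):
    # Open, append, close on every cycle, like sitelog.pl's ">>" opens.
    # Mode "a" creates the file again if an archive job has moved it away.
    with open(path, "a") as fh:
        for line in lines:
            fh.write(line + "\n")


def demo():
    """Simulate one archive rotation; returns (new file, archived file) contents."""
    log = os.path.join(tempfile.mkdtemp(), "sitelog.txt")
    with open(log, "a") as stale:                # a long-lived handle, for contrast
        append_entries(log, ["before rotation"])
        os.replace(log, log + ".1")              # the archive job moves the file away
        append_entries(log, ["after rotation"])  # reopening creates a fresh sitelog.txt
        stale.write("stale handle\n")            # on POSIX this lands in the moved file
    with open(log) as new, open(log + ".1") as old:
        return new.read(), old.read()
```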
--
"Be excellent to each other"

dormando

Aug 11, 2008, 2:02:56 AM
to memc...@googlegroups.com
I've talked to more than a few people who do something like this, and it
works well enough for them for sitelog-specific stuff. Either as a rolling
log or just a way to pull out extra session info about a specific user.

Anyone interested in writing up a wiki page on it? Please be specific
about the pros/cons and maybe a different approach as well ;) Don't ask
the list if it's okay, just go make the page and write it, then let us
know!

-Dormando

张立冰

Aug 11, 2008, 4:35:20 AM
to memc...@googlegroups.com
Hi, all.

I have implemented a distributed memory queue system based on Memcached. I hope you can get some ideas from this project.
The Google Code project: http://code.google.com/p/memqd/
The project has not been released yet, but I have posted the source code to the SVN repository (http://code.google.com/p/memqd/source/checkout).
--
The time you enjoy wasting is not wasted time!

张立冰

Aug 11, 2008, 4:39:54 AM
to memc...@googlegroups.com
You can check it out and have fun with it.
Here is a PPT with information about the project and benchmarks:
http://code.google.com/p/memqd/source/browse/trunk/doc/MemQD.ppt

ivanhoe

Aug 11, 2008, 9:28:29 AM
to memcached
Thank you all for such fast and helpful responses.
I think I'll try my luck with one of the specialized message queue
daemons first, and then we'll see.