Creating a worker to pull messages off redis


Jason Fill

Jun 13, 2014, 1:17:34 PM
to nod...@googlegroups.com

Hello,

I am writing a few workers that need to poll a Redis instance for data. At small scale this is not a big deal, but I need to make sure the worker is written to handle maximum throughput.

The basic idea is to check Redis for a matching key and, if there is one, send it to a process function.

The basic code I have is below (stripped down). As you can see, for this one Node process I am starting 10 worker loops. When I run this test, it seems to process the messages in batches of 10: all 10 complete, then 10 more start. Perhaps this is because they all run so fast that it naturally happens that way, but what I would like is for each worker loop to start again as soon as it completes, instead of waiting for all 10 to complete.

Any thoughts on design, best practices, and/or best way to handle what I am looking to do?


// Assumed setup, not shown in the original (stripped-down) post.
var async = require('async');
var redis_client = require('redis').createClient();

/*
    When the process starts, kick off 10 calls so we have a concurrency of 10.
    The thought is that this is like 10 users hitting an API all at once: we are
    running one Node instance but handling x requests at any given second.
*/
for(var w=0; w<10; w++ ){
    start_worker();
}

// Start a worker up. Declared as a function so it is hoisted above the loop that calls it.
function start_worker(){

    var local = {};

    async.series([
        function(callback){
            // Do something
            callback(null);
        }
        ,function(callback){
            // Look up some keys, which returns an array...
            local.results = ['key1','key2','key3'];
            callback(null);
        }
        ,function(callback){
            // Drain the list behind each key; async.each runs these in parallel.
            async.each(local.results, function( delay_time, callback) {
                processMessages(delay_time, function(err){
                    callback(err);
                });
            }, function(err){
                callback(err);
            });
        }
    ],
    // optional callback
    function(err, results){
        if(err){
            console.log(err);
        }
        // Restart on the next tick so repeated restarts do not grow the call stack.
        setImmediate(start_worker);
    });
}

// handle the processing of the actual message when one is located....
function processMessages(delay_time, callback){

    var local = {};
    local.doProcess = true;

    async.doWhilst(
        function (callback) {
            redis_client.rpoplpush("delayed:" + delay_time, "delayed:processing", function (err, message_data){
                if(err){
                    // Redis error: stop this loop and report it.
                    callback(err);
                }else if(message_data){
                    var data = JSON.parse(message_data);
                    // this is where we could move the messages to processing, then clean the message off the delayed:processing queue.
                    console.log(message_data);

                    // assume everything was done...now delete the message from processing....
                    redis_client.lrem("delayed:processing", 0, message_data, function(err, result){
                        console.log('deleted: ' + message_data);
                        callback(err);
                    });
                }else{
                    // the list is empty: stop looping
                    local.doProcess = false;
                    callback(null);
                }
            });
        },
        function () { return local.doProcess; },
        function (err) {
            if(err){
                console.log(err);
            }
            callback(err);
        }
    );
}


Thanks in advance!

Peter Rust

Jun 14, 2014, 11:03:49 AM
to nod...@googlegroups.com
Jason,

I couldn't spot anything in your code that would lead to the behavior you describe, but one option would be to use async.queue() with concurrency=10, or to use async.cargo() with payload=10...
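
For illustration, here is a dependency-free sketch of the behaviour that async.queue() with concurrency=10 is meant to give: at most 10 tasks in flight, with the next task starting as soon as any one finishes rather than after a whole batch. The names createQueue, demoWorker and runDemo, and the timings, are hypothetical stand-ins and not the async library's implementation; the timers stand in for the Redis round trip.

```javascript
// Hypothetical stand-in for async.queue(worker, concurrency): runs at most
// `concurrency` tasks at once and refills a slot the moment it frees up.
function createQueue(worker, concurrency) {
    var pending = [];   // tasks waiting for a free slot
    var running = 0;    // tasks currently in flight
    var queue = {
        drain: null,    // optional: called when the queue is empty and idle
        push: function (task) {
            pending.push(task);
            next();
        }
    };

    function next() {
        while (running < concurrency && pending.length > 0) {
            running++;
            worker(pending.shift(), function () {
                running--;
                if (pending.length === 0 && running === 0) {
                    if (queue.drain) { queue.drain(); }
                } else {
                    next();   // a slot is free: start the next task right away
                }
            });
        }
    }

    return queue;
}

// Runs 25 fake tasks through a queue of 10 and reports the completion order
// and the peak number of tasks that were in flight together.
function runDemo(onDone) {
    var inFlight = 0, peak = 0, finished = [];

    // Simulated asynchronous work; setTimeout stands in for a Redis call.
    function demoWorker(task, done) {
        inFlight++;
        peak = Math.max(peak, inFlight);
        setTimeout(function () {
            inFlight--;
            finished.push(task.id);
            done();
        }, task.ms);
    }

    var q = createQueue(demoWorker, 10);
    q.drain = function () { onDone(finished, peak); };

    // Task 0 is deliberately slow; every other task is fast.
    for (var i = 0; i < 25; i++) {
        q.push({ id: i, ms: i === 0 ? 200 : 5 });
    }
}

runDemo(function (order, peak) {
    console.log('peak concurrency:', peak);
    console.log('last task to finish:', order[order.length - 1]);
});
```

Task 0 is slow on purpose. With batch-style scheduling, tasks 10 and up would have to wait for it; here the other nine slots keep refilling, so the remaining 24 tasks finish while task 0 is still running.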

-- peter rust

Behrad Zari

Jun 15, 2014, 2:34:55 AM
to nod...@googlegroups.com
You can also look at https://github.com/LearnBoost/kue which does the same. I've not changed it to `rpoplpush` yet, but it has been a handy, working module for years.
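
For anyone wondering what `rpoplpush` buys you in the code at the top of the thread, here is a rough in-memory sketch of the pattern, assuming plain arrays in place of Redis lists. The helpers rpoplpush, lrem and drainList are hypothetical models of the Redis commands, not a client API: a message is moved onto a processing list before it is handled and removed only after success, so a message whose handler fails stays on the processing list and is not lost.

```javascript
// In-memory stand-ins for Redis lists; key names mirror the original post.
var lists = {
    'delayed:key1': ['c', 'b', 'a'],
    'delayed:processing': []
};

// Models RPOPLPUSH: pop from the tail of `source`, push onto the head of
// `dest`, and return the element (or null when `source` is empty).
function rpoplpush(source, dest) {
    var src = lists[source] || [];
    if (src.length === 0) { return null; }
    var item = src.pop();
    (lists[dest] = lists[dest] || []).unshift(item);
    return item;
}

// Models LREM key 0 value: remove every occurrence of `value` and
// return how many elements were removed.
function lrem(key, value) {
    var before = (lists[key] || []).length;
    lists[key] = (lists[key] || []).filter(function (v) { return v !== value; });
    return before - lists[key].length;
}

// Drain one list. `handle` returns false to simulate a worker that fails
// part-way through a message.
function drainList(source, handle) {
    var item;
    while ((item = rpoplpush(source, 'delayed:processing')) !== null) {
        if (handle(item)) {
            lrem('delayed:processing', item);   // acknowledge only after success
        }
    }
}

// Pretend the handler fails on message 'b'.
drainList('delayed:key1', function (item) { return item !== 'b'; });

console.log(lists['delayed:key1']);
console.log(lists['delayed:processing']);
```

After the run the source list is empty and only the failed message is left on delayed:processing, where a separate recovery step could pick it up again.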