Python Message Queue

Pystar

unread,
Nov 18, 2010, 9:09:42 PM
to web2py-users
I would like to know if anyone here has used a message queue, and
which one comes recommended?

Phyo Arkar

unread,
Nov 18, 2010, 10:16:48 PM
to web...@googlegroups.com
I use multiprocessing's Queue across processes to parse a huge list
of files and communicate back with the database. It works great.

mdipierro

unread,
Nov 18, 2010, 10:26:31 PM
to web2py-users
Do you have an example...?



Niphlod

unread,
Nov 19, 2010, 11:29:16 AM
to web2py-users
The only thing I don't like about multiprocessing's queue (not its
fault, but psycopg's) is that I have to create multiple database
connections, one for every process.

Multiprocessing's queue, the threading one, and deque are where you end up
getting your hands dirty....
If you need persistence and "security", you usually need to:
- take the message and store it somewhere (a table called "queued"?)
- give it a uuid
- prepare a slot in a "result" store (usually a table with uuid and blob
columns)
- have whoever reads the "queued" table push an update to that result store
when it has the result
- retrieve the results, send them back and/or delete them from the pool,
comparing against the "queued" table, as soon as possible

If you are not a "persistence" maniac, you can always store the message
in a deque, pop() it, and you're done! (A rough sketch of the persistent
variant is below.)
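
A minimal sketch of that persistent pattern, using plain sqlite3 purely for
illustration (in web2py you would define two DAL tables instead); the table
and column names here are made up:

import sqlite3, uuid

db = sqlite3.connect("queue.sqlite")
db.execute("CREATE TABLE IF NOT EXISTS queued (uuid TEXT PRIMARY KEY, message TEXT)")
db.execute("CREATE TABLE IF NOT EXISTS result (uuid TEXT PRIMARY KEY, payload TEXT)")

def submit(message):
    # store the message and hand its uuid back to the caller
    uid = str(uuid.uuid4())
    db.execute("INSERT INTO queued VALUES (?, ?)", (uid, message))
    db.commit()
    return uid

def work_one():
    # a worker takes one queued message, processes it, and stores the result
    row = db.execute("SELECT uuid, message FROM queued LIMIT 1").fetchone()
    if row is None:
        return
    uid, message = row
    payload = message.upper()  # stand-in for the real work
    db.execute("INSERT OR REPLACE INTO result VALUES (?, ?)", (uid, payload))
    db.execute("DELETE FROM queued WHERE uuid = ?", (uid,))
    db.commit()

def fetch(uid):
    # the submitter polls by uuid and deletes the result once it has been read
    row = db.execute("SELECT payload FROM result WHERE uuid = ?", (uid,)).fetchone()
    if row:
        db.execute("DELETE FROM result WHERE uuid = ?", (uid,))
        db.commit()
        return row[0]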

I found myself facing some issues in one or two cases, and next time I'm
going to have a look at pyres (it seems nice, simple, and stable, and
ultimately nicer to "hack into" the code, said a friend of mine), and if
that doesn't work out I'm going to learn celery once and for all (it seems
to be the best implementation out there).

Michele Comitini

unread,
Nov 20, 2010, 3:11:54 PM
to web...@googlegroups.com
2010/11/19 Niphlod <nip...@gmail.com>:

> The only thing I don't like about multiprocessing's queue (not its
> fault, but psycopg's) is that I have to create multiple database
> connections, one for every process.
Not a psycopg fault; it is the way PostgreSQL works, and it is very well thought out.

http://www.postgresql.org/docs/9.0/static/connect-estab.html reads:

"PostgreSQL is implemented using a simple "process per user"
client/server model."
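
In practice this just means every worker process opens its own connection;
a small sketch with psycopg2 and a made-up DSN:

import psycopg2
from multiprocessing import Process

class Worker(Process):
    def run(self):
        # each process opens its own connection; connections cannot be shared
        # across fork, and PostgreSQL serves each one with its own backend process
        conn = psycopg2.connect("dbname=mydb user=myuser")
        cur = conn.cursor()
        cur.execute("SELECT 1")
        print(cur.fetchone())
        conn.close()

if __name__ == '__main__':
    workers = [Worker() for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()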

Phyo Arkar

unread,
Nov 20, 2010, 4:16:04 PM
to web...@googlegroups.com
Yes, Massimo.

Here is my current implementation. Please note I use it outside of web2py
and call it via subprocess; it is quite crude and dirty and needs
modification to be used outside of my script.

What it does: it accepts a list of files (files_list) and a function object
(toprocess), and according to a sweet spot I found on the machine (the best
number of processes to run) it picks the number of splits to distribute the
workload across.

I have not tried it inside web2py, as one weirdness of multiprocessing is
that it requires the
if __name__ == "__main__"
guard.


def task_splitter(files_list, toprocess):
    from multiprocessing import Process, Queue
    # pick the number of worker processes based on how many files there are
    if len(files_list) >= 16 * 16:
        splits = 16
    elif len(files_list) >= 64:
        splits = 8
    elif len(files_list) >= 16:
        splits = 4
    else:
        splits = 1
    n = len(files_list) / splits  # files per worker (integer division)
    processes = []
    ql = []

    for i in range(0, splits):
        que = Queue()
        if i == splits - 1:
            # the last worker takes whatever is left of files_list
            processes.append([Process(target=toprocess, args=(files_list, que)), que])
        else:
            # give this worker the next chunk of n files
            processes.append([Process(target=toprocess, args=(files_list[0:n], que)), que])
            files_list = files_list[n:]
        processes[i][0].start()

    for p in processes:
        ql.append(p[1].get())  # collect each worker's result list from its queue
        p[0].join()

    flat_q = []
    for q in ql:
        flat_q += q

    # "End, Returning"
    return flat_q
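
A hypothetical usage sketch, just to show the contract toprocess has to
honor: it receives a list of files plus a Queue and must put exactly one
list of results on that queue (parse_files and its return values are made
up for illustration):

def parse_files(files, que):
    # worker: "parse" each file and put a single list of results on the queue
    results = []
    for path in files:
        results.append((path, len(open(path, 'rb').read())))  # stand-in for real parsing
    que.put(results)

if __name__ == '__main__':
    flat = task_splitter(['a.txt', 'b.txt', 'c.txt'], parse_files)
    print(flat)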

Phyo Arkar

unread,
Nov 20, 2010, 4:19:17 PM
to web...@googlegroups.com
Wow

celery is freaking awesome!

http://pypi.python.org/pypi/celery/2.1.3#example

I think we need it in web2py! All the other web frameworks have it now!

Michele Comitini

unread,
Nov 20, 2010, 4:47:52 PM
to web...@googlegroups.com
+1


Phyo Arkar

unread,
Nov 20, 2010, 4:53:15 PM
to web...@googlegroups.com
One thing I am not clear about with Celery:

It needs an MQ backend to be installed and configured, right (RabbitMQ, Redis, etc.)?
They are a whole new thing for me, and they are Java/C, with so many dependencies.

Please Celerify, lol :D

Michele Comitini

unread,
Nov 20, 2010, 5:10:12 PM
to web...@googlegroups.com
RabbitMQ seems to be Erlang, a good sign, but it adds too many dependencies.
Redis is C.

If it were possible to replace sqlalchemy with the DAL easily, then we
could integrate it. Who is going to investigate?


Phyo Arkar

unread,
Nov 20, 2010, 5:16:04 PM
to web...@googlegroups.com
Here is what I found at http://celeryq.org/docs/configuration.html


By default it doesn't need any MQ broker; it can work via a database backend, which is supported through SQLAlchemy. It can also use memcache.


We could easily make it work as-is via memcached, I think, and the DAL won't be hard!
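
For reference, a minimal Celery 2.x style sketch with a database result
backend; the sqlite path and the tasks module name are made up, and the
broker settings for a real deployment (RabbitMQ, Redis, or a database
transport) still have to come from the configuration page linked above:

# celeryconfig.py -- store task results in a plain sqlite database
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery_results.sqlite"
CELERY_IMPORTS = ("tasks",)

# tasks.py -- an ordinary function turned into a background task
from celery.task import task

@task
def add(x, y):
    return x + y

# caller (e.g. inside a controller): queue the work, fetch the result later
# result = add.delay(2, 2)
# print(result.get())   # blocks until a celeryd worker has finished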

mdipierro

unread,
Nov 20, 2010, 6:36:27 PM
to web2py-users
If we were to integrate queue functionality into web2py, what features
would you consider most valuable?

I can see three different applications:
1) a message queue (one app sends a message, another one receives it)
2) a task queue (an app can submit a task to be executed later): task
is executed by a remote process (or cloud)
3) a task queue: task is executed by the app itself (in a separate
thread) but triggered by a queue callback.

There is some overlap but they are subject to different optimizations.
2) could be a compatibility layer on top of google's task queue.

blackthorne

unread,
Nov 20, 2010, 7:22:00 PM
to web2py-users
I've used ActiveMQ, RabbitMQ, and memcache + starling extensively for
professional reasons. What is it that you really want to know? What is
the target?

Phyo Arkar

unread,
Nov 20, 2010, 8:19:11 PM
to web...@googlegroups.com
For my problem, where I have to use multiprocessing + Queue:

I am developing a file indexer for a digital forensics firm. It extracts huge archives of files, including .pst (MS Outlook archives), reads them, parses them, and extracts MIME and indexable text from as many different types of documents as possible, then puts them into the database for later indexing by a search engine.

The target is to process over 10GB of files at once.

When we tested 800MB archives, it took over 20 minutes (without multiprocessing).

The machine is a dual Core2 with 8GB DDR.

The problems are:
1. It stops web2py from responding to both normal requests and ajax requests, which makes it impossible to show progress to the user.
2. It also stops Rocket's keepalive responses for 20 minutes, making the browser think it is dead.

My solution is to separate all the processing out into a separate Python process, spawned via subprocess.Popen without waiting for it to end (a rough sketch follows this message).

With that I can use the multiprocessing module to spread the load across 4 processes without bombarding web2py. web2py can then function well, the processing runs in the background, and with ajax I can make a progress bar by checking the progress report of the separate process.

The problems there are:

1. As it is outside of web2py, there is no way of knowing whether an error occurred unless I monitor the output of the web2py process.
2. The way to communicate back to web2py is via files. I write process progress or errors to 4 different files, which the ajax requests monitor for progress.
3. Between the 4 processes it is easy to communicate parsed results via a multiprocessing Queue, but as the work is outside of web2py's scope, I cannot communicate with web2py using that Queue.
4. Also, the DAL has to be used outside of web2py to put the results back into the database, and it's ugly.


If Celery's tasks and queues were integrated, there would be many benefits: it could easily load-balance across multiple machines too, and communicate to and fro easily.
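
A minimal sketch of that spawning pattern (the file names and worker.py are
made up; in web2py these would be controller actions):

import os, subprocess, uuid

def start_job(files_list_path):
    # launch the heavy worker as an independent process and return immediately
    job_id = str(uuid.uuid4())
    progress_file = '/tmp/job_%s.progress' % job_id
    subprocess.Popen(['python', 'worker.py', files_list_path, progress_file])
    return job_id   # the page polls progress via ajax using this id

def check_progress(job_id):
    # called by the ajax request: report whatever the worker last wrote
    progress_file = '/tmp/job_%s.progress' % job_id
    if not os.path.exists(progress_file):
        return 'starting'
    return open(progress_file).read()

# inside worker.py, the worker periodically rewrites its progress file, e.g.:
# open(progress_file, 'w').write('%d of %d files parsed' % (done, total))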

blackthorne

unread,
Nov 20, 2010, 8:22:53 PM
to web2py-users
Hey Massimo, I think I can help with this, and by the way I think it is the
way to go if we want to bring scalability and fault-tolerance to another
level.

I would suggest the very same architecture used in the Merb framework.
They did awesome work with Nanite (a fabric of Ruby daemons), bringing
real cloud-computing power out of the box. I have work around this from my
Master's and currently work on it professionally. If you want the benchmark
results for different parts of this, please feel free to contact me.

here:
http://www.slideshare.net/ezmobius/merb-nanite-presentation


Thank you,
Best regards

Phyo Arkar

unread,
Nov 20, 2010, 8:29:59 PM
to web...@googlegroups.com
2) could be a compatibility layer on top of google's task queue.

Google's task queue? On GAE?

For many organizations and companies, GAE or any other public cloud is not an option. They want their sensitive data to be private and are paranoid about putting it online. The firm I am working for is building its own private cloud, for itself and for its clients. My extractor will also need to run on it later.

Many private or government organisations will want technologies that are not tied to a public cloud.

Phyo Arkar

unread,
Nov 20, 2010, 8:35:07 PM
to web...@googlegroups.com
That's very cool!
+1

mdipierro

unread,
Nov 20, 2010, 9:32:22 PM
to web2py-users
http://code.google.com/appengine/docs/python/taskqueue/


Phyo Arkar

unread,
Nov 20, 2010, 9:52:06 PM
to web...@googlegroups.com
I was never into GAE, but does the GAE SDK work locally without any need
for Google's GAE?

Can we build a cloud using it?

mdipierro

unread,
Nov 20, 2010, 9:56:35 PM
to web2py-users
No.


Phyo Arkar

unread,
Nov 20, 2010, 10:05:13 PM
to web...@googlegroups.com
So I am confused :D

Google task queues won't work without running on the Google GAE cloud, right?


pbreit

unread,
Apr 5, 2011, 11:15:19 PM
to web...@googlegroups.com
Was anyone able to make any progress with Celery? I am very interested as I am not having much luck with cron and background processes.

Ross Peoples

unread,
Apr 6, 2011, 8:55:33 AM
to web...@googlegroups.com
I was able to get background processes to work pretty well. I have an import script that takes a minute or two to run and is started by an ajax call from a button in one of my views, and the progress is reported back to the page as the script runs. The way I did this was to use multiprocessing. Here is an example class:

from multiprocessing import Process, Queue

class ImportScript(Process):
    # class-level queue lets the web2py process read progress reported
    # by the background process
    queue = Queue()
    progress = (0, 0, 'Idle')

    def __init__(self, environment, db):
        Process.__init__(self)
        self.environment = environment
        self.db = db

    def run(self):
        self._do_import()

    def _do_import(self):
        # long running task here; report progress as it goes
        self._update_progress((1, 100, 'Importing'))

    def _update_progress(self, progress):
        ImportScript.queue.put(progress)
        print '%s: %s of %s' % (progress[2], progress[0], progress[1])

    @staticmethod
    def get_progress():
        # drain the queue and keep only the most recent progress tuple
        queue = ImportScript.queue
        progress = ImportScript.progress
        while not queue.empty():
            progress = queue.get(False)

        ImportScript.progress = progress
        return progress

I called this file scripts.py and added it to my modules folder. Then in db.py, I initialize it by calling:

scripts_module = local_import('scripts', reload=False)  # reload=False is important

In my controller, when I want to start this process, I call:

import_script = scripts_module.ImportScript(globals(), db)
import_script.start()

The script starts and the controller continues execution while the script runs in the background. Then if I want to check progress on the script, I make an ajax call to another action in the controller that returns progress:

progress = scripts_module.ImportScript.get_progress()
return '%s: %s of %s' % (progress[2], progress[0], progress[1])

This would return something similar to:
Importing records: 5 of 100
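
Putting those pieces together, a hypothetical pair of controller actions
might look like this (the action names and the ajax wiring are made up):

# in db.py
scripts_module = local_import('scripts', reload=False)

# in a controller
def start_import():
    # kicked off by the ajax call from the button in the view
    import_script = scripts_module.ImportScript(globals(), db)
    import_script.start()
    return 'started'

def import_progress():
    # polled by ajax while the import runs in the background
    progress = scripts_module.ImportScript.get_progress()
    return '%s: %s of %s' % (progress[2], progress[0], progress[1])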

pbreit

unread,
Apr 6, 2011, 12:53:09 PM
to web...@googlegroups.com
Interesting, thanks. I am looking to have a background task run forever to process a mail queue. Do you think that would work or do I need something more heavy duty? I was thinking of Celery or just going back to */1 cron jobs for now.

Ross Peoples

unread,
Apr 6, 2011, 1:12:14 PM
to web...@googlegroups.com
It should work. And if it's a never-ending process, then instead of calling it in the controller, you'd have to call it right after local_import in your db.py. You would also need to turn the ImportScript into a singleton and make sure it only gets initialized once because you don't want to spin off a new process for every request.

I've never tried it like that, but assuming you have a never-ending while loop or equivalent to process the queue, it should run continually. I would probably recommend using time.sleep() in there just to make sure the process doesn't take up too much processor time.
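
A rough sketch of that never-ending variant, assuming a hypothetical
send_pending_mail(db) helper and a module-level guard so only the first
request spawns the worker:

import time
from multiprocessing import Process

def send_pending_mail(db):
    # hypothetical placeholder: look up queued messages and send them
    pass

class MailQueueWorker(Process):
    def __init__(self, db):
        Process.__init__(self)
        self.db = db
        self.daemon = True           # let it die with the parent process

    def run(self):
        while True:                  # never-ending loop
            send_pending_mail(self.db)
            time.sleep(30)           # don't hog the CPU between passes

_worker = None

def start_worker_once(db):
    # call this right after local_import in db.py; only the first call spawns
    global _worker
    if _worker is None:
        _worker = MailQueueWorker(db)
        _worker.start()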

mart

unread,
Apr 7, 2011, 1:15:40 AM
to web2py-users
I don't know if this helps any (or if it is even applicable to what you
need), but this works for me, as it serves a multitude of purposes, though
mainly my task-request queuing system. I trimmed it down to the bare bones
so it would make more sense.

* In a nutshell, it polls an arbitrary directory structure (recursively)
and simply keeps track of files and directories coming in and going out.
It uses the modified timestamp and compares it against the others, so it
has built-in prioritization (a first-in, first-out system). That's it.

* But with that, we can add tons of stuff to make it work for us. Here are
a few examples: it already takes care of keeping track of incoming and
outgoing traffic, so those files can contain anything: data, data types,
pickled objects, scripts, strings that can be executed (one of the reasons
we like Python). It could also be... can you guess? a bunch of CSV files
that some web2py instance (or a script using the DAL) dumped there after
exporting its DB to CSV. Here, because the root folder might actually be
under /applications/appName/static/Q, we can have a controller check for
that. This controller could potentially say "hey, there's a bunch of CSV
files, I will have myApp import them!".

Because we poll recursively, we can take advantage of that and give the
directories under /Q some purpose: the directory path could represent
something like a network path, or a family tree, or an organization's
"in-house mail system" (well, it could be ;) ). Actually, a little while
ago I wanted to use this to organize my build-time generated DB and tables.
It would have been dandy, but it seems not to be possible :(. But another
interesting idea with queuing could be this: the folder structure could
represent the decision flow of some sort of workflow... something like
this:

a new file is dropped in root.submit

then it is processed by the workflow. It can now be expected to be dropped
at some point into the "processing" folder, where processor.py polls for
files and evaluates them when found (or the file could go to the rejected
folder at any time). So now the file is moving down the directory structure:

root.submit.processing

Maybe now processor.py determines that it is a valid request, so it drops
the file into a "pending" folder, and the structure gains a new layer:

root.submit.processing.pending_human_approval


Here the Q notices the new file in a folder called "pending_human_approval",
reads it, loads the instructions, sees who the user's manager is, and
notifies the manager that he needs to approve or reject a workflow item.

The manager receives the notice, logs in to the web2py app, and sees the
"pending_human_approval" message. He opens it and clicks "approve", which
has the effect of moving the file to the "approved" folder. So this flow's
structure now looks like this:

root.submit.processing.pending_human_approval.approved

So now our initial submitter is happy because he received a "decision_taken"
email from the automated workflow. He now logs in, goes to his "work_items",
and clicks the "submit_time_sheet_to_payroll" button, because his manager
has approved his request to take this year's Christmas day as
unpaid_time_off! :)

This is a queuing system because maybe there are another 200 employees
who want Christmas day off.

So the recursive queuing system, which keeps track of files and folders
(and timestamps everything), can be used to handle a workflow's queuing
system (and in fact be at the heart of it)...


Anyway, just having fun thinking about it ;) The script is below, and it
does do queuing :)

Hope it helps, or at least provides minimal amusement ;)

Mart


from __future__ import nested_scopes
import os, time
import threading

def Qstructure(rPath, funcCheckQ, pollInterval=1.0):
    # poll the directories listed in rPath and report new/modified and
    # deleted files to funcCheckQ
    _all = {}

    def checkQ(unused, dirName, fileset):
        for fName in fileset:
            path = os.path.join(dirName, fName)
            try:
                t = os.stat(path)
            except os.error:
                continue
            mtime = newOrModified.get(path)
            if mtime is not None:
                del newOrModified[path]
                if t.st_mtime > mtime:
                    modifiedFileList.append(path)
            else:
                modifiedFileList.append(path)
            _all[path] = t.st_mtime

    patrolStructure = False
    while True:
        modifiedFileList = []
        newOrModified = _all.copy()
        _all = {}
        for path in rPath:
            os.path.walk(path, checkQ, None)
        # whatever was not seen again on this pass has been deleted
        deletedFileList = newOrModified.keys()
        if patrolStructure:
            patrolStructure = False
        elif modifiedFileList or deletedFileList:
            patrolStructure = funcCheckQ(modifiedFileList,
                                         deletedFileList)
        time.sleep(pollInterval)

def startCheckQ():
    def checkQ(incoming, outgoing):
        '''---
        display the ins and outs
        ---'''
        for item in incoming:
            print('in {0}'.format(item))
        print('out {0}'.format(outgoing))
    '''---
    set the root ---'''
    os.chdir('/Users/mart/dev_mart/')
    Qstructure(['.'], checkQ, 1.0)

if __name__ == '__main__':
    startCheckQ()

Praneeth Bodduluri

unread,
Apr 7, 2011, 8:48:03 AM
to web...@googlegroups.com, pbreit
I was planning to; making web2py pip-installable was a step towards this,
but I got side-tracked with academic work. I am hoping to hack in DAL
support for celery next month.

--
Praneeth
IRC: lifeeth

pbreit

unread,
Apr 7, 2011, 1:02:10 PM
to web...@googlegroups.com, pbreit
Go for it!!

pbreit

unread,
Apr 7, 2011, 1:05:41 PM
to web...@googlegroups.com
I was looking for a simple but reliable way to run a never-ending background task (for example, a mail queue), but also just simple queuing in general, something like Delayed Job perhaps: https://github.com/collectiveidea/delayed_job