I have been working with the Pypes software for about 2 months now.
My group and I are building a proof-of-concept demo for a customer,
trying to show them how they can do visual workflow programming with
some of their common data processing tasks.
1. I have implemented several Pypes components. Some of the
components are just wrappers around web services: the tasklet makes an
HTTP request to some URL and gets back a response. This response is
operated on and then passed to the next component via the Packet (see
the sketch below).
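For concreteness, here is roughly what one of these wrapper components
looks like. This is just a sketch: the receive_all/send/yield_ctrl
calls and the Packet set() accessor are the usual pypes Component
interface as I understand it, and the service URL is made up.

    import urllib2
    from pypes.component import Component

    class WebServiceWrapper(Component):
        """Wraps a remote web service: request, transform, forward."""

        # hypothetical endpoint; each real component wraps a different service
        SERVICE_URL = 'http://example.com/service'

        def __init__(self):
            Component.__init__(self)

        def run(self):
            while True:
                for packet in self.receive_all('in'):
                    response = urllib2.urlopen(self.SERVICE_URL).read()
                    packet.set('result', response)  # operate on the response
                    self.send('out', packet)        # pass it downstream
                self.yield_ctrl()                   # yield to the scheduler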
2. Our scenario is not data-driven but database-driven, so we do not
send the data to the Pypes server directly; the Pypes server has to
request it. We have a Component for this, with tunable parameters that
modify the SQL query that grabs the data. The drawback is that we
don't have an adapter that starts the workflow, so I created a
special Start component that expects no data but sends an empty
packet to the next Component, which is the database query.
The Start component still requires some signal to get it started, so I
added a button to the browser interface called "Start". It sends a
simple GET request to docs.py instead of the normal POST request with
attached file data. I modified the docs.py controller slightly to
accept this extra case and instantiate an empty starting packet for
Start; a sketch of the component itself follows below.
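The Start component is trivial; here is a sketch, with the same
assumed pypes interface as above:

    from pypes.component import Component

    class Start(Component):
        """Expects no real data; forwards the empty packet injected by
        docs.py so the downstream database query component fires."""

        def __init__(self):
            Component.__init__(self)

        def run(self):
            while True:
                for packet in self.receive_all('in'):
                    # nothing to do here: the packet arrives empty from
                    # the modified docs.py controller, just pass it on
                    self.send('out', packet)
                self.yield_ctrl()

The docs.py change then amounts to branching on the request method and
building an empty packet for the GET case instead of reading the
attached file.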
3. I have added a snippet of JavaScript that polls a new Pylons
controller called poll.py. The objective is to give some
real-time feedback on what is happening on the Pypes server. The
JavaScript polls the server once a second and gets back JSON data.
The trouble here is that I don't know what kind of feedback I can get
from Stackless and Pypes that will tell me when the processes are
running and which components they are running in.
I am able to import stackless from the Pylons controller and query
some of its data. I can even tell which tasklet is active at the
current time, but that doesn't seem to have any relationship to
where the workload is actually taking place. It seems to me that the
only way to get workflow statistics is to instrument every single
Component with some kind of shared data structure where they can
manually post performance data such as new jobs and job completions
(sketched below).
Can you think of a better way? And what would be the best way to go
about making a shared data structure?
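To make the question concrete, the kind of shared structure I have in
mind is nothing fancier than a module-level registry that every
component updates (all names here are made up):

    # workflow_stats.py -- shared by all components; safe without locks
    # only if the tasklets are cooperatively scheduled in one interpreter
    from collections import defaultdict

    stats = defaultdict(lambda: {'started': 0, 'completed': 0})

    def job_started(name):
        stats[name]['started'] += 1

    def job_completed(name):
        stats[name]['completed'] += 1

Each component would call job_started(self.__class__.__name__) when it
picks up a packet and job_completed() when it sends one on; whether
poll.py can even see that dict is exactly the runtime question below.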
By the way, are the Pylons controllers handling the HTTP requests
and the running tasklets both running in the same Python runtime
environment?
--
Jacob Everist
cell: 310-425-9732
email: jacob....@gmail.com
Eric,

I was able to set up a monitoring system using RabbitMQ as you
suggested. This was a good fit since we are already using RabbitMQ
for another part of this project.
I did the RabbitMQ setup in app_globals.py, in the constructor
of Globals:
    # at the top of app_globals.py
    from amqplib import client_0_8 as amqp

    def __init__(self):
        """One instance of Globals is created during application
        initialization and is available during requests via the
        'app_globals' variable
        """
        self.dfg = DataFlowGraph()
        self.conn = amqp.Connection(host="localhost:5672",
                                    userid="guest", password="guest",
                                    virtual_host="/", insist=False)
        self.chan = self.conn.channel()
        # declare the queue and the exchange, then bind them together
        self.chan.queue_declare(queue="po_box", durable=True,
                                exclusive=False, auto_delete=False)
        self.chan.exchange_declare(exchange="sorting_room", type="direct",
                                   durable=True, auto_delete=False)
        self.chan.queue_bind(queue="po_box", exchange="sorting_room",
                             routing_key="pypes_status")
        self.lastMsg = ""  # fallback when the queue is empty
This makes sure that both the exchange and the queue are set up and
that they are bound to each other.
I also initialized the connection to the exchange in the constructor
of the Component superclass for each of the Pypes tasklets, so each
tasklet has its own channel to the "sorting_room" exchange:
    self.conn = amqp.Connection(host="localhost:5672",
                                userid="guest", password="guest",
                                virtual_host="/", insist=False)
    self.chan = self.conn.channel()
In place of logging, each tasklet publishes messages directly to the
sorting_room exchange:

    msg = amqp.Message('Component Succeeded: %s' % self.__class__.__name__)
    msg.properties["delivery_mode"] = 2  # persistent delivery
    self.chan.basic_publish(msg, exchange="sorting_room",
                            routing_key="pypes_status")
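In context, that publish call sits in the component's run loop where
the log call used to be, roughly like this (process() here stands in
for the component's real work and is hypothetical):

    def run(self):
        while True:
            for packet in self.receive_all('in'):
                self.process(packet)  # component-specific work
                self.send('out', packet)
                # report completion to the monitoring exchange
                msg = amqp.Message('Component Succeeded: %s'
                                   % self.__class__.__name__)
                msg.properties["delivery_mode"] = 2
                self.chan.basic_publish(msg, exchange="sorting_room",
                                        routing_key="pypes_status")
            self.yield_ctrl()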
I added a controller in Pylons called poll.py. Its index() action
establishes a connection to the "po_box" queue, reads out all pending
messages, and sends the very last message back to the calling
browser. The last message is saved in app_globals, so if the queue is
empty we send that back instead.
    # at the top of poll.py
    from pylons import app_globals
    from pylons.decorators import jsonify

    @jsonify
    def index(self, format='html'):
        """GET /poll: return the most recent status message"""
        # drain the queue, remembering the newest message
        while True:
            msg = app_globals.chan.basic_get("po_box")
            if msg is None:  # queue is empty
                break
            app_globals.lastMsg = msg.body
            app_globals.chan.basic_ack(msg.delivery_tag)
        print "poll:", app_globals.lastMsg
        result = {"currStat": {"status": app_globals.lastMsg, "id": 1}}
        return result
The Python interface code was adapted from this blog post about
using py-amqplib with RabbitMQ:
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/