monitoring the pypes components

12 views
Skip to first unread message

Jacob Everist

unread,
Aug 16, 2010, 7:37:04 PM8/16/10
to py...@googlegroups.com
Hello,

I have been working with the Pypes software for about 2 months now.
My group and I are building a proof-of-concept demo for a customer,
trying to show them how they can do visual workflow programming with
some of their common data processing tasks.

1. I have implemented several Pypes components. Some of the
components are just wrappers to web services. So the tasklet makes an
http request to some URL and gets back a response. This response is
operated on and then past to the next component via the Packet.

2. Our scenario is not data-driven but database-driven. So we do not
directly send the data to the Pypes server, the Pypes server has to
request it. We have a Component for this with tunable parameters to
modify the SQL query that grabs the data. The drawback to this is
that we don't have an adapter that starts workflow. I created a
special Start component that expects no data, but will send an empty
packet to the next Component which is the database query.

The Start component still requires some signal to get it started, so I
added a special button on the browser interface called "Start". This
sends a simple GET request to docs.py. This is instead of the normal
POST request with attached file data. I modified the docs.py
controller slightly to accept this extra case and instantiate an empty
starting packet for Start.

3. I have added a snippet of Javascript code that polls a new Pylons
controller called poll.py. The objective of this is to give some
real-time feedback on what is happening on the Pypes server. The
javascript is polling the server once every second and is returning
JSON data. The trouble here is that I don't know what kind of
feedback I can get from Stackless and Pypes that will tell me when the
processes are running and what components it is running in.

I am able to import stackless from the Pylons controller and query
some of the data there. I can even tell which tasklet is active at
the current time, but that doesnt' seem to have any relationship to
where all the workload is taking place. It seems to me that the only
way to get workflow statistics is to instrument every single Component
with some kind of shared datastructure where they can manually post
performance data such as new jobs and job completions.

Can you think of any better way and what would be the best way to go
about making a shared data structure?

By the way, are the Pylons controller responses from the HTTP requests
and the running tasklets both running on the same python run-time
environment?


--
Jacob Everist

cell:  310-425-9732
email:  jacob....@gmail.com

Eric Gaumer

unread,
Aug 16, 2010, 9:36:01 PM8/16/10
to py...@googlegroups.com
Jacob,

I'm glad you're finding some usefulness in pypes.

1. One of our original requirements when designing pypes was performance. It's why we chose Stackless. With that said, Stackless style concurrency works best with non-blocking I/O (i.e., CPU bound components). It's perfectly fine to create blocking calls inside your components as long as you understand that they reduce overall throughput. We're often tasked with integrating tens of millions of documents so we designed the system for highly concurrent non-blocking use cases.

With that said, if you're building workflows that don't require high throughput then blocking calls inside your components can work fine. 

2. The decision to use a data driven or *push" model was another of our original design choices. Our goal was decouple the data "extraction" from the "transformation". We deal with all sorts of source systems (FileNet, Sharepoint, Oracle, Documentum, etc) and each has its own set of API and libraries spanning various languages. The code to extract data is often complex and that complexity is hard to generalize from one set of business requirements to another.

In situations where we're using SQL or we need a polling style data fetch, we typically build a *connector* similar to what you've done but that lives outside of pypes. This allows us to use any language we want that best suits our needs and it also means we're not obligated to conform to any specific interface (e.g., pypes component). That data can look however it wants and we'll write a pypes adapter to model it properly. All we need to do is send valid HTTP requests.

I can see where this might not work in you situation because you want everything within the control of the pypes UI. We've actually done similar prototyping with regards to sending null messages to an adapter that then fetches data. It was mainly to mashup RSS feeds but now we're more focused on Google's PubSubHubbub which provides a push mechanism.

I think your approach is reasonable.

3. This touches on another inherent design choice; no shared state. Pypes uses pure message passing semantics via Stackless. The idea, like systems such as Erlang, is to avoid shared state in favor of message passing. Pypes implements a subset of Flow-Based programming.

Components communicate over Stackless channels by passing messages (Packets) to one another. On top of that, pypes uses the multi-processor support in Python 2.6 to run instances of the graph on each CPU (this is configurable).

The actual components run independent of HTTP (Pylons) layer and are replicated across multiple CPU's. Incoming HTTP requests are load balanced across instances of the graph running on each CPU. For instance, if you have a quad Xeon machine then 4 requests are processed in parallel.

With that said, there is no "global" process that has visibility into every component. This is what allows components to be easily connected in any order to any other component. They are completely independent of one another and operate by sending and receiving messages (Packets).

Even the scheduler itself doesn't know which component is running. When a component completes, it doesn't return to the scheduler which in turn runs the next component. What actually happens is that when a component completes, it passes its data to the next component, which in turn passes to the next, and so on.

In order to gain feedback, you need to follow the same message passing paradigm. One thought that comes to mind (essentially what you've suggested above) is using something like RabbitMQ (or your message broker of choice) and have each component send messages at various stages as it runs. This would provide a sort of "shared data structure".

Pypes does provide a logging framework where components can write to as they perform their operations. You could monitor that log and simply have your components log their actions at certain stages. This log could then be used to provide monitoring.

If you need something more robust then I would suggest a message broker. RabbitMQ is a good choice and Python offers a few AMQP clients that would allow you to easily communicate. In your case, the logging infrastructure might work well enough.

Try logging your performance statistics and then polling that file for updates. 

Regards,
-Eric

Jacob Everist

unread,
Aug 27, 2010, 3:06:55 AM8/27/10
to py...@googlegroups.com
Eric,

I was able to setup a monitoring system using RabbitMQ as you
suggested. This was a good idea since we are already using RabbitMQ
for another part of this project so it makes sense.

I did the setup part of rabbitmq in app_globals.py in the constructor
of Globals.

def __init__(self):
"""One instance of Globals is created during application
initialization and is available during requests via the
'app_globals' variable

"""
self.dfg = DataFlowGraph()

self.conn = amqp.Connection(host="localhost:5672",
userid="guest", password="guest", virtual_host="/", insist=False)
self.chan = self.conn.channel()

self.chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
self.chan.exchange_declare(exchange="sorting_room",
type="direct", durable=True, auto_delete=False,)
self.chan.queue_bind(queue="po_box", exchange="sorting_room",
routing_key="pypes_status")
self.lastMsg = ""

This makes sure that both the exchange and the queue are setup and
that they are bound to each other.

I also initialized the connection to the exchange in the constructor
of the superclass of Component for each of the Pypes tasklets. Each
tasklet has a channel to the "sorting_room" exchange.

self.conn = amqp.Connection(host="localhost:5672",
userid="guest", password="guest", virtual_host="/", insist=False)
self.chan = self.conn.channel()

In place of the logging, I sent messages directly to the sorting_room
exchange for each tasklet.

msg = amqp.Message(''Component Succeeded: %s' % self.__class__.__name__)
msg.properties["delivery_mode"] = 2
self.chan.basic_publish(msg,exchange="sorting_room",routing_key="pypes_status")


I added a controller in Pylons called poll.py. If you call
index(self), it will establish a connection to the "po_box" queue and
read out all the messages. It will then send back the very last
message message to the calling browser. The last message is saved in
app_globals, so if the queue is empty, we send that back instead.

@jsonify
def index(self, format='html'):
"""GET /poll: All items in the collection"""

bodyText = ""
try:
while True:
msg = app_globals.chan.basic_get("po_box")
print msg.body
bodyText = msg.body
app_globals.lastMsg = bodyText
app_globals.chan.basic_ack(msg.delivery_tag)
except:
pass

print "poll:", app_globals.lastMsg
result = {"currStat" : { "status" : app_globals.lastMsg , "id" : 1}}
return result


The python interface code was taken from this blog post tutorial about
using py-amqplib and RabbitMQ.

http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

--

Eric Gaumer

unread,
Aug 27, 2010, 11:55:34 AM8/27/10
to py...@googlegroups.com
On Fri, Aug 27, 2010 at 3:06 AM, Jacob Everist <jacob....@gmail.com> wrote:
Eric,

I was able to setup a monitoring system using RabbitMQ as you
suggested.   This was a good idea since we are already using RabbitMQ
for another part of this project so it makes sense.


This is very cool Jacob. We've been playing with RabbitMQ and RabbitHub (adds PubSubHubbub capabilities to Rabbit) in front of pypes to build out a soft real-time publishing architecture that does semantic tagging for one of our clients.

Since pypes is inherently a message passing system, brokers like Rabbit fit really well. I'd be very interested to hear how your prototype turns out and any feedback to help make pypes better.

I could definitely see an API added to pypes to allow easy integration with AMQP brokers. Something that was optional and wired directly in as a logging facility (i.e., you could log to stdout, filesystem, or AMQP).

Regards,
-Eric


Reply all
Reply to author
Forward
0 new messages