adding cloud expandability

Skip to first unread message

Jacob Everist

Sep 1, 2010, 6:31:36 PM9/1/10
Our next step with our Pypes-based prototype is to add some real-time
cloud expandability. This will include the following.

1. a Pypes machine image to create Pypes instances on-the-fly
2. a composite Pypes component which is actually another data flow
graph on a slaved Pypes server

So for instance, if you add a composite component on your data flow
graph, this will instaniate another Pypes instance in the cloud,
configure it with the custom composite data flow graph, and establish
the comm. channel so this new slave Pypes server and can communicate
back its result to the master Pypes server.

Jacob Everist

cell:  310-425-9732

Eric Gaumer

Sep 2, 2010, 9:23:42 AM9/2/10
When you talk about expandability, what do you mean exactly? What aspect are you trying to scale?

Pypes was designed to be homogenous whereby each node has the same role. There is no concept of a master/slave relationship between nodes. Input data streams are essentially partitioned across the cluster by load balancing techniques.

It sounds like you are trying to run portions of a graph on another instance. This should be possible and you may even be able to write a publisher component that simply sends output back to some instance of pypes. So the slave would accept some input, process it, and publish results to the master who then consumes the data and potentially does further processing. All of this should be relatively straightforward given everything speaks HTTP.

In order to distribute graph "configurations" we kept the format simple. Its nothing more than JSON and that JSON describes (to the back-end) what components are used, what parameters each takes, and what the layout coordinates are in the UI. The back-end can build a fully functioning instance from this configuration.

So you can basically export  the config (as JSON) and send to another instance. At the moment you'd place the config into a directory and then restart pypes. We've been thinking about ways to auto-load (at runtime) a new config. This should be doable.

Lastly, pypes components are an abstraction of a Stackless tasklet. Stackless tasklets can be pickled, sent to another machine, and then resume execution.

So not only can you send a live instance to a node, you can send it in mid stroke.


Jacob Everist

Sep 14, 2010, 12:17:43 PM9/14/10

Thanks for the pointers. They have been very helpful.

This is what I am doing.

I am creating a virtual machine image that is running a Pypes server.
This is image can be loaded as an instance on the Eucalyptus cloud.
This is analogous to the Amazon EC2 cloud since they both use the same

I have a "main" pypes server that the user interfaces with via the
browser. They construct their components and their data flows in the
WireIt interface.

Some of the components are actually "composite components". They
contain extra heave duty processing. When one of these composite
components are added to the graph, they instantiate a new Pypes server
on the cloud that are intended to execute the subgraph workflow
represented by the composite component.

The easiest way to do this seems to be to put the cloud instantiation
and destruction code into the constructor and destructor respectively
of the class that runs the tasklet representing the component.

For instance, if my composite component is called Compos:

class Compos:
def __init__(self):
# execute Eucalyptus command-line to instantiate a new pypes server

# send JSON sub-graph description to new pypes server

# normal tasklet setup

def __del__(self):
# execute Eucalyptus command-line to destroy the pypes server

So for each Compos component, a new pypes server is instantiated on
the cloud. Communication will be established with these pypes servers
through the controlling tasklet.

One concern is that the cloud instantiation will take some time, so
the startup commands are blocking. Do the tasklets relinquish control
by themselves while blocking or do I need to fork another process to
do the setup on the command-line?

Do you see anything wrong with this approach or a more elegant way to
approach it?



Eric Gaumer

Sep 15, 2010, 10:06:19 AM9/15/10
On Tue, Sep 14, 2010 at 12:17 PM, Jacob Everist <> wrote:

One concern is that the cloud instantiation will take some time, so
the startup commands are blocking.  Do the tasklets relinquish control
by themselves while blocking or do I need to fork another process to
do the setup on the command-line?

Multitasking in pypes is cooperative but not preemptive. It is the the tasklets responsibility to release control of the CPU using yield_ctrl().

The scheduler will not preempt. This decision is largely based on the fact that there's a specific chain of dependency (i.e., B depends on A therefore preempting A to run B provides little use). This also simplifies the scheduler code.

It also provides the flexibility to control the flow of execution inside the components rather than having the scheduler enforce its own semantics. You can call yield_ctrl() at any point and know that the next time you're called, execution will begin where it left off.

With that said, there are some caveats once you understand how scheduling works.

The scheduler itself is a tasklet. It takes a user's graph and performs a topological sort which provides a linear ordering based on dependencies. It loads the tasklets in the order they will run and then places itself as a blocking tasklet at the front.

When data is sent from the HTTP layer it's sent through a Stackless channel (asynchronously) which the scheduling tasklet is listening on. Once the channel sees data it causes the scheduling tasklet to run which then causes all other tasklets to run in their topological ordering.

As each tasklet (component) calls yield_ctrl() it allows the next to run. Once the last tasklet calls yield_ctrl(), the next tasklet to run is the scheduling tasklet which once again blocks waiting for input.

So ideally (or under normal circumstances), a component has some sense of completion on iteration. Consider a situation where data arrived on a components input port but the component only processed a portion of that data. The remaining portion would still be buffered on the input port which could then only be processed by sending more data in.

As a way around this (which you've already discovered), you could send "triggers" into the system which contain null or junk data. The simple act of sending anything on the scheduling tasklets channel is enough to execute the graph.

That means that rather than sending data into the system, you could write a component that fetches some data. You can then cause this component to run by sending triggers to the system.

In the case of something more programmatic, you should be able to "control" things just by talking over the scheduling tasklets channel. In other words, you could essentially write your own scheduler that simply controls when triggers/data get sent into pypes. Then your components could yield intermittently and your new controlling tasklet could determine when to block (maybe once every components input ports are empty).
In terms of code executed at instantiation time, this actually happens in the asynchronously in the HTTP layer.  When a component is dragged from the left side list, within the Javascript layer we invoke a POST request to the HTTP server (the pylons "filters" controller).

This controller provides a RESTful interface for manipulating components (a.k.a., filters).

# instantiates a new filter instance (POST body contains a class name)
POST /filters 

# updates an instance with new configuration information (PUT body contains new JSON config)
PUT /filters/id

# deletes an instance represented by id
DELETE /filters/id 

# will return a list of all the filters for the given type (where type is Adapter, Publisher, etc.)
GET /filters?node=type 

The actual async JS call to create filter instances is here:

So in terms of this code blocking (your construction/destruction semantics), it actually doesn't block and this code executes outside the context of tasklet (which is bound to the run() method).

Keep in mind that if it takes really long to complete and you try wiring this component before it does, the back-end may not have registered the component yet. In this case it would look like you're trying to wire a component that doesn't yet exist.

One option may be to leverage the fact that pypes provides these RESTful semantics. When a composite component is created, you could essentially invoke the REST actions to construct the graph remotely. Since this could all be done in Javascript, you could ensure nothing blocks. Then your controlling tasklet could communicate with the other node over HTTP the same way data is sent in to any instance of pypes (i.e., POST request to /docs controller). That call itself is also asynchronous. The subgraph or composite instance could then write to a local sink that your controlling tasklet occasionally checks for completed jobs.

Then just write a publisher on the composite instance that actually serializes the pypes Packet to disk where it can be pulled via a GET request (a serialized Packet would ideally be JSON but you could just pickle it)

This might help keep the communication channels simple using HTTP. Keep in mind that the pypes UI is nothing more than a bunch of Javascript code that invokes methods on the server for creating and manipulating graphs. All of that functionality is exposed by design and it's available over HTTP because we envisioned the sort of semantics you're attempting here and we wanted to have the flexibility to do some of these things one day.

So anything you can do in the UI is exposed over HTTP meaning that you should have the flexibility to completely control the backend using any language or code that supports the HTTP protocol. Pypes was designed for the web and should therefore be able to operate within a cloud environment but you're group is the first to actually attempt it.

As a last note, keep in mind that pypes supports content negotiation of compressed (gzip) data. If you data is relatively large, have your controlling tasklet compress it before sending it to the remote instance. Just set the proper header and the other server will be sure to decompress the message before sending it through the graph.

We use this technique along with batching to send lots of data over a relatively slow protocol. If we're feeding millions of documents, we'll typically batch then in groups of one thousand and then compress the payload before we submit it. This reduces the number of HTTP calls. We also talked about supporting Google Protocol Buffers in addition to HTTP.

Hope this helps.


Reply all
Reply to author
0 new messages