On Sunday 05 October 2008 00:47:26 Gloria W wrote:
> You'll notice that my questions point back to developer ease of
> programming. I am really hoping that Kamaelia aloows me to inherently
> stack protocols on top of each other.
OK, I've created an example, which I've copied and pasted below. It
starts from basic principles and ends up with a server slinging JSON
objects around over secure network sockets using DES encryption.
(Clients connect, decrypt the data they get and get sent JSON objects
which are delimited as discrete messages, decode that and dump out
the representation of the object)
The idea I'm showing here is how you could evolve just such a protocol.
This has taken probably a couple of hours, including writing this mail
and the documentation in the example below.
It might be interesting to extend this to creating logical channels
to pub/sub to. This is conceptually relatively simple with Kamaelia
due to the existance of the Kamaelia.Util.Backplane component, but
I thought I'd stop for now at nesting.
I suspect that's probably another hour or two's work, but would then
be a nice pretty scaleable pub/sub JSON object router. It'd probably
be another days work or so to make it possible to build networks of
such routers.
Different stages of the examples below are commented out using "if 0:".
Each such chunk represents a different stage in the evolution of the
system towards the above setup.
Hopefully this example is useful. I may come back to the pub/sub using
backplanes idea tomorrow or over the next couple of days.
Best Regards,
Michael.
--
Everything below this line is copy & pasteable into an editor and runnable.
#!/usr/bin/python
"""
OK, we're starting from first principles, so let's import the core
things we'll be starting off with - we want something to send
messages, once per second and display it.
So we pull in our imports, build a pipeline and start echoing
the message we recieve
"""
from Kamaelia.Util.Console import ConsoleEchoer
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Apps.Grey.PeriodicWakeup import PeriodicWakeup
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
ConsoleEchoer()
).run()
"""
We now want to turn this into a (changing) sequence of JSON messages.
For expediency, we'll create 40 messages by creating a list of 4
messages, and multiplying by 10.
We use the "Chooser" component to iterate through this list. The "Chooser"
component advances through the list when it gets a next message, so
let's prove that this works.
"""
from Kamaelia.Util.Chooser import Chooser
messages = [ {"hello": "world" },
{"hello": [1,2,3] },
{"world": [1,2,3] },
{"world": {"game":"over"} },
]*10
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
ConsoleEchoer()
).run()
"""
OK, before we bring the network into play, let's start marshalling
these JSON messages as strings, ready for slinging over the network.
This naturally forms a very simple, very trivial protocol.
"""
import Axon
import cjson
class MarshallJSON(Axon.Component.component):
def main(self):
while not self.dataReady("control"):
for j in self.Inbox("inbox"):
j_encoded = cjson.encode(j)
self.send(j_encoded, "outbox")
if not self.anyReady():
self.pause()
yield 1
"""
We can test this, by inserting this just after the messages have been created.
Note that we've also switched the ConsoleEchoer to use repr() of the messages
it recieves now to demonstrate that MarshalJSON is spitting out strings.
"""
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
ConsoleEchoer(use_repr=True)
).run()
"""
Next step is demarshalling. This mirrors the Marshaller.
"""
class DeMarshallJSON(Axon.Component.component):
def main(self):
while not self.dataReady("control"):
for j in self.Inbox("inbox"):
j_decoded = cjson.decode(j)
self.send(j_decoded, "outbox")
if not self.anyReady():
self.pause()
yield 1
"""
Add this into the pipeline. You'll see here from the ConsoleEchoer that
it is indeed being demarshalled.
"""
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
).run()
"""
We can now bring the network into the system and simply pipe this
over a network connection.
SingleServer works in a very similar fashion to "netcat -l", and
TCPClient is obviously intended to work in a similarly.
This enables us to insert SingleServer and TCPClient inbetween the
Marshall/DeMarshall step.
"""
from Kamaelia.Internet.SingleServer import SingleServer
from Kamaelia.Internet.TCPClient import TCPClient
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
SingleServer(port=2345),
TCPClient("127.0.0.1", port=2345),
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
).run()
"""
Since that works, the next logical step is to break this into two pipelines.
The first is the network server, the second is the network client.
"""
if 0:
Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
SingleServer(port=2345),
).activate()
Pipeline(
TCPClient("127.0.0.1", port=2345),
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
).run()
"""
Since we now have a basic server & a basic protocol for sending JSON
objects, we can seperate out the protocol from the server code, and
plug that into a scalable server core, rather than the netcat style
server.
"""
from Kamaelia.Chassis.ConnectedServer import ServerCore
if 0:
"""
First of all the protocol. We throw away the arguments we're sent when
our protocol factory gets called. We could of course do something more
interesting here.
"""
def protocol(*args,**argd):
return Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
)
"""
Pull the protocol into the server
"""
ServerCore(protocol=protocol, port=2345).activate()
""" client is unchanged """
Pipeline(
TCPClient("127.0.0.1", port=2345),
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
).run()
"""
One next possible/logical step is to make the client a prefab as well, so we do that.
"""
if 0:
def protocol(*args,**argd):
return Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
)
def json_client_prefab(ip, port):
return Pipeline(
TCPClient(ip, port=port),
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
)
ServerCore(protocol=protocol, port=2345).activate()
json_client_prefab("127.0.0.1", 2345).run()
"""
Now, this is nice and will work, but exploits an aspect of TCP which is
not always true. Specifically often TCP stacks preserve to a limited extent
this messaging structure, but in practice, messages may come in a variety
of different sizes.
As a result, since we're sending messages rather than byte data we really
need to use a higher layer protocol - however simple - to delimit messages.
For this we can use the DataChunker protocol in Kamaelia.Protocol.Framing.
For this to work, the DataChunking must occur on the way out to the network
connection and as the first thing we do when we recieve data from the
connection.
"""
from Kamaelia.Protocol.Framing import DataChunker, DataDeChunker
if 0:
def protocol(*args,**argd):
return Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
DataChunker(), # Last thing before sent to network
)
def json_client_prefab(ip, port):
return Pipeline(
TCPClient(ip, port=port),
DataDeChunker(), # First thing after get from network
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
)
ServerCore(protocol=protocol, port=2345).activate()
json_client_prefab("127.0.0.1", 2345).run()
"""
OK, so that's pretty nice - we're sending and recieving JSON objects. What
about encryption?
Well for this, let's pull in PyCrypto, and create a simple object for managing
the state that you need to manage when using the DES Cipher provided.
"""
# Pull in DES
from Crypto.Cipher import DES
""" Basic null encoder, but captures and stores the key """
class Encoder(object):
"""Null encoder/base encoder - returns the same string
for encode/decode"""
def __init__(self, key, **argd):
super(Encoder, self).__init__(**argd)
self.__dict__.update(argd)
self.key = key
def encode(self, some_string):
return some_string
def decode(self, some_string):
return some_string
""" Simple DES based encryption utility - handles encoding &
decoding using a given key """
class DES_CRYPT(Encoder):
def __init__(self, key, **argd):
super(DES_CRYPT, self).__init__(key, **argd)
self.key = self.pad_eight(key)[:8]
self.obj = obj=DES.new(self.key, DES.MODE_ECB)
def encode(self, some_string):
padded = self.pad_eight(some_string)
encrypted = self.obj.encrypt(padded)
return encrypted
def decode(self, some_string):
padded = self.obj.decrypt(some_string)
decoded = self.unpad(padded)
return decoded
def pad_eight(self, some_string):
X = len(some_string)
if X % 8 != 0:
pad_needed = 8-X % 8
else:
pad_needed = 8
pad_needed = 8-(X % 8)
PAD = pad_needed * chr(pad_needed)
return some_string+PAD
def unpad(self, some_string):
x = ord(some_string[-1])
return some_string[:-x]
"""
Now that we have an encryption utility, we now need a component for
encrypting on the way out, and another for decrypting on the way in
First the encrypter. you will note that it looks very similar to the
JSON Marshaller.
"""
class Encrypter(Axon.Component.component):
key = "ABCD"
def main(self):
crypter = DES_CRYPT(self.key)
while not self.dataReady("control"):
for j in self.Inbox("inbox"):
j_encoded = crypter.encode(j)
self.send(j_encoded, "outbox")
if not self.anyReady():
self.pause()
yield 1
"""
Next the decrypter. you will note that it looks very similar to the
JSON DeMarshaller.
"""
class Decrypter(Axon.Component.component):
key = "ABCD"
def main(self):
crypter = DES_CRYPT(self.key)
while not self.dataReady("control"):
for j in self.Inbox("inbox"):
j_decoded = crypter.decode(j)
self.send(j_decoded, "outbox")
if not self.anyReady():
self.pause()
yield 1
"""
We then want to sling this out over the network. Now, since the
Encrypter/Decrypter themselves need to deal with messages of the
right size, we're for expediency encrypting our messages, but not
the stream as a whole.
More complex encrypt/decrypt components would ensure that the messages
are the right length before sending them for encode/decode.
"""
if 1:
def protocol(*args,**argd):
return Pipeline(
PeriodicWakeup(message="NEXT", interval=1),
Chooser(messages),
MarshallJSON(),
Encrypter(), # Encrypt on the way out
DataChunker(),
)
def json_client_prefab(ip, port):
return Pipeline(
TCPClient(ip, port=port),
DataDeChunker(),
Decrypter(), # Decrypt on the way in
DeMarshallJSON(),
ConsoleEchoer(use_repr=True)
)
ServerCore(protocol=protocol, port=2345).activate()
json_client_prefab("127.0.0.1", 2345).run()