[spade2] r2231 committed - - Added Personal Event Publication and Subscription Protocol! This ama...

6 views
Skip to first unread message

spa...@googlecode.com

unread,
Jul 16, 2010, 1:42:47 PM7/16/10
to spade-...@googlegroups.com
Revision: 2231
Author: jpalanca
Date: Fri Jul 16 10:41:53 2010
Log: - Added Personal Event Publication and Subscription Protocol! This
amazing new feature (developed by cooldwind and sangarb1) allows agents to
create events and subscribe to their friends' events, so that they are
notified when their contacts publish events. This was done following Jabber
XEP-0060. This is an initial version; more features will be added to the
pubsub protocol in future work.
- Pubsub unit tests were added. There is still some work to do to check
their reliability.
- Also some pubsub examples in directory examples/pubsub
- Old spade.sh was deleted.

http://code.google.com/p/spade2/source/detail?r=2231

Added:
/trunk/examples/pubsub
/trunk/examples/pubsub/juliet.py
/trunk/examples/pubsub/romeo.py
/trunk/examples/unittests/pubsubTestCase.py
/trunk/spade/pubsub.py
/trunk/xmppd/modules/pubsub.py
Deleted:
/trunk/spade.sh
Modified:
/trunk/examples/unittests/unittests.py
/trunk/spade/Agent.py
/trunk/spade/socialnetwork.py
/trunk/xmpp/protocol.py
/trunk/xmppd/modules/__init__.py
/trunk/xmppd/modules/db_fake.py

=======================================
--- /dev/null
+++ /trunk/examples/pubsub/juliet.py Fri Jul 16 10:41:53 2010
@@ -0,0 +1,88 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# TODO: Recognize error of "non-supported error creation"
+
+import sys
+sys.path.append("../..")
+from xmpp.simplexml import Node
+from spade.Agent import Agent
+from spade.Behaviour import OneShotBehaviour
+#from spade.pubsub import PubSub #, PubSubMessageTemplate
+
+def asserteq(one, two):
+ print one == two, ' ', one, ' == ', two
+ assert one == two, 'not equal'
+
+class MyAgent(Agent):
+ class MyBehav(OneShotBehaviour):
+ def onStart(self):
+ self.myAgent.DEBUG("Starting behaviour . . .")
+
+        def _process(self):
+            """
+            Juliet's side of the pubsub example.
+
+            Waits until the 'ExistsNode' event exists, checks the errors that
+            are expected while Juliet is not yet allowed to use it, then adds
+            Romeo as a contact and exercises subscribe / unsubscribe /
+            resubscribe. Every check goes through asserteq(); the first failed
+            check (or any other exception) is printed and ends the behaviour.
+            """
+            # 'time' is otherwise only imported inside the __main__ guard at
+            # the bottom of this file, so import it here to keep the behaviour
+            # usable when the module is imported rather than run as a script.
+            import time
+
+            #pubsub = PubSub(self.myAgent, self)
+            node = 'ExistsNode'
+            romeo = 'romeo@' + self.myAgent.server
+
+            try:
+                # Poll once per second until the node has been created.
+                while self.myAgent.subscribeToEvent(node) == ('error', ['item-not-found']):
+                    time.sleep(1)
+
+                # Expected failures before the contact subscription is in place.
+                asserteq(self.myAgent.subscribeToEvent(node), ('error', ['not-authorized', 'presence-subscription-required']))
+                # The server answers 'not-subscribed'; the previous expected
+                # value 'not-subscribeToEventd' was a search-and-replace slip.
+                asserteq(self.myAgent.unsubscribeFromEvent(node), ('error', ['unexpected-request', 'not-subscribed']))
+                asserteq(self.myAgent.deleteEvent(node), ('error', ['forbidden']))
+                asserteq(self.myAgent.createEvent(node), ('error', ['conflict']))
+                asserteq(self.myAgent.publishEvent(node, Node(tag='foo')), ('error', ['forbidden']))
+
+                self.myAgent.setSocialItem(romeo)
+                self.myAgent._socialnetwork[romeo].subscribe()
+
+                # Announce the pause before sleeping, not after it.
+                print 'Sleeping 10 seconds...'
+                time.sleep(10)
+
+                asserteq(self.myAgent.subscribeToEvent(node), ('ok', []))
+                #TODO: Check that the last published item is sent after subscription.
+
+                #TODO: Check that the new item published by Romeo is received too.
+
+                print 'Sleeping 5 seconds...'
+                time.sleep(5)
+
+                asserteq(self.myAgent.unsubscribeFromEvent(node), ('ok', []))
+                asserteq(self.myAgent.subscribeToEvent(node), ('ok', []))  # OK
+                #TODO: Check that the last published item is sent after subscription.
+                # Subscribing somebody else's JID must be rejected.
+                asserteq(self.myAgent.subscribeToEvent(node, jid=romeo), ('error', ['bad-request', 'invalid-jid']))
+
+                #TODO: Check that the notification of node deletion is received.
+
+            except Exception,e:
+                print e
+
+ def onEnd(self):
+ self.myAgent.DEBUG("Ending behaviour . . .")
+
+ def _setup(self):
+ self.DEBUG("MyAgent starting . . .")
+ b = self.MyBehav()
+ #FIXME: If we don't set it to default,
+ # we will have problems with some replies.
+ #self.addBehaviour(b, PubSubMessageTemplate())
+ self.setDefaultBehaviour(b)
+
+if __name__ == "__main__":
+ if len(sys.argv) < 2:
+ host = "127.0.0.1"
+ else:
+ host = sys.argv[1]
+ a = MyAgent("juliet@"+host, "secret")
+ a.wui.start()
+ a.setDebugToScreen()
+ a.start()
+ import time
+ while True:
+ try:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ break
+ a.stop()
+ sys.exit(0)
+
=======================================
--- /dev/null
+++ /trunk/examples/pubsub/romeo.py Fri Jul 16 10:41:53 2010
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# TODO: Recognize error of "non-supported error creation"
+
+import sys
+sys.path.append("../..")
+
+from xmpp.simplexml import Node
+from spade.Agent import Agent
+from spade.Behaviour import OneShotBehaviour
+#from spade.pubsub import PubSub #, PubSubMessageTemplate
+
+def asserteq(one, two):
+ print one == two, ' ', one, ' == ', two
+ assert one == two, 'not equal'
+
+class MyAgent(Agent):
+ class MyBehav(OneShotBehaviour):
+ def onStart(self):
+ self.myAgent.DEBUG("Starting behaviour . . .")
+
+ def _process(self):
+ #self.myAgent.setSocialItem('sandra@'+self.myAgent.server)
+
#self.myAgent._socialnetwork['sandra@'+self.myAgent.server].subscribe()
+ #frm = self.myAgent.getAID().getName()
+ #to = self.myAgent.getSpadePlatformJID()
+ #pubsub = PubSub(self.myAgent, self)
+
+ try:
+ asserteq(self.myAgent.subscribeToEvent('NENode'),
('error', ['item-not-found']))
+
+ asserteq(self.myAgent.unsubscribeFromEvent('NENode'),
('error', ['item-not-found']))
+ asserteq(self.myAgent.deleteEvent('NENode'), ('error',
['item-not-found']))
+
+ res = self.myAgent.publishEvent('NENode', Node(tag='foo'))
+ asserteq(res[0], 'ok')
+ asserteq(len(res[1]), 2)
+ asserteq(res[1][0], 'NENode')
+ asserteq(type(res[1][1]), unicode)
+
+ asserteq(self.myAgent.createEvent('ExistsNode'), ('ok',
['ExistsNode']))
+
+ self.myAgent.setSocialItem('juliet@'+self.myAgent.server)
+
self.myAgent._socialnetwork['juliet@'+self.myAgent.server].subscribe()
+
+ time.sleep(15)
+ print 'Sleeping 15 seconds...'
+
+ self.myAgent.publishEvent('ExistsNode', Node(tag='foo'))
#OK
+
+ asserteq(self.myAgent.unsubscribeFromEvent('ExistsNode',
jid='juliet@'+self.myAgent.server), ('error',
['bad-request', 'invalid-jid']))
+
+ time.sleep(5)
+ print 'Sleeping 5 seconds...'
+
+ asserteq(self.myAgent.deleteEvent('ExistsNode'), ('ok',
[]))
+
+ except Exception,e:
+ print 'Exception'
+ print e
+
+ def onEnd(self):
+ self.myAgent.DEBUG("Ending behaviour . . .")
+
+ def _setup(self):
+ self.DEBUG("MyAgent starting . . .")
+ b = self.MyBehav()
+ #FIXME: If we don't set it to default,
+ # we will have problems with some replies.
+ #self.addBehaviour(b, PubSubMessageTemplate())
+ self.setDefaultBehaviour(b)
+
+if __name__ == "__main__":
+ if len(sys.argv) < 2:
+ host = "127.0.0.1"
+ else:
+ host = sys.argv[1]
+ a = MyAgent("romeo@"+host, "secret")
+ a.wui.start()
+ a.setDebugToScreen()
+ a.start()
+ import time
+ while True:
+ try:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ break
+ a.stop()
+ sys.exit(0)
+
=======================================
--- /dev/null
+++ /trunk/examples/unittests/pubsubTestCase.py Fri Jul 16 10:41:53 2010
@@ -0,0 +1,130 @@
+import os
+import sys
+import time
+import unittest
+
+sys.path.append('../..')
+
+import spade
+from xmpp.simplexml import Node
+
+host = "127.0.0.1"
+
+class PubSubTestCase(unittest.TestCase):
+
+ def setUp(self):
+
+ self.Aaid = spade.AID.aid("a@"+host,["xmpp://a@"+host])
+ self.Baid = spade.AID.aid("b@"+host,["xmpp://b@"+host])
+
+ self.a = spade.Agent.Agent("a@"+host, "secret")
+ self.a.wui.start()
+ self.a.start()
+ self.b = spade.Agent.Agent("b@"+host, "secret")
+ self.b.wui.start()
+ self.b.start()
+
+ self.a.setSocialItem('b@'+host)
+ self.a._socialnetwork['b@'+host].subscribe()
+ self.b.setSocialItem('a@'+host)
+ self.b._socialnetwork['a@'+host].subscribe()
+
+ self.a.deleteEvent("ExistsNode")
+ self.b.deleteEvent("ExistsNode")
+ self.a.deleteEvent("NENode")
+ self.b.deleteEvent("NENode")
+
+ def tearDown(self):
+ self.a.deleteEvent("ExistsNode")
+ self.b.deleteEvent("ExistsNode")
+ self.a.deleteEvent("NENode")
+ self.b.deleteEvent("NENode")
+ self.a.stop()
+ self.b.stop()
+
+ def testSubscribeNotExistEvent(self):
+ result = self.a.subscribeToEvent("NENode")
+ self.assertEqual(result, ('error', ['item-not-found']) )
+
+ def testUnsubscribeNotExistEvent(self):
+ result = self.a.unsubscribeFromEvent("NENode")
+ self.assertEqual(result, ('error', ['item-not-found']) )
+
+ def testDeleteNotExistEvent(self):
+ result = self.a.deleteEvent("NENode")
+ self.assertEqual(result, ('error', ['item-not-found']) )
+
+ def testPublishNotExistEvent(self):
+ result = self.a.publishEvent('NENode', Node(tag='foo'))
+ self.assertEqual(result[0], 'ok')
+ self.assertEqual(len(result[1]), 2)
+ self.assertEqual(result[1][0], 'NENode')
+ self.assertEqual(type(result[1][1]), unicode)
+
+ self.a.deleteEvent("NENode")
+
+ def testCreateEvent(self):
+ result = self.a.createEvent("ExistsNode")
+ self.assertEqual(result, ('ok', ['ExistsNode']))
+
+ self.a.deleteEvent("ExistsNode")
+
+ def testPublishEvent(self):
+
+ result = self.a.createEvent("ExistsNode")
+ self.assertEqual(result, ('ok', ['ExistsNode']))
+
+ result = self.b.subscribeToEvent("ExistsNode")
+ self.assertEqual( result, ('ok', []) )
+
+ self.a.publishEvent('ExistsNode', Node(tag='foo'))
+ #TODO: Check that the last published item is sent after
subscription.
+
+ #TODO: Check that the new item published by Romeo is received too.
+
+ self.b.unsubscribeFromEvent("ExistsNode")
+ self.a.deleteEvent("ExistsNode")
+
+ def testSubscribeNotAllowed(self):
+ result = self.a.createEvent("ExistsNode")
+ self.assertEqual(result, ('ok', ['ExistsNode']))
+
+ result = self.b.subscribeToEvent("ExistsNode", jid="a@"+host)
+ self.assertEqual(result, ('error', ['bad-request', 'invalid-jid']))
+
+ self.a.deleteEvent("ExistsNode")
+
+ def testUnsubscribeNotAllowed(self):
+ result = self.a.createEvent("ExistsNode")
+ self.assertEqual(result, ('ok', ['ExistsNode']))
+
+ result = self.b.subscribeToEvent("ExistsNode")
+ self.assertEqual( result, ('ok', []) )
+
+ result = self.a.unsubscribeFromEvent('ExistsNode', jid='b@'+host)
+ self.assertEqual(result, ('error', ['bad-request', 'invalid-jid'])
)
+
+ self.b.unsubscribeFromEvent("ExistsNode")
+ self.a.deleteEvent("ExistsNode")
+
+ def testResubscribeToEvent(self):
+ result = self.a.createEvent("ExistsNode")
+ self.assertEqual(result, ('ok', ['ExistsNode']))
+
+ result = self.b.subscribeToEvent("ExistsNode")
+ self.assertEqual( result, ('ok', []) )
+
+ result = self.b.unsubscribeFromEvent("ExistsNode")
+ self.assertEqual( result, ('ok', []))
+ result = self.b.subscribeToEvent("ExistsNode")
+ self.assertEqual( result, ('ok', [])) # OK
+ #TODO: Check that the last published item is sent after
subscription.
+
+
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+
+
=======================================
--- /dev/null
+++ /trunk/spade/pubsub.py Fri Jul 16 10:41:53 2010
@@ -0,0 +1,297 @@
+from spade.Behaviour import MessageTemplate, OneShotBehaviour
+
+from xmpp.protocol import *
+from xmpp.simplexml import Node
+import uuid
+
+def gen_id():
+ return str(uuid.uuid4())
+
+#def PubSubMessageTemplate():
+# msgs = []
+# for ns in (NS_PUBSUB, NS_PUBSUB_OWNER):
+# msg = Iq()
+# msg.addChild(name='pubsub', namespace=ns)
+# msgs.append(msg)
+# return reduce(lambda a,b: a | b, map(lambda msg:
MessageTemplate(msg), msgs))
+
+
+#class XMPPIdTemplate(MessageTemplate):
+
+# def __init__(self, id):
+# iq = Iq()
+# iq.setID(id)
+# MessageTemplate.__init__(self, iq)
+
+#TODO: Implement node retrieval and discovery
+
+class PubSub(object):
+
+ def __init__(self, agent): #, msgrecv):
+ self._client = agent.getAID().getName()
+ #self.msgrecv = msgrecv
+ self.myAgent = agent
+ self._server = agent.server
+
+ def _sendAndReceive(self, iq, getContents):
+ id = gen_id()
+ t = MessageTemplate(Iq(attrs={'id':id}))
+ iq.setID(id)
+ b = self._sendAndReceiveBehav(iq,getContents)
+
+ self.myAgent.addBehaviour(b,t)
+ b.join()
+ return b.result
+
+ class _sendAndReceiveBehav(OneShotBehaviour):
+ def __init__(self,iq,getContents):
+ OneShotBehaviour.__init__(self)
+ self.iq = iq
+ self.getContents = getContents
+ self.timeout = 15
+ def _process(self):
+ #print 'Sending ', str(self.iq)
+ self.myAgent.send(self.iq)
+
+ #Wait for the answer
+ msg = self._receive(block=True,timeout=self.timeout)
+ #print 'Received ', str(msg)
+ if msg is None:
+ #Timeout
+ self.result = ('error',['timeout'])
+ return
+ if msg['type'] == 'error':
+ errors = []
+ for error in msg.getTag('error').getChildren():
+ if error.getName() == 'text': continue
+ errors.append(error.getName())
+ self.result = ('error',errors)
+ return
+ if msg['type'] == 'result':
+ self.result = ('ok',self.getContents(msg))
+ return
+
+ self.result = ('error',['unknown'])
+ return
+
+
+
+
+ def publish(self, node, event=None):
+ """
+ Publishes an item to a given node.
+
+ XXX: 'node' here is not an XML node, but the attribute for
<publish>
+
+ @type node: string
+ @param node: The ID of the pubsub node to publish
+ @type event: Event
+ @param event: Content to publish
+ @rtype: (string , list[string])
+ @return: A tuple with the type of answer ('ok','error') and
information
+ about the answer. In case of 'error', a list with the errors.
In case of
+ 'ok' the name of the created node.
+ """
+ iq = Iq(
+ typ='set',
+ queryNS=None,
+ attrs={},
+ frm=self._client
+ )
+
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+ publish_node = Node(tag='publish', attrs={'node':node})
+ item_node = Node(tag='item')
+ if event is not None:
+ item_node.addChild(node=event)
+ publish_node.addChild(node=item_node)
+ pubsub_node.addChild(node=publish_node)
+ iq.addChild(node=pubsub_node)
+
+ def getContents(msg):
+ node_publish = msg.getTag('pubsub').getTag('publish')
+ #XXX: Server implementation always returns the item id, but XEP-60
+ # does not require it
+ return [node_publish['node'],node_publish.getTag('item')['id']]
+
+ return self._sendAndReceive(iq, getContents)
+
+
+
+
+ def subscribe(self, node, server=None, jid=None):
+ """
+ Subscribes to the selected node
+
+ @type node: string
+ @param node: id of the node to subscribe to
+ @type server: string
+ @param server: PubSub server
+ @rtype: (string , list[string])
+ @return: A tuple with the type of answer ('ok','error') and
information
+ about the answer. In case of 'error', a list with the errors.
In case of
+ 'ok', an empty list.
+
+ """
+
+ if server is None:
+ server = self._server
+
+ if jid is None:
+ jid = self._client
+
+ iq = Iq(
+ typ='set',
+ queryNS=None,
+ attrs={},
+ frm=self._client,
+ to=server
+ )
+
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+ subscribe_node = Node(tag='subscribe',
attrs={'node':node, 'jid':jid})
+ pubsub_node.addChild(node=subscribe_node)
+ iq.addChild(node=pubsub_node)
+
+ return self._sendAndReceive(iq, lambda msg: [])
+
+    def unsubscribe(self, node, server=None, jid=None):
+        """
+        Unsubscribe from the selected node
+
+        @type node: string
+        @param node: id of the node to unsubscribe from
+        @type server: string
+        @param server: PubSub server. Defaults to the agent's own server.
+        @type jid: string
+        @param jid: JID to unsubscribe. Defaults to the agent's own JID.
+        @rtype: (string , list[string])
+        @return: A tuple with the type of answer ('ok','error') and information
+            about the answer. In case of 'error', a list with the errors. In case of
+            'ok' an empty list.
+
+        """
+
+        if server is None:
+            server = self._server
+
+        if jid is None:
+            jid = self._client
+
+        iq = Iq(
+            typ='set',
+            queryNS=None,
+            attrs={},
+            frm=self._client,
+            to=server
+        )
+
+        # XEP-0060 places <unsubscribe/> in the plain pubsub namespace, the
+        # same one subscribe() uses; the '#owner' namespace is reserved for
+        # owner actions such as deleting a node.
+        pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+        unsubscribe_node = Node(tag='unsubscribe', attrs={'node':node, 'jid':jid})
+        pubsub_node.addChild(node=unsubscribe_node)
+        iq.addChild(node=pubsub_node)
+        return self._sendAndReceive(iq, lambda msg: [])
+
+    def createNode(self, node, server=None, type='leaf', parent=None, access=None):
+        """
+        Creates a node with the specified parameters.
+
+        @type node: string
+        @param node: The ID of the node to create. If None, no 'node'
+            attribute is sent in the <create/> element.
+        @type server: string
+        @param server: PubSub server. Defaults to the agent's own server.
+        @type type: string
+        @param type: Type of the node: 'leaf' or 'collection'
+        @type parent: string
+        @param parent: id of the parent node. None if parent is root
+        @type access: string
+        @param access: Access model of the node
+        @rtype: (string , list[string])
+        @return: A tuple with the type of answer ('ok','error') and information
+            about the answer. In case of 'error', a list with the errors. In case of
+            'ok' the name of the created node.
+        """
+        #TODO: Add support for node configuration (RECOMMENDED in XEP-60)
+        if server is None:
+            server = self._server
+
+        iq = Iq(
+            typ='set',
+            queryNS=None,
+            attrs={},
+            frm=self._client,
+            to=server
+        )
+
+        pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+        create_node = Node(tag='create', attrs={} if node is None else {'node':node})
+        pubsub_node.addChild(node=create_node)
+
+        # A <configure/> form is only attached when a non-default option is requested.
+        if parent is not None or type == 'collection' or access is not None:
+            field_nodes = [DataField('FORM_TYPE', NS_PUBSUB+'#node_config', 'hidden')]
+            if parent is not None:
+                # <field var='pubsub#collection'><value>announcements</value></field>
+                field_nodes.append(DataField('pubsub#collection', parent))
+            if type == 'collection':
+                field_nodes.append(DataField('pubsub#node_type', 'collection'))
+            if access is not None:
+                field_nodes.append(DataField('pubsub#access_model', access))
+
+            configure_node = Node(tag='configure')
+            # Existing nodes must be passed with the 'node' keyword, as in the
+            # rest of this module; the first positional parameter of addChild
+            # is the tag *name* of a new child, not a node to attach.
+            configure_node.addChild(node=DataForm(typ='submit', data=field_nodes))
+            pubsub_node.addChild(node=configure_node)
+
+        iq.addChild(node=pubsub_node)
+
+        return self._sendAndReceive(iq, lambda msg: [msg.getTag('pubsub').getTag('create')['node']])
+
+
+
+    def createInstantNode(self, server=None, type='leaf', parent=None, access=None):
+        """
+        Creates an instant node without a name. The server will generate id.
+
+        Takes the same optional parameters and returns the same kind of
+        ('ok'|'error', list) tuple as createNode().
+
+        NOTE(review): whether the server side actually generates an id for
+        nameless nodes should be confirmed before relying on this call.
+        """
+
+        if server is None:
+            server = self._server
+
+        # createNode is a method: it has to be called through self. The bare
+        # name 'createNode' is not defined at module level (NameError).
+        return self.createNode(None, server, type, parent, access)
+
+
+ def deleteNode(self, node, server=None):
+ """
+ Deletes the selected node.
+
+ @type node: string
+ @param node: id of the node to delete
+ @type server: string
+ @param server: PubSub server
+ @rtype: (string , list[string])
+ @return: A tuple with the type of answer ('ok','error') and
information
+ about the answer. In case of 'error', a list with the errors.
In case of
+ 'ok' an empty list.
+
+
+ """
+
+ #TODO: A method to redirect the subscriptions to the node to
another one COULD be implemented
+
+ if server is None:
+ server = self._server
+
+ iq = Iq(
+ typ='set',
+ queryNS=None,
+ attrs={},
+ frm=self._client,
+ to=server,
+ )
+
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB_OWNER})
+ pubsub_node.addChild(name='delete', attrs={'node':node})
+ iq.addChild(node=pubsub_node)
+
+ return self._sendAndReceive(iq, lambda msg: [])
+
+
+
+
=======================================
--- /dev/null
+++ /trunk/xmppd/modules/pubsub.py Fri Jul 16 10:41:53 2010
@@ -0,0 +1,371 @@
+# -*- coding: UTF-8 -*-
+
+from pprint import pprint
+from uuid import uuid4
+from datetime import datetime
+
+from xmpp import *
+from xmpp.protocol import *
+
+class PSNode(object):
+    """
+    Publish-Subscribe node.
+
+    TODO: Items must retain info about its publisher. We have only implemented
+          owners and subscribers, not publishers. So the publisher is always
+          the owner.
+    """
+
+    def __init__(self, id, owner, type='leaf', parent=None, children=None, members=None, access_model='presence'):
+        """
+        Constructs a PSNode.
+
+        @type id: string
+        @param id: node id
+        @param owner: owner of the node
+        @type type: string
+        @param type: node type (defaults to 'leaf')
+        @param parent: parent node, or None
+        @type children: list
+        @param children: child nodes; a new empty list when omitted
+        @type members: dict
+        @param members: subscribers of the node; a new empty dict when omitted
+        @type access_model: string
+        @param access_model: access model (defaults to 'presence')
+        """
+        self.id = id
+        self.owner = owner
+        self.type = type
+        self.parent = parent
+        # Never use [] / {} as default argument values: the same objects would
+        # be shared by every PSNode created without these arguments, so a
+        # subscriber added to one node would show up as a member of all nodes.
+        if children is None:
+            children = []
+        if members is None:
+            members = {}
+        self.children = children
+        self.members = members
+        self.access_model = access_model
+        self.item_ids = []          # item ids in order of first publication
+        self.items_timestamp = {}   # item id -> UTC timestamp string
+        self.items = {}             # item id -> item content
+
+    def addItem(self, id, content):
+        """
+        Stores an item in the node together with its UTC publication time.
+
+        Publishing an id that already exists replaces its content and
+        timestamp but keeps its original position in item_ids.
+        Errors are printed, not raised.
+        """
+        try:
+            #TODO: Error if ID already exists.
+            if id not in self.item_ids:
+                self.item_ids.append(id)
+            self.items[id] = content
+            # ISO 8601 without fractional seconds, e.g. 2010-07-16T10:41:53Z
+            self.items_timestamp[id] = datetime.utcnow().isoformat().split('.')[0] + 'Z'
+            #print self.items_timestamp[id], self.items[id], id
+        except Exception,e:
+            print 'Exception in addItem'
+            print e
+
+    def __repr__(self):
+        return 'PSNode(%s, %s)' % (self.id, self.type)
+
+    def __str__(self):
+        return self.__repr__()
+
+class PubSubServer(PlugIn):
+
+ NS = NS_PUBSUB
+
+ def plugin(self,server):
+ self.name = self._owner.servernames[0]
+ self.nodes = {}
+
+ for ns in (NS_PUBSUB, NS_PUBSUB_ERRORS, NS_PUBSUB_EVENTS,
NS_PUBSUB_OWNER):
+ server.Dispatcher.RegisterHandler('iq', self.PubSubIqHandler,
typ='set', ns=ns, xmlns=NS_CLIENT)
+
+ def _getIqError(self, iq, name, specific=None):
+ if specific is None:
+ return Error(iq, NS_STANZAS + ' ' + name)
+ else:
+ error_node = ErrorNode(name)
+ error_node.addChild(name=specific,
attrs={'xmlns':NS_PUBSUB_ERRORS})
+ iq = iq.buildReply('error')
+ iq.addChild(node=error_node)
+ return iq
+
+ def _sendItem(self, node_id, item_id, frm=None):
+ #FIXME: items contents have incorrect xmlns
+ try:
+ node = self.nodes[node_id]
+
+ #print node.members
+
+ for jid in node.members.keys():
+ #print 'Enviando %s a %s' % (item_id,jid)
+ msg = Message(frm=self.name, to=jid)
+ msg.setID(str(uuid4())) #TODO: Do this automatically in
xmpp.protocol.Protocol
+ event_node = Node(tag='event',
attrs={'xmlns':NS_PUBSUB_EVENTS})
+ items_node = Node(tag='items', attrs={'node':node_id})
+ item_node = Node(tag='item',
attrs={'id':item_id, 'timestamp':node.items_timestamp[item_id]})
+ if frm is not None:
+ item_node['publisher'] = frm
+ item_node.addChild(node=node.items[item_id])
+ items_node.addChild(node=item_node)
+ event_node.addChild(node=items_node)
+ msg.addChild(node=event_node)
+ s = self._owner.getsession(jid)
+ s.send(msg)
+ except Exception,e:
+ print 'Exception in sendItem'
+ print e
+
+
+ #TODO: If we had a maximum, we should remove the first item here.
Doing a FIFO.
+
+
+ def PubSubIqHandler(self, session, stanza):
+ """
+ XXX: We do not validate. We just get what we want and that is
enough.
+ """
+
+ try:
+
+ self.DEBUG('PubSub Iq handler called','info')
+
+ #pprint(self._owner.DB.db)
+
+ #print stanza.__str__(fancy=True)
+ #if stanza.getType() == 'set':
+ pubsub_node = stanza.getTag('pubsub')
+
+ # If there is no pubsub node, this stanza should not have arrived here.
+ # TODO: Return an error to the user?
+ if pubsub_node is None:
+ self.DEBUG('Bad message: %s' % stanza, 'error')
+ raise NodeProcessed
+
+ # CREATE NODE
+ if pubsub_node.getTag('create') is not None:
+ create_node = pubsub_node.getTag('create')
+ node_id = create_node.getAttr('node')
+
+ #TODO: If no name, this is an instant node.
+
+ if False: #TODO: Registro
+
session.send(self._getIqError(stanza, 'registration-required'))
+ raise NodeProcessed
+
+ if False: #TODO: cuando tengamos privilegios
+ session.send(self._getIqError(stanza, 'forbidden'))
+ raise NodeProcessed
+
+ if node_id in self.nodes:
+ iq = self._getIqError(stanza, 'conflict')
+ session.send(iq)
+ raise NodeProcessed
+
+ if False: #TODO: Access model
+
session.send(self._getIqError(stanza, 'not-acceptable', 'unsupported-access-model'))
+ raise NodeProcessed
+
+ self.nodes[node_id] = PSNode(id=node_id,
owner=stanza.getFrom())
+
+ # Add node
+ #print self.nodes
+ self.DEBUG('Creating node: %s' % create_node, 'info')
+
+ iq = stanza.buildReply('result')
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+ pubsub_node.addChild(node=create_node)
+ iq.addChild(node=pubsub_node)
+
+ # SUCCESS
+ session.send(iq)
+
+ # DELETE NODE
+ elif pubsub_node.getTag('delete') is not None:
+
+ node_id = pubsub_node.getTag('delete').getAttr('node')
+
+ if node_id is None: #FIXME
+ self.DEBUG('Node is Non', 'error')
+
session.send(self._getIqError(stanza, 'item-not-found'))
+ raise NodeProcessed
+
+ if node_id not in self.nodes:
+ self.DEBUG('Node does not exists: %s' %
node_id, 'error')
+
session.send(self._getIqError(stanza, 'item-not-found'))
+ raise NodeProcessed
+
+ node = self.nodes[node_id]
+
+ if not node.owner.bareMatch(stanza.getFrom()):
+ session.send(self._getIqError(stanza, 'forbidden'))
+ raise NodeProcessed
+
+ # Delete node
+ # Keep in mind that associated items are (and must be)
deleted.
+ del self.nodes[node_id]
+
+ # SUCCESS
+ session.send(stanza.buildReply('result'))
+
+ # Notify all subscribers
+ for jid in node.members.keys():
+ msg = Message(frm=self.name, to=jid)
+ msg.setID(str(uuid4())) #TODO: Do this automatically
in xmpp.protocol.Protocol
+ event_node = Node(tag='event',
attrs={'xmlns':NS_PUBSUB + '#event'})
+ event_node.addChild(name='delete',
attrs={'node':node_id})
+ msg.addChild(node=event_node)
+ s = self._owner.getsession(jid)
+ s.send(msg)
+
+
+ # SUBSCRIBE
+ elif pubsub_node.getTag('subscribe') is not None:
+
+ #TODO: We do not do multiple subscribe.
+ # So be careful if we duplicate subscriptions.
+
+ subscribe_node = pubsub_node.getTag('subscribe')
+ node_id = subscribe_node.getAttr('node')
+
+ jid = JID(subscribe_node.getAttr('jid'))
+
+ #print 'nodes: ', self.nodes
+ if node_id is None or jid is None: #TODO: Que enviar
aquí?, además, jid no es None, peta antes
+ self.DEBUG('No node id or jid in subscribe
message.', 'error')
+ raise NodeProcessed
+
+ if node_id not in self.nodes:
+ self.DEBUG('Node does not exists: %s' %
node_id, 'error')
+
session.send(self._getIqError(stanza, 'item-not-found'))
+ raise NodeProcessed
+
+ if not stanza.getFrom().bareMatch(jid): # Trying to
subscribe a JID different from the real one
+
session.send(self._getIqError(stanza, 'bad-request', 'invalid-jid'))
+ raise NodeProcessed
+
+ node = self.nodes[node_id]
+ access_model = node.access_model
+ #print 'access_model: ', 'presence'
+
+ if access_model == 'presence':
+ owner_roster =
self._owner.DB.db[self.name][node.owner.getNode()]['roster']
+ if jid in owner_roster:
+ if owner_roster[jid.getStripped()]['subscription']
not in ('from', 'both'):
+ if owner_roster[jid.getStripped()]['status']
== 'pending_in': #XXX: pending_out too?
+
session.send(self._getIqError(stanza, 'not-authorized', 'pending-subscription'))
+ raise NodeProcessed
+ else:
+
session.send(self._getIqError(stanza, 'not-authorized', 'presence-subscription-required'))
+ raise NodeProcessed
+ else:
+
session.send(self._getIqError(stanza, 'not-authorized', 'presence-subscription-required'))
+ raise NodeProcessed
+
+ if False: #TODO: Roster access_model
+ #TODO: WE DO NOT IMPLEMENT ROSTER ACCESS MODEL EVEN IF
IT IS REQUIRED BY XEP-163
+ pass
+
+ if False: #TODO: Whitelist access_model
+ #TODO: WE DO NOT IMPLEMENT WHITELIST ACCESS MODEL EVEN
IF IT IS REQUIRED BY XEP-163
+ pass
+
+ if False: #TODO: Do not allow subscription from blocked
people, even with 'open' access model
+ #TODO: WE DO NOT IMPLEMENT BLOCKING
+ pass
+
+ if False: #TODO: Establish a max_subscription threshold.
+ #TODO: Let's people of the future care about security.
+ pass
+
+ # SUCESS
+
+
+ # Add new member to node, with its subid (an UUID)
+ node.members[jid] = str(uuid4())
+
+ #TODO: Send last published item (as required by XEP-163 by
default
+ iq = stanza.buildReply('result')
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':
NS_PUBSUB})
+ pubsub_node.addChild(name='subscription', attrs={
+ 'node':node_id,
+ 'jid': jid,
+ 'subid': node.members[jid],
+ 'subscription': 'subscribed'
+ })
+ iq.addChild(node=pubsub_node)
+ session.send(iq)
+
+
+ # UNSUBSCRIBE
+ elif pubsub_node.getTag('unsubscribe') is not None:
+ unsubscribe_node = pubsub_node.getTag('unsubscribe')
+ node_id = unsubscribe_node.getAttr('node')
+ jid = unsubscribe_node.getAttr('jid')
+
+ if node_id is None or jid is None: #TODO: Que enviar aquí?
+ self.DEBUG('No node id or jid in subscribe
message.', 'error')
+ raise NodeProcessed
+
+ if node_id not in self.nodes:
+ self.DEBUG('Node does not exists: %s' %
node_id, 'error')
+
session.send(self._getIqError(stanza, 'item-not-found'))
+ raise NodeProcessed
+
+
+ if not stanza.getFrom().bareMatch(JID(jid)): # Trying to
subscribe a JID different from the real one
+
session.send(self._getIqError(stanza, 'bad-request', 'invalid-jid'))
+ raise NodeProcessed
+
+ node = self.nodes[node_id]
+
+ if jid not in node.members:
+ #XXX: Owner is not a member, so he would trigger this
error too if he is stupid.
+
session.send(self._getIqError(stanza, 'unexpected-request', 'not-subscribed'))
+ raise NodeProcessed
+
+ del node.members[jid]
+
+ # SUCESS
+ session.send(stanza.buildReply('result'))
+
+
+ # PUBLISH ITEM
+ elif pubsub_node.getTag('publish') is not None:
+ publish_node = pubsub_node.getTag('publish')
+ node_id = publish_node['node']
+
+ if node_id is None: #TODO: Que enviar aquí?
+ self.DEBUG('No node id or jid in subscribe
message.', 'error')
+ raise NodeProcessed
+
+ item_node = publish_node.getTag('item')
+
+ if item_node is None: #TODO: Que enviar aquí?
+ self.DEBUG('No item', 'error')
+ raise NodeProcessed
+
+ if node_id not in self.nodes:
+ # XEP-60 defines auto-create, XEP-163 enforces it.
+ self.nodes[node_id] = PSNode(id=node_id,
owner=stanza.getFrom())
+
+ node = self.nodes[node_id]
+
+ # TODO: Change once we implement additional publishers
(not only owner).
+ if not node.owner.bareMatch(stanza.getFrom()):
+ session.send(self._getIqError(stanza, 'forbidden'))
+ raise NodeProcessed
+
+ item_id = item_node['id']
+ if item_id is None:
+ item_id = str(uuid4())
+
+
+
+ #print item_node.getChildren()
+ self.nodes[node_id].addItem(item_id,
item_node.getChildren()[0])
+
+ # SUCESS
+
+ iq = stanza.buildReply('result')
+ pubsub_node = Node(tag='pubsub', attrs={'xmlns':NS_PUBSUB})
+ publish_node = Node(tag='publish', attrs={'node':node_id})
+ publish_node.addChild(name='item', attrs={'id':item_id})
+ pubsub_node.addChild(node=publish_node)
+ iq.addChild(node=pubsub_node)
+ session.send(iq)
+
+ # Send item to all subscriptors.
+ self._sendItem(node_id, item_id, stanza.getFrom())
+
+
+ # NON-IMPLEMENTED ACTION
+ else:
+ self.DEBUG('Not implemented: %s' % create_node, 'info')
+
+ raise NodeProcessed
+ except NodeProcessed:
+ raise NodeProcessed
+ except Exception,e:
+ print e
=======================================
--- /trunk/spade.sh Wed Jun 21 11:10:23 2006
+++ /dev/null
@@ -1,3 +0,0 @@
-#! /bin/sh
-
-/usr/bin/runspade.py "$@"
=======================================
--- /trunk/examples/unittests/unittests.py Sun Jul 11 16:13:46 2010
+++ /trunk/examples/unittests/unittests.py Fri Jul 16 10:41:53 2010
@@ -7,4 +7,5 @@
from aidTestCase import *
from dadTestCase import *
from eventbehavTestCase import *
+from pubsubTestCase import *
unittest.main()
=======================================
--- /trunk/spade/Agent.py Wed Jul 14 06:10:54 2010
+++ /trunk/spade/Agent.py Fri Jul 16 10:41:53 2010
@@ -27,6 +27,7 @@
import peer2peer as P2P
import socialnetwork
import RPC
+import pubsub

import mutex
import types
@@ -84,6 +85,7 @@
self._aid = AID.aid(name=agentjid, addresses=["xmpp://"+agentjid])
self._jabber = None
self._serverplatform = serverplatform
+ self.server = serverplatform
self._defaultbehaviour = None
self._behaviourList = dict()
self._alive = True
@@ -116,6 +118,8 @@
self._socialnetwork = {}
self._subscribeHandler = lambda frm,typ,stat,show: False
self._unsubscribeHandler = lambda frm,typ,stat,show: False
+
+ self._pubsub = pubsub.PubSub(self)

self._waitingForRoster = False # Indicates that a request for the
roster is in progress

@@ -1510,6 +1514,22 @@
else:
self.runBehaviourOnce(b,t)
return b.result
+
+
+
+ ####################
+ #PubSub services
+ ####################
+ def publishEvent(self, name, event):
+ return self._pubsub.publish(name,event)
+ def subscribeToEvent(self, name, server=None,jid=None):
+ return self._pubsub.subscribe(name,server,jid)
+ def unsubscribeFromEvent(self, name,server=None,jid=None):
+ return self._pubsub.unsubscribe(name,server,jid)
+    def createEvent(self, name, server=None, type='leaf', parent=None, access=None):
+        """
+        Creates the event (pubsub node) called name.
+
+        All optional arguments are forwarded to PubSub.createNode; they used
+        to be silently replaced by their default values.
+        """
+        return self._pubsub.createNode(name, server=server, type=type, parent=parent, access=access)
+    def deleteEvent(self, name, server=None):
+        """
+        Deletes the event (pubsub node) called name.
+
+        The server argument is forwarded to PubSub.deleteNode; it used to be
+        silently replaced by None.
+        """
+        return self._pubsub.deleteNode(name, server=server)

##################################

=======================================
--- /trunk/spade/socialnetwork.py Tue Jul 6 02:38:25 2010
+++ /trunk/spade/socialnetwork.py Fri Jul 16 10:41:53 2010
@@ -80,3 +80,7 @@

def getPresence(self):
return self._presence
+
+ def subscribe(self):
+ self.myAgent.jabber.Roster.Subscribe(self._jid)
+
=======================================
--- /trunk/xmpp/protocol.py Wed Jul 14 06:10:54 2010
+++ /trunk/xmpp/protocol.py Fri Jul 16 10:41:53 2010
@@ -65,7 +65,10 @@
NS_PRESENCE ='presence' #
Jabberd2
NS_PRIVACY ='jabber:iq:privacy'
NS_PRIVATE ='jabber:iq:private'
-NS_PUBSUB ='http://jabber.org/protocol/pubsub' #
JEP-0060
+NS_PUBSUB ='http://jabber.org/protocol/pubsub' #
XEP-0060
+NS_PUBSUB_ERRORS=NS_PUBSUB+'#errors'
+# XEP-0060 names the notification namespace 'pubsub#event' (singular).
+NS_PUBSUB_EVENTS=NS_PUBSUB+'#event'
+NS_PUBSUB_OWNER =NS_PUBSUB+'#owner'
NS_REGISTER ='jabber:iq:register'
NS_ROSTER ='jabber:iq:roster'
NS_ROSTERX ='http://jabber.org/protocol/rosterx' #
JEP-0144
@@ -286,7 +289,7 @@
frn - from attribure, attrs - other attributes mapping,
payload - same meaning as for simplexml payload definition
timestamp - the time value that needs to be stamped over stanza
xmlns - namespace of top stanza node
- node - parsed or unparsed stana to be taken as prototype.
+ node - parsed or unparsed stanza to be taken as prototype.
"""
if not attrs: attrs={}
if to: attrs['to']=to
=======================================
--- /trunk/xmppd/modules/__init__.py Mon Jun 11 09:03:12 2007
+++ /trunk/xmppd/modules/__init__.py Fri Jul 16 10:41:53 2010
@@ -40,4 +40,6 @@
muc.MUC,
wq.WQ,
#webadmin.WebAdmin
+
+ pubsub.PubSubServer,
]
=======================================
--- /trunk/xmppd/modules/db_fake.py Fri Jun 1 07:51:02 2007
+++ /trunk/xmppd/modules/db_fake.py Fri Jul 16 10:41:53 2010
@@ -383,6 +383,13 @@
self.DEBUG('load_database: Could not load user
database', 'error')
return False

+ def __str__(self):
+ return str(db)
+
+ @property
+ def db(self):
+ return db
+
def print_database(self):
- print db
-
+ print str(self)
+
Reply all
Reply to author
Forward
0 new messages