[ibpy] r345 committed - Adds pre- and post- send messages. Factors dispatcher out of Sender c...

8 views
Skip to first unread message

ib...@googlecode.com

unread,
Dec 15, 2009, 3:13:59 AM12/15/09
to ibpy-...@googlegroups.com
Revision: 345
Author: troy.melhase
Date: Tue Dec 15 00:13:41 2009
Log: Adds pre- and post- send messages. Factors dispatcher out of Sender
class to support.
http://code.google.com/p/ibpy/source/detail?r=345

Added:
/trunk/ib/opt/dispatcher.py
Modified:
/trunk/demo/filters
/trunk/ib/lib/__init__.py
/trunk/ib/opt/connection.py
/trunk/ib/opt/message.py
/trunk/ib/opt/receiver.py
/trunk/ib/opt/sender.py

=======================================
--- /dev/null
+++ /trunk/ib/opt/dispatcher.py Tue Dec 15 00:13:41 2009
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##
+# Defines Dispatcher class to send messages to registered listeners.
+#
+##
+from Queue import Queue, Empty
+
+from ib.lib.logger import logger
+from ib.opt.message import registry
+
+
+class Dispatcher(object):
+ """
+
+ """
+ def __init__(self, listeners=None, types=None):
+ """ Initializer.
+
+ @param listeners=None mapping of existing listeners
+ @param types=None method name to message type lookup
+ """
+ self.listeners = listeners if listeners else {}
+ self.types = types if types else registry
+ self.logger = logger()
+
+ def __call__(self, name, mapping):
+ """ Send message to each listener.
+
+ @param name method name
+ @param mapping values for message instance
+ @return None
+ """
+ try:
+ messagetype = self.types[name]
+ listeners = self.listeners[self.key(messagetype)]
+ except (KeyError, ):
+ pass
+ else:
+ message = messagetype(**mapping)
+ for listener in listeners:
+ try:
+ listener(message)
+ except (Exception, ):
+ self.unregister(listener, messagetype)
+ errmsg = ("Exception in message dispatch. "
+ "Handler '%s' unregistered for '%s'")
+ self.logger.exception(errmsg, self.key(listener), name)
+
+ def iterator(self, *types):
+ """ Create and return a function for iterating over messages.
+
+ @param *types zero or more message types to associate with listener
+ @return function that yields messages
+ """
+ queue = Queue()
+ closed = []
+ def messageGenerator(block=True, timeout=0.1):
+ while True:
+ try:
+ yield queue.get(block=block, timeout=timeout)
+ except (Empty, ):
+ if closed:
+ break
+ self.register(closed.append, 'ConnectionClosed')
+ if types:
+ self.register(queue.put, *types)
+ else:
+ self.registerAll(queue.put)
+ return messageGenerator
+
+ def register(self, listener, *types):
+ """ Associate listener with message types created by this
Dispatcher.
+
+ @param listener callable to receive messages
+ @param *types zero or more message types to associate with listener
+ @return True if associated with one or more handler; otherwise
False
+ """
+ count = 0
+ for messagetype in types:
+ key = self.key(messagetype)
+ listeners = self.listeners.setdefault(key, [])
+ if listener not in listeners:
+ listeners.append(listener)
+ count += 1
+ return count > 0
+
+ def registerAll(self, listener):
+ """ Associate listener with all messages created by this
Dispatcher.
+
+ @param listener callable to receive messages
+ @return True if associated with one or more handler; otherwise
False
+ """
+ return self.register(listener, *self.types.values())
+
+ def unregister(self, listener, *types):
+ """ Disassociate listener with message types created by this
Dispatcher.
+
+ @param listener callable to no longer receive messages
+ @param *types zero or more message types to disassociate with
listener
+ @return True if disassociated with one or more handler; otherwise
False
+ """
+ count = 0
+ for messagetype in types:
+ try:
+ listeners = self.listeners[self.key(messagetype)]
+ except (KeyError, ):
+ pass
+ else:
+ if listener in listeners:
+ listeners.remove(listener)
+ count += 1
+ return count > 0
+
+ def unregisterAll(self, listener):
+ """ Disassociate listener with all messages created by this
Dispatcher.
+
+ @param listener callable to no longer receive messages
+ @return True if disassociated with one or more handler; otherwise
False
+ """
+ return self.unregister(listener, *self.types.values())
+
+ @staticmethod
+ def key(obj):
+ """ Generates lookup key for given object.
+
+ @param obj any object
+ @return obj name or string representation
+ """
+ try:
+ return obj.__name__
+ except (AttributeError, ):
+ return str(obj)
=======================================
--- /trunk/demo/filters Sat Dec 5 14:18:18 2009
+++ /trunk/demo/filters Tue Dec 15 00:13:41 2009
@@ -31,10 +31,20 @@
cash_handler = messagetools.messageFilter(my_account_handler, lambda
m:m.key.lower().count('cash'))


+# try out the new before and after send messages
+def pre_req_account_updates(msg):
+ print 'pre account updates: ', msg
+
+def post_req_account_updates(msg):
+ print 'post account updates: ', msg
+
+
if __name__ == '__main__':
con = ibConnection()
con.register(cash_handler, 'UpdateAccountValue')
con.register(tick_handler, message.TickSize, message.TickPrice)
+ con.register(pre_req_account_updates, 'ReqAccountUpdatesBefore')
+ con.register(post_req_account_updates, 'ReqAccountUpdatesAfter')
con.connect()

def inner():
=======================================
--- /trunk/ib/lib/__init__.py Sun Aug 2 03:17:47 2009
+++ /trunk/ib/lib/__init__.py Tue Dec 15 00:13:41 2009
@@ -11,6 +11,7 @@
##

import copy
+import functools
import socket
import struct
import sys
@@ -37,6 +38,7 @@
@return decorator that provides automatic locking
"""
def wrapper(func):
+ @functools.wraps(func)
def inner(*args, **kwds):
lock.acquire()
try:
=======================================
--- /trunk/ib/opt/connection.py Wed May 14 10:31:36 2008
+++ /trunk/ib/opt/connection.py Tue Dec 15 00:13:41 2009
@@ -18,6 +18,7 @@
#
##
from ib.lib.logger import logger
+from ib.opt.dispatcher import Dispatcher
from ib.opt.receiver import Receiver
from ib.opt.sender import Sender

@@ -26,7 +27,7 @@
""" Encapsulates a connection to TWS.

"""
- def __init__(self, host, port, clientId, receiver, sender):
+ def __init__(self, host, port, clientId, receiver, sender, dispatcher):
""" Constructor.

@param host name of host for connection; default is localhost
@@ -38,19 +39,23 @@
self.clientId = clientId
self.receiver = receiver
self.sender = sender
+ self.dispatcher = dispatcher

def __getattr__(self, name):
""" x.__getattr__('name') <==> x.name

- @return named attribute from instance receiver or sender
+ @return attribute of instance dispatcher, receiver, or sender
"""
- try:
- return getattr(self.receiver, name)
- except (AttributeError, ):
- try:
- return getattr(self.sender, name)
- except (AttributeError, ):
- pass
+ try:
+ return getattr(self.dispatcher, name)
+ except (AttributeError, ):
+ try:
+ return getattr(self.receiver, name)
+ except (AttributeError, ):
+ try:
+ return getattr(self.sender, name)
+ except (AttributeError, ):
+ pass
raise AttributeError(name)

def connect(self):
@@ -76,9 +81,9 @@
"""
if enable:
self.logger = logger()
- self.receiver.registerAll(self.logMessage)
+ self.registerAll(self.logMessage)
else:
- self.receiver.unregisterAll(self.logMessage)
+ self.unregisterAll(self.logMessage)
return enable

def logMessage(self, message):
@@ -92,7 +97,7 @@

@classmethod
def create(cls, host='localhost', port=7496, clientId=0, receiver=None,
- sender=None):
+ sender=None, dispatcher=None):
""" Creates and returns Connection class (or subclass) instance.

@param host name of host for connection; default is localhost
@@ -100,6 +105,7 @@
@param clientId client identifier to send when connected
@return Connection (or subclass) instance
"""
- receiver = Receiver() if receiver is None else receiver
- sender = Sender() if sender is None else sender
- return cls(host, port, clientId, receiver, sender)
+ dispatcher = Dispatcher() if dispatcher is None else dispatcher
+ receiver = Receiver(dispatcher) if receiver is None else receiver
+ sender = Sender(dispatcher) if sender is None else sender
+ return cls(host, port, clientId, receiver, sender, dispatcher)
=======================================
--- /trunk/ib/opt/message.py Thu Dec 3 16:20:25 2009
+++ /trunk/ib/opt/message.py Tue Dec 15 00:13:41 2009
@@ -9,11 +9,46 @@
# that the Receiver class then uses to determine message types.
##

-from functools import partial
-from inspect import getargspec
-from types import MethodType
-
+from ast import NodeVisitor, parse
+from inspect import getsourcefile
+from re import match
+
+from ib.ext.AnyWrapper import AnyWrapper
from ib.ext.EWrapper import EWrapper
+from ib.ext.EClientSocket import EClientSocket
+
+
+class SignatureAccumulator(NodeVisitor):
+ def __init__(self):
+ NodeVisitor.__init__(self)
+ self.signatures = []
+
+ def visit_FunctionDef(self, node):
+ args = [arg.id for arg in node.args.args]
+ self.signatures.append((node.name, args[1:]))
+
+ def getSignatures(self):
+ for filename in self.filenames:
+ self.visit(parse(open(filename).read()))
+ return self.filterSignatures()
+
+
+class EClientSocketAccumulator(SignatureAccumulator):
+ filenames = (getsourcefile(EClientSocket), )
+
+ def filterSignatures(self):
+ for name, args in self.signatures:
+ if match('(?i)req|cancel|place', name):
+ yield (name, args)
+
+
+class EWrapperAccumulator(SignatureAccumulator):
+ filenames = (getsourcefile(AnyWrapper), getsourcefile(EWrapper), )
+
+ def filterSignatures(self):
+ for name, args in self.signatures:
+ if match('(?!((?i)error.*))', name):
+ yield (name, args)


##
@@ -36,6 +71,7 @@
@param namespace dictionary with namespace of new type
"""
setattr(cls, 'typeName', name)
+ return
try:
registry[namespace['__assoc__']] = cls
except (KeyError, ):
@@ -43,7 +79,7 @@


class Message(object):
- """ Base class of all Message types.
+ """ Base class for Message types.

"""
__metaclass__ = MessageType
@@ -104,55 +140,39 @@
__slots__ = ['id', 'errorCode', 'errorMsg']


-def isWrapperMethod(name, value):
- """ Predicate for wrapper methods.
-
- @param name name of class attribute as string
- @param value value of class attribute; any object
- @return True if wrapper method
- """
- return (not name.startswith('_') and
- not name.startswith('error') and
- isinstance(value, MethodType))
-
-
-def selectWrapperMethods(cls):
- """ Wrapper methods of a class.
-
- @param cls class object to inspect
- @return list of two-tuples, each (name, argnames)
- """
- clsitems = [(name, getattr(cls, name)) for name in dir(cls)]
- clsitems = [(name, method) for name, method in clsitems
- if isWrapperMethod(name, method)]
- def argns(meth):
- args, varargs, varkw, defaults = getargspec(meth)
- return args[1:] # without leading 'self'
- return [(name, argns(method)) for name, method in clsitems]
-
-
-def buildMessageTypes(wrapper, mapping, *bases):
+def buildMessageTypes(seq, mapping, suffixes=('', ), bases=(Message, )):
""" Construct message types and add to given mapping.

- @param wrapper class object to inspect for methods
+ @param seq pairs of method (name, arguments)
@param mapping dictionary for adding new message types
@param bases sequence of base classes for message types
@return None
"""
- for name, args in selectWrapperMethods(wrapper):
- typename = name[0].upper() + name[1:]
- typens = {'__slots__':args, '__assoc__':name}
- mapping[typename] = type(typename, bases, typens)
+ for name, args in seq:
+ for suffix in suffixes:
+ typename = name[0].upper() + name[1:] + suffix
+ methname = name + suffix
+ typens = {'__slots__':args, '__assoc__':name}
+ mapping[typename] = msgtype = type(typename, bases, typens)
+ registry[methname] = msgtype
+
+
+##
+# Sequences for accessing the same values we use to create
+# Message types.
+wrapperMethods = list(EWrapperAccumulator().getSignatures())
+clientSocketMethods = list(EClientSocketAccumulator().getSignatures())


# create message types in the module namespace from the EWrapper
# abstract class
-buildMessageTypes(EWrapper, globals(), Message)
-
-##
-# A (partial) method so other modules can use the same mappings we
-# have.
-wrapperMethods = partial(selectWrapperMethods, EWrapper)
+buildMessageTypes(wrapperMethods, globals())
+
+## create message types in the module namespace from the EClientSocket
+# concrete class
+buildMessageTypes(clientSocketMethods, globals(),
+ suffixes=('Before', 'After'))
+


def messageTypeNames():
@@ -161,3 +181,8 @@
@return set of all message type names as strings
"""
return set([t.typeName for t in registry.values()])
+
+
+del(AnyWrapper)
+del(EWrapper)
+del(EClientSocket)
=======================================
--- /trunk/ib/opt/receiver.py Thu Dec 3 16:27:51 2009
+++ /trunk/ib/opt/receiver.py Tue Dec 15 00:13:41 2009
@@ -11,11 +11,8 @@
# ib.opt.message module more information.
#
##
-from Queue import Queue, Empty
-
from ib.lib.overloading import overloaded
-from ib.lib.logger import logger
-from ib.opt.message import registry, wrapperMethods
+from ib.opt.message import wrapperMethods


def messageMethod(name, argnames):
@@ -27,7 +24,7 @@
"""
def inner(self, *args):
params = dict(zip(argnames, args))
- self.dispatch(name, params)
+ self.dispatcher(name, params)
inner.__name__ = name
return inner

@@ -46,7 +43,7 @@
@param namespace dictionary with namespace for new type
@return generated type
"""
- for methodname, methodargs in wrapperMethods():
+ for methodname, methodargs in wrapperMethods:
namespace[methodname] = messageMethod(methodname, methodargs)
return type(name, bases, namespace)

@@ -59,123 +56,12 @@
"""
__metaclass__ = ReceiverType

- def __init__(self, listeners=None, types=None):
+ def __init__(self, dispatcher):
""" Initializer.

- @param listeners=None mapping of existing listeners
- @param types=None method name to message type lookup
+ @param dispatcher message dispatcher instance
"""
- self.listeners = listeners if listeners else {}
- self.types = types if types else registry
- self.logger = logger()
-
- def dispatch(self, name, mapping):
- """ Send message to each listener.
-
- @param name method name
- @param mapping values for message instance
- @return None
- """
- try:
- messagetype = self.types[name]
- listeners = self.listeners[self.key(messagetype)]
- except (KeyError, ):
- pass
- else:
- message = messagetype(**mapping)
- for listener in listeners:
- try:
- listener(message)
- except (Exception, ):
- self.unregister(listener, messagetype)
- errmsg = ("Exception in message dispatch. "
- "Handler '%s' unregistered for '%s'")
- self.logger.exception(errmsg, self.key(listener), name)
-
- def iterator(self, *types):
- """ Create and return a function for iterating over messages.
-
- @param *types zero or more message types to associate with listener
- @return function that yields messages
- """
- queue = Queue()
- closed = []
- def messageGenerator(block=True, timeout=0.1):
- while True:
- try:
- yield queue.get(block=block, timeout=timeout)
- except (Empty, ):
- if closed:
- break
- self.register(closed.append, 'ConnectionClosed')
- if types:
- self.register(queue.put, *types)
- else:
- self.registerAll(queue.put)
- return messageGenerator
-
- def register(self, listener, *types):
- """ Associate listener with message types created by this Receiver.
-
- @param listener callable to receive messages
- @param *types zero or more message types to associate with listener
- @return True if associated with one or more handler; otherwise
False
- """
- count = 0
- for messagetype in types:
- key = self.key(messagetype)
- listeners = self.listeners.setdefault(key, [])
- if listener not in listeners:
- listeners.append(listener)
- count += 1
- return count > 0
-
- def registerAll(self, listener):
- """ Associate listener with all messages created by this Receiver.
-
- @param listener callable to receive messages
- @return True if associated with one or more handler; otherwise
False
- """
- return self.register(listener, *self.types.values())
-
- def unregister(self, listener, *types):
- """ Disassociate listener with message types created by this
Receiver.
-
- @param listener callable to no longer receive messages
- @param *types zero or more message types to disassociate with
listener
- @return True if disassociated with one or more handler; otherwise
False
- """
- count = 0
- for messagetype in types:
- try:
- listeners = self.listeners[self.key(messagetype)]
- except (KeyError, ):
- pass
- else:
- if listener in listeners:
- listeners.remove(listener)
- count += 1
- return count > 0
-
- def unregisterAll(self, listener):
- """ Disassociate listener with all messages created by this
Receiver.
-
- @param listener callable to no longer receive messages
- @return True if disassociated with one or more handler; otherwise
False
- """
- return self.unregister(listener, *self.types.values())
-
- @staticmethod
- def key(obj):
- """ Generates lookup key for given object.
-
- @param obj any object
- @return obj name or string representation
- """
- try:
- return obj.__name__
- except (AttributeError, ):
- return str(obj)
+ self.dispatcher = dispatcher

@overloaded
def error(self, e):
@@ -188,7 +74,7 @@
@param e some error value
@return None
"""
- self.dispatch('error', dict(errorMsg=e))
+ self.dispatcher('error', dict(errorMsg=e))

@error.register(object, str)
def error_0(self, strval):
@@ -197,7 +83,7 @@
@param strval some error value as string
@return None
"""
- self.dispatch('error', dict(errorMsg=strval))
+ self.dispatcher('error', dict(errorMsg=strval))

@error.register(object, int, int, str)
def error_1(self, id, errorCode, errorMsg):
@@ -209,4 +95,4 @@
@return None
"""
params = dict(id=id, errorCode=errorCode, errorMsg=errorMsg)
- self.dispatch('error', params)
+ self.dispatcher('error', params)
=======================================
--- /trunk/ib/opt/sender.py Sun Aug 2 03:17:47 2009
+++ /trunk/ib/opt/sender.py Tue Dec 15 00:13:41 2009
@@ -9,6 +9,7 @@
#
##
from ib.ext.EClientSocket import EClientSocket
+from ib.opt.message import registry, clientSocketMethods


class Sender(object):
@@ -18,6 +19,14 @@
"""
client = None

+ def __init__(self, dispatcher):
+ """ Initializer.
+
+ @param dispatcher message dispatcher instance
+ """
+ self.dispatcher = dispatcher
+ self.clientMethodNames = [m[0] for m in clientSocketMethods]
+
def connect(self, host, port, clientId, handler,
clientType=EClientSocket):
""" Creates a TWS client socket and connects it.

@@ -51,4 +60,16 @@

@return named attribute from EClientSocket object
"""
- return getattr(self.client, name)
+ try:
+ value = getattr(self.client, name)
+ except (AttributeError, ):
+ raise
+ if name in self.clientMethodNames:
+ before, after = registry[name+'Before'], registry[name+'After']
+ def wrapperMethod(*args):
+ self.dispatcher(name+'Before', dict(zip(before.__slots__, args)))
+ result = value(*args)
+ self.dispatcher(name+'After', dict(zip(after.__slots__, args)))
+ return result
+ return wrapperMethod
+ return value
Reply all
Reply to author
Forward
0 new messages