Peter
unread,Jun 6, 2012, 8:24:40 PM6/6/12Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Sign in to report message
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to pyRserve
Dear All,
Thanks for a great module. I made minor extensions to support session
detach and reconnect functionality that exists in Rserve.
Below are the patch files for the changes I made. They work just
fine, but I admit this is my first time working with Python, so please
go over them before incorporating.
Cheers,
-peter.
--- pyRserve-0.5.2/pyRserve/rconn.py 2011-11-22 10:14:34.000000000
-0800
+++ rconn.py 2012-06-06 14:56:45.240177768 -0700
@@ -2,21 +2,34 @@
###
import rtypes
from rexceptions import RConnectionRefused, REvalError,
PyRserveClosed
-from rserializer import rEval, rAssign
-from rparser import rparse
+from rserializer import rEval, rAssign, rDetach, rWriteSessionKey
+from rparser import rparse, key_parse
RSERVEPORT = 6311
DEBUG = False
+class SessionInfo(list):
+ def __init__(self,host,port,key):
+ list.__init__(self,[host,port,key])
+ self.host=host
+ self.port=port
+ self.key=key
+
-def connect(host='', port=RSERVEPORT, atomicArray=False):
+def connect(host='', port=RSERVEPORT,
atomicArray=False,sessionInfo=None):
"""Open a connection to a Rserve instance"""
# if host in (None, ''):
# # On Win32 it seems that passing an empty string as
'localhost' does not work
# # So just to be sure provide the full local hostname if None
or '' were passed.
# host = socket.gethostname()
assert port is not None, 'port number must be given'
- return RConnector(host, port, atomicArray)
+ if sessionInfo:
+ port=sessionInfo.port
+ host=sessionInfo.host
+ sessionKey=sessionInfo.key
+ else:
+ sessionKey=None;
+ return RConnector(host, port, atomicArray,sessionKey)
def rconnect(host='', port=RSERVEPORT):
@@ -36,11 +49,11 @@
class RConnector(object):
'@brief Provides a network connector to an Rserve process'
- def __init__(self, host, port, atomicArray):
+ def __init__(self, host, port, atomicArray,sessionKey=None):
self.host = host
self.port = port
self.atomicArray = atomicArray
- self.connect()
+ self.connect(sessionKey)
self.r = RNameSpace(self)
self.ref = RNameSpaceReference(self)
@@ -52,19 +65,27 @@
def isClosed(self):
return self.__closed
- def connect(self):
+ def connect(self,sessionKey=None):
self.sock = socket.socket()
try:
self.sock.connect((self.host, self.port))
except socket.error:
raise RConnectionRefused('Connection denied, server not
reachable or not accepting connections')
- time.sleep(0.2)
- hdr = self.sock.recv(1024)
+
+ # write out session key if one was provided, don't wait back
for a response
+ if sessionKey:
+ rWriteSessionKey(sessionKey,fp=self.sock)
+ rparse(self.sock, atomicArray=self.atomicArray)
+ else:
+ time.sleep(0.2)
+ hdr = self.sock.recv(1024)
+ if DEBUG:
+ print 'received hdr %s from rserve' % hdr
+ assert hdr.startswith('Rsrv01') # make sure we are
really connected with rserv
+ # TODO: possibly also do version checking here to make
sure we understand the protocol...
+
self.__closed = False
- if DEBUG:
- print 'received hdr %s from rserve' % hdr
- assert hdr.startswith('Rsrv01') # make sure we are really
connected with rserv
- # TODO: possibly also do version checking here to make sure
we understand the protocol...
+
@checkIfClosed
def close(self):
@@ -106,9 +127,12 @@
'@brief Receive the result from a previous call to rserve.'
raw = self.sock.recv(rtypes.SOCKET_BLOCK_SIZE)
d = [raw]
+ print "len(raw)= %d" % len(raw)
while len(raw) == rtypes.SOCKET_BLOCK_SIZE:
raw = self.sock.recv(rtypes.SOCKET_BLOCK_SIZE)
+ print "len(raw)= %d" % len(raw)
d.append(raw)
+ print "len(d)= %d" % len(d)
return ''.join(d)
# @checkIfClosed
@@ -125,6 +149,15 @@
rparse(self.sock, atomicArray=self.atomicArray)
@checkIfClosed
+ def detach(self):
+ '@brief Detach current session, returning back a serializable
object'
+ rDetach(self.sock)
+ # Rserv sends an empty confirmation message, or error message
in case of an error.
+ # rparse() will raise an Exception in the latter case.
+ info=key_parse(self.sock, atomicArray=self.atomicArray)
+ return SessionInfo(self.host,info[0],info[1])
+
+ @checkIfClosed
def getRexp(self, name):
'@brief Retrieve a Rexp stored in a variable called "name"'
return self.eval(name)
--- pyRserve-0.5.2/pyRserve/rserializer.py 2011-09-20
03:09:10.000000000 -0700
+++ rserializer.py 2012-06-06 14:50:44.868171004 -0700
@@ -39,7 +39,7 @@
serializeMap = {}
fmap = FunctionMapper(serializeMap)
#
- def __init__(self, commandType, fp=None):
+ def __init__(self, commandType=None, fp=None):
if isinstance(fp, socket._socketobject):
self._orig_fp = fp.makefile()
self._fp = cStringIO.StringIO()
@@ -49,7 +49,8 @@
else:
self._fp = self._orig_fp = fp
self._dataSize = 0
- self._writeHeader(commandType)
+ if commandType :
+ self._writeHeader(commandType)
def _getRetVal(self):
if self._orig_fp is self._fp:
@@ -108,6 +109,10 @@
length = self._serializeExpr(o)
self._fp.seek(startPos)
self._writeDataHeader(dtTypeCode, length)
+ elif dtTypeCode == rtypes.DT_BYTESTREAM:
+ length = len(o);
+ self._writeDataHeader(dtTypeCode, length)
+ self._fp.write(o)
else:
raise NotImplementedError('no support for DT-type %x' %
dtTypeCode)
self._dataSize += length + 4
@@ -281,7 +286,16 @@
s.serialize(varname, dtTypeCode=rtypes.DT_STRING)
s.serialize(o, dtTypeCode=rtypes.DT_SEXP)
return s.finalize()
-
+
+ @classmethod
+ def rWriteSessionKey(cls, key, fp=None):
+ """write out binary session key"""
+ s = cls(fp=fp)
+ print 'writing out session key'
+ s._fp.write(key);
+ s._fp.flush();
+ s._getRetVal()
+ print 'done'
@classmethod
def rSerializeResponse(cls, Rexp, fp=None):
@@ -290,8 +304,16 @@
s.serialize(Rexp, dtTypeCode=rtypes.DT_SEXP)
return s.finalize()
+ @classmethod
+ def rDetach(cls, fp=None):
+ """Send detach signal"""
+ s = cls(rtypes.CMD_detachSession, fp=fp)
+ return s.finalize()
+
# Some shortcuts:
rEval = RSerializer.rEval
rAssign = RSerializer.rAssign
rSerializeResponse = RSerializer.rSerializeResponse
+rDetach = RSerializer.rDetach
+rWriteSessionKey = RSerializer.rWriteSessionKey
--- pyRserve-0.5.2/pyRserve/rparser.py 2011-09-20 23:09:27.000000000
-0700
+++ rparser.py 2012-06-06 17:19:05.380337224 -0700
@@ -306,6 +306,36 @@
else:
raise NotImplementedError()
+ def key_parse(self):
+ '''
+ @brief parse key returned after session detach
+ '''
+ self.indentLevel = 1
+ self.lexer.readHeader()
+ if self.lexer.messageSize == 3*4+32:
+ try:
+
port=self.lexer.nextExprData(self.lexer.nextExprHdr())
+ keyh=self.lexer.read(4); # trust that the header will
say DT_BYTESTREAM
+ key=self.lexer.read(32);
+ if DEBUG:
+ print "detached session key %s" % key
+ return [port, key];
+ except:
+ # If any error is raised during lexing and parsing,
make sure that the entire data
+ # is read from the input source if it is a socket,
otherwise following attempts to
+ # parse again from a socket will return polluted
data:
+ self.lexer.clearSocketData()
+ raise
+
+ elif not self.lexer.responseOK:
+ try:
+ rserve_err_msg = ERRORS[self.lexer.errCode]
+ except KeyError:
+ raise REvalError("R evaluation error (code=%d)" %
self.lexer.errCode)
+ else:
+ raise RResponseError('Response error %s (error code=
%d)' %
+ (rserve_err_msg,
self.lexer.errCode))
+
def _parseExpr(self):
self.indentLevel += 1
lexeme = self.lexer.nextExprHdr()
@@ -420,6 +450,11 @@
rparser = RParser(src, atomicArray)
return rparser.parse()
+
+def key_parse(src, atomicArray=False):
+ rparser = RParser(src, atomicArray)
+ return rparser.key_parse()
+
########################################################################################
class Closure(object):