implementing session detach and reconnect

143 views
Skip to first unread message

Peter

unread,
Jun 6, 2012, 8:24:40 PM6/6/12
to pyRserve
Dear All,
Thanks for a great module. I made minor extensions to support session
detach and reconnect functionality that exists in Rserve.
Below are the patch files for the changes I made. They work just
fine, but I admit this is my first time working with Python, so please
go over them before incorporating them.
Cheers,
-peter.

--- pyRserve-0.5.2/pyRserve/rconn.py 2011-11-22 10:14:34.000000000
-0800
+++ rconn.py 2012-06-06 14:56:45.240177768 -0700
@@ -2,21 +2,34 @@
###
import rtypes
from rexceptions import RConnectionRefused, REvalError,
PyRserveClosed
-from rserializer import rEval, rAssign
-from rparser import rparse
+from rserializer import rEval, rAssign, rDetach, rWriteSessionKey
+from rparser import rparse, key_parse

RSERVEPORT = 6311
DEBUG = False

+class SessionInfo(list):
+ def __init__(self,host,port,key):
+ list.__init__(self,[host,port,key])
+ self.host=host
+ self.port=port
+ self.key=key
+

-def connect(host='', port=RSERVEPORT, atomicArray=False):
+def connect(host='', port=RSERVEPORT,
atomicArray=False,sessionInfo=None):
"""Open a connection to a Rserve instance"""
# if host in (None, ''):
# # On Win32 it seems that passing an empty string as
'localhost' does not work
# # So just to be sure provide the full local hostname if None
or '' were passed.
# host = socket.gethostname()
assert port is not None, 'port number must be given'
- return RConnector(host, port, atomicArray)
+ if sessionInfo:
+ port=sessionInfo.port
+ host=sessionInfo.host
+ sessionKey=sessionInfo.key
+ else:
+ sessionKey=None;
+ return RConnector(host, port, atomicArray,sessionKey)


def rconnect(host='', port=RSERVEPORT):
@@ -36,11 +49,11 @@

class RConnector(object):
'@brief Provides a network connector to an Rserve process'
- def __init__(self, host, port, atomicArray):
+ def __init__(self, host, port, atomicArray,sessionKey=None):
self.host = host
self.port = port
self.atomicArray = atomicArray
- self.connect()
+ self.connect(sessionKey)
self.r = RNameSpace(self)
self.ref = RNameSpaceReference(self)

@@ -52,19 +65,27 @@
def isClosed(self):
return self.__closed

- def connect(self):
+ def connect(self,sessionKey=None):
self.sock = socket.socket()
try:
self.sock.connect((self.host, self.port))
except socket.error:
raise RConnectionRefused('Connection denied, server not
reachable or not accepting connections')
- time.sleep(0.2)
- hdr = self.sock.recv(1024)
+
+ # write out session key if one was provided, don't wait back
for a response
+ if sessionKey:
+ rWriteSessionKey(sessionKey,fp=self.sock)
+ rparse(self.sock, atomicArray=self.atomicArray)
+ else:
+ time.sleep(0.2)
+ hdr = self.sock.recv(1024)
+ if DEBUG:
+ print 'received hdr %s from rserve' % hdr
+ assert hdr.startswith('Rsrv01') # make sure we are
really connected with rserv
+ # TODO: possibly also do version checking here to make
sure we understand the protocol...
+
self.__closed = False
- if DEBUG:
- print 'received hdr %s from rserve' % hdr
- assert hdr.startswith('Rsrv01') # make sure we are really
connected with rserv
- # TODO: possibly also do version checking here to make sure
we understand the protocol...
+

@checkIfClosed
def close(self):
@@ -106,9 +127,12 @@
'@brief Receive the result from a previous call to rserve.'
raw = self.sock.recv(rtypes.SOCKET_BLOCK_SIZE)
d = [raw]
+ print "len(raw)= %d" % len(raw)
while len(raw) == rtypes.SOCKET_BLOCK_SIZE:
raw = self.sock.recv(rtypes.SOCKET_BLOCK_SIZE)
+ print "len(raw)= %d" % len(raw)
d.append(raw)
+ print "len(d)= %d" % len(d)
return ''.join(d)

# @checkIfClosed
@@ -125,6 +149,15 @@
rparse(self.sock, atomicArray=self.atomicArray)

@checkIfClosed
+ def detach(self):
+ '@brief Detach current session, returning back a serializable
object'
+ rDetach(self.sock)
+ # Rserv sends an empty confirmation message, or error message
in case of an error.
+ # rparse() will raise an Exception in the latter case.
+ info=key_parse(self.sock, atomicArray=self.atomicArray)
+ return SessionInfo(self.host,info[0],info[1])
+
+ @checkIfClosed
def getRexp(self, name):
'@brief Retrieve a Rexp stored in a variable called "name"'
return self.eval(name)



--- pyRserve-0.5.2/pyRserve/rserializer.py 2011-09-20
03:09:10.000000000 -0700
+++ rserializer.py 2012-06-06 14:50:44.868171004 -0700
@@ -39,7 +39,7 @@
serializeMap = {}
fmap = FunctionMapper(serializeMap)
#
- def __init__(self, commandType, fp=None):
+ def __init__(self, commandType=None, fp=None):
if isinstance(fp, socket._socketobject):
self._orig_fp = fp.makefile()
self._fp = cStringIO.StringIO()
@@ -49,7 +49,8 @@
else:
self._fp = self._orig_fp = fp
self._dataSize = 0
- self._writeHeader(commandType)
+ if commandType :
+ self._writeHeader(commandType)

def _getRetVal(self):
if self._orig_fp is self._fp:
@@ -108,6 +109,10 @@
length = self._serializeExpr(o)
self._fp.seek(startPos)
self._writeDataHeader(dtTypeCode, length)
+ elif dtTypeCode == rtypes.DT_BYTESTREAM:
+ length = len(o);
+ self._writeDataHeader(dtTypeCode, length)
+ self._fp.write(o)
else:
raise NotImplementedError('no support for DT-type %x' %
dtTypeCode)
self._dataSize += length + 4
@@ -281,7 +286,16 @@
s.serialize(varname, dtTypeCode=rtypes.DT_STRING)
s.serialize(o, dtTypeCode=rtypes.DT_SEXP)
return s.finalize()
-
+
+ @classmethod
+ def rWriteSessionKey(cls, key, fp=None):
+ """write out binary session key"""
+ s = cls(fp=fp)
+ print 'writing out session key'
+ s._fp.write(key);
+ s._fp.flush();
+ s._getRetVal()
+ print 'done'

@classmethod
def rSerializeResponse(cls, Rexp, fp=None):
@@ -290,8 +304,16 @@
s.serialize(Rexp, dtTypeCode=rtypes.DT_SEXP)
return s.finalize()

+ @classmethod
+ def rDetach(cls, fp=None):
+ """Send detach signal"""
+ s = cls(rtypes.CMD_detachSession, fp=fp)
+ return s.finalize()
+

# Some shortcuts:
rEval = RSerializer.rEval
rAssign = RSerializer.rAssign
rSerializeResponse = RSerializer.rSerializeResponse
+rDetach = RSerializer.rDetach
+rWriteSessionKey = RSerializer.rWriteSessionKey



--- pyRserve-0.5.2/pyRserve/rparser.py 2011-09-20 23:09:27.000000000
-0700
+++ rparser.py 2012-06-06 17:19:05.380337224 -0700
@@ -306,6 +306,36 @@
else:
raise NotImplementedError()

+ def key_parse(self):
+ '''
+ @brief parse key returned after session detach
+ '''
+ self.indentLevel = 1
+ self.lexer.readHeader()
+ if self.lexer.messageSize == 3*4+32:
+ try:
+
port=self.lexer.nextExprData(self.lexer.nextExprHdr())
+ keyh=self.lexer.read(4); # trust that the header will
say DT_BYTESTREAM
+ key=self.lexer.read(32);
+ if DEBUG:
+ print "detached session key %s" % key
+ return [port, key];
+ except:
+ # If any error is raised during lexing and parsing,
make sure that the entire data
+ # is read from the input source if it is a socket,
otherwise following attempts to
+ # parse again from a socket will return polluted
data:
+ self.lexer.clearSocketData()
+ raise
+
+ elif not self.lexer.responseOK:
+ try:
+ rserve_err_msg = ERRORS[self.lexer.errCode]
+ except KeyError:
+ raise REvalError("R evaluation error (code=%d)" %
self.lexer.errCode)
+ else:
+ raise RResponseError('Response error %s (error code=
%d)' %
+ (rserve_err_msg,
self.lexer.errCode))
+
def _parseExpr(self):
self.indentLevel += 1
lexeme = self.lexer.nextExprHdr()
@@ -420,6 +450,11 @@
rparser = RParser(src, atomicArray)
return rparser.parse()

+
+def key_parse(src, atomicArray=False):
+ rparser = RParser(src, atomicArray)
+ return rparser.key_parse()
+

########################################################################################

class Closure(object):

Ralph Heinkel

unread,
Jun 8, 2012, 5:34:44 AM6/8/12
to pyrs...@googlegroups.com
Hello Peter,

thanks for this patch. I'll look into it asap.
Currently I'm working on Unicode/py3 support for pyRserve, so your patch will probably end up there. Just give me a few more days to finish it.

Ciao ciao

Ralph
Reply all
Reply to author
Forward
0 new messages