[pyftpdlib] r1242 committed - pep8 + pyflakes

1 view
Skip to first unread message

pyft...@googlecode.com

unread,
Dec 30, 2013, 6:33:25 AM (12/30/13)
to pyftpdli...@googlegroups.com
Revision: 1242
Author: g.rodola
Date: Mon Dec 30 11:33:10 2013 UTC
Log: pep8 + pyflakes
http://code.google.com/p/pyftpdlib/source/detail?r=1242

Modified:
/trunk/pyftpdlib/contrib/handlers.py
/trunk/pyftpdlib/filesystems.py
/trunk/pyftpdlib/handlers.py
/trunk/pyftpdlib/ioloop.py
/trunk/pyftpdlib/log.py
/trunk/setup.py
/trunk/test/bench.py
/trunk/test/test_contrib.py
/trunk/test/test_ftpd.py

=======================================
--- /trunk/pyftpdlib/contrib/handlers.py Tue Feb 19 10:13:09 2013 UTC
+++ /trunk/pyftpdlib/contrib/handlers.py Mon Dec 30 11:33:10 2013 UTC
@@ -35,6 +35,7 @@
"use pyftpdlib.handlers instead")

try:
- from pyftpdlib.handlers import SSLConnection, TLS_FTPHandler,
TLS_DTPHandler
+ from pyftpdlib.handlers import (SSLConnection, TLS_FTPHandler,
+ TLS_DTPHandler)
except ImportError:
pass
=======================================
--- /trunk/pyftpdlib/filesystems.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/pyftpdlib/filesystems.py Mon Dec 30 11:33:10 2013 UTC
@@ -299,13 +299,13 @@
#assert isinstance(path, unicode), path
return os.stat(path)

- def lstat(self, path):
- """Like stat but does not follow symbolic links."""
- # on python 2 we might also get bytes from os.lisdir()
- #assert isinstance(path, unicode), path
- return os.lstat(path)
-
- if not hasattr(os, 'lstat'):
+ if hasattr(os, 'lstat'):
+ def lstat(self, path):
+ """Like stat but does not follow symbolic links."""
+ # on python 2 we might also get bytes from os.lisdir()
+ #assert isinstance(path, unicode), path
+ return os.lstat(path)
+ else:
lstat = stat

if hasattr(os, 'readlink'):
@@ -359,28 +359,33 @@
assert isinstance(path, unicode), path
return os.path.lexists(path)

- def get_user_by_uid(self, uid):
- """Return the username associated with user id.
- If this can't be determined return raw uid instead.
- On Windows just return "owner".
- """
- try:
- return pwd.getpwuid(uid).pw_name
- except KeyError:
- return uid
-
- def get_group_by_gid(self, gid):
- """Return the groupname associated with group id.
- If this can't be determined return raw gid instead.
- On Windows just return "group".
- """
- try:
- return grp.getgrgid(gid).gr_name
- except KeyError:
- return gid
+ if pwd is not None:
+ def get_user_by_uid(self, uid):
+ """Return the username associated with user id.
+ If this can't be determined return raw uid instead.
+ On Windows just return "owner".
+ """
+ try:
+ return pwd.getpwuid(uid).pw_name
+ except KeyError:
+ return uid
+ else:
+ def get_user_by_uid(self):
+ return "owner"

- if pwd is None: get_user_by_uid = lambda x, y: "owner"
- if grp is None: get_group_by_gid = lambda x, y: "group"
+ if grp is not None:
+ def get_group_by_gid(self, gid):
+ """Return the groupname associated with group id.
+ If this can't be determined return raw gid instead.
+ On Windows just return "group".
+ """
+ try:
+ return grp.getgrgid(gid).gr_name
+ except KeyError:
+ return gid
+ else:
+ def get_group_by_gid(self):
+ return "group"

# --- Listing utilities

=======================================
--- /trunk/pyftpdlib/handlers.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/pyftpdlib/handlers.py Mon Dec 30 11:33:10 2013 UTC
@@ -2979,10 +2979,10 @@
perm=None, auth=False, arg=True,
help='Syntax: AUTH <SP> TLS|SSL (set up secure control
channel).'),
'PBSZ': dict(
- perm=None, auth=False, arg=True,
+ perm=None, auth=False, arg=True,
help='Syntax: PBSZ <SP> 0 (negotiate TLS buffer).'),
'PROT': dict(
- perm=None, auth=False, arg=True,
+ perm=None, auth=False, arg=True,
help='Syntax: PROT <SP> [C|P] (set up un/secure data
channel).'),
})

=======================================
--- /trunk/pyftpdlib/ioloop.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/pyftpdlib/ioloop.py Mon Dec 30 11:33:10 2013 UTC
@@ -211,7 +211,7 @@
else:
sig = repr(self._target)
sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % (
- self._args or '[]', self._kwargs or '{}', self.cancelled,
+ self._args or '[]', self._kwargs or '{}', self.cancelled,
self._delay)
return '<%s>' % sig

@@ -331,7 +331,6 @@
# localize variable access to minimize overhead
poll = self.poll
socket_map = self.socket_map
- tasks = self.sched._tasks
sched_poll = self.sched.poll

if timeout is not None:
=======================================
--- /trunk/pyftpdlib/log.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/pyftpdlib/log.py Mon Dec 30 11:33:10 2013 UTC
@@ -98,7 +98,7 @@
# blues
logging.DEBUG: unicode(curses.tparm(fg_color, 4), "ascii"),
# green
- logging.INFO: unicode(curses.tparm(fg_color, 2), "ascii"),
+ logging.INFO: unicode(curses.tparm(fg_color, 2), "ascii"),
# yellow
logging.WARNING: unicode(curses.tparm(fg_color,
3), "ascii"),
# red
=======================================
--- /trunk/setup.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/setup.py Mon Dec 30 11:33:10 2013 UTC
@@ -107,9 +107,9 @@
'Programming Language :: Python :: 3.0',
'Programming Language :: Python :: 3.1',
'Programming Language :: Python :: 3.2',
- 'Programming Language :: Python :: 3.3',
- ],
- )
+ 'Programming Language :: Python :: 3.3'
+ ],
+)


# suggest to install pysendfile
=======================================
--- /trunk/test/bench.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/test/bench.py Mon Dec 30 11:33:10 2013 UTC
@@ -136,23 +136,24 @@
return s


-# http://goo.gl/6V8Rm
-def hilite(string, ok=True, bold=False):
- """Return an highlighted version of 'string'."""
- attr = []
- if ok is None: # no color
- pass
- elif ok: # green
- attr.append('32')
- else: # red
- attr.append('31')
- if bold:
- attr.append('1')
- return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
-
if not sys.stdout.isatty() or os.name != 'posix':
def hilite(s, *args, **kwargs):
return s
+else:
+ # http://goo.gl/6V8Rm
+ def hilite(string, ok=True, bold=False):
+ """Return an highlighted version of 'string'."""
+ attr = []
+ if ok is None: # no color
+ pass
+ elif ok: # green
+ attr.append('32')
+ else: # red
+ attr.append('31')
+ if bold:
+ attr.append('1')
+ return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
+

server_memory = []

@@ -177,7 +178,7 @@
symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
prefix = {}
for i, s in enumerate(symbols[1:]):
- prefix[s] = 1 << (i+1)*10
+ prefix[s] = 1 << (i + 1) * 10
for symbol in reversed(symbols[1:]):
if n >= prefix[symbol]:
value = float(n) / prefix[symbol]
@@ -200,7 +201,7 @@
num = float(num)
prefix = {symbols[0]: 1}
for i, s in enumerate(symbols[1:]):
- prefix[s] = 1 << (i+1)*10
+ prefix[s] = 1 << (i + 1) * 10
return int(num * prefix[letter])


=======================================
--- /trunk/test/test_contrib.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/test/test_contrib.py Mon Dec 30 11:33:10 2013 UTC
@@ -116,38 +116,81 @@
class TLSTestMixin:
pass

-class TestFtpAuthenticationTLSMixin(TLSTestMixin, TestFtpAuthentication):
pass
-class TestTFtpDummyCmdsTLSMixin(TLSTestMixin, TestFtpDummyCmds): pass
-class TestFtpCmdsSemanticTLSMixin(TLSTestMixin, TestFtpCmdsSemantic): pass
-class TestFtpFsOperationsTLSMixin(TLSTestMixin, TestFtpFsOperations): pass
-class TestFtpStoreDataTLSMixin(TLSTestMixin, TestFtpStoreData): pass
-class TestFtpRetrieveDataTLSMixin(TLSTestMixin, TestFtpRetrieveData): pass
-class TestFtpListingCmdsTLSMixin(TLSTestMixin, TestFtpListingCmds): pass
+class TestFtpAuthenticationTLSMixin(TLSTestMixin, TestFtpAuthentication):
+ pass
+
+class TestTFtpDummyCmdsTLSMixin(TLSTestMixin, TestFtpDummyCmds):
+ pass
+
+class TestFtpCmdsSemanticTLSMixin(TLSTestMixin, TestFtpCmdsSemantic):
+ pass
+
+class TestFtpFsOperationsTLSMixin(TLSTestMixin, TestFtpFsOperations):
+ pass
+
+class TestFtpStoreDataTLSMixin(TLSTestMixin, TestFtpStoreData):
+ pass
+
+class TestFtpRetrieveDataTLSMixin(TLSTestMixin, TestFtpRetrieveData):
+ pass
+
+class TestFtpListingCmdsTLSMixin(TLSTestMixin, TestFtpListingCmds):
+ pass

class TestFtpAbortTLSMixin(TLSTestMixin, TestFtpAbort):
- def test_oob_abor(self): pass
+ def test_oob_abor(self):
+ pass

class TestTimeoutsTLSMixin(TLSTestMixin, TestTimeouts):
- def test_data_timeout_not_reached(self): pass
+ def test_data_timeout_not_reached(self):
+ pass

-class TestConfigurableOptionsTLSMixin(TLSTestMixin,
TestConfigurableOptions): pass
+class TestConfigurableOptionsTLSMixin(TLSTestMixin,
TestConfigurableOptions):
+ pass
+
class TestCallbacksTLSMixin(TLSTestMixin, TestCallbacks):
- def test_on_file_received(self): pass
- def test_on_file_sent(self): pass
- def test_on_incomplete_file_received(self): pass
- def test_on_incomplete_file_sent(self): pass
- def test_on_connect(self): pass
- def test_on_disconnect(self): pass
- def test_on_login(self): pass
- def test_on_login_failed(self): pass
- def test_on_logout_quit(self): pass
- def test_on_logout_rein(self): pass
- def test_on_logout_user_issued_twice(self): pass
+ def test_on_file_received(self):
+ pass

-class TestIPv4EnvironmentTLSMixin(TLSTestMixin, TestIPv4Environment): pass
-class TestIPv6EnvironmentTLSMixin(TLSTestMixin, TestIPv6Environment): pass
-class TestCornerCasesTLSMixin(TLSTestMixin, TestCornerCases): pass
+ def test_on_file_sent(self):
+ pass

+ def test_on_incomplete_file_received(self):
+ pass
+
+ def test_on_incomplete_file_sent(self):
+ pass
+
+ def test_on_connect(self):
+ pass
+
+ def test_on_disconnect(self):
+ pass
+
+ def test_on_login(self):
+ pass
+
+ def test_on_login_failed(self):
+ pass
+
+ def test_on_logout_quit(self):
+ pass
+
+ def test_on_logout_rein(self):
+ pass
+
+ def test_on_logout_user_issued_twice(self):
+ pass
+
+class TestIPv4EnvironmentTLSMixin(TLSTestMixin, TestIPv4Environment):
+ pass
+
+class TestIPv6EnvironmentTLSMixin(TLSTestMixin, TestIPv6Environment):
+ pass
+
+class TestCornerCasesTLSMixin(TLSTestMixin, TestCornerCases):
+ pass
+

# =====================================================================
# --- threaded FTP server mixin tests
@@ -159,22 +202,50 @@
class ThreadFTPTestMixin:
server_class = TFTPd

-class TestFtpAuthenticationThreadMixin(ThreadFTPTestMixin,
TestFtpAuthentication): pass
-class TestTFtpDummyCmdsThreadMixin(ThreadFTPTestMixin, TestFtpDummyCmds):
pass
-class TestFtpCmdsSemanticThreadMixin(ThreadFTPTestMixin,
TestFtpCmdsSemantic): pass
-class TestFtpFsOperationsThreadMixin(ThreadFTPTestMixin,
TestFtpFsOperations): pass
-class TestFtpStoreDataThreadMixin(ThreadFTPTestMixin, TestFtpStoreData):
pass
-class TestFtpRetrieveDataThreadMixin(ThreadFTPTestMixin,
TestFtpRetrieveData): pass
-class TestFtpListingCmdsThreadMixin(ThreadFTPTestMixin,
TestFtpListingCmds): pass
-class TestFtpAbortThreadMixin(ThreadFTPTestMixin, TestFtpAbort): pass
+class TestFtpAuthenticationThreadMixin(ThreadFTPTestMixin,
+ TestFtpAuthentication):
+ pass
+
+class TestTFtpDummyCmdsThreadMixin(ThreadFTPTestMixin, TestFtpDummyCmds):
+ pass
+
+class TestFtpCmdsSemanticThreadMixin(ThreadFTPTestMixin,
TestFtpCmdsSemantic):
+ pass
+
+class TestFtpFsOperationsThreadMixin(ThreadFTPTestMixin,
TestFtpFsOperations):
+ pass
+
+class TestFtpStoreDataThreadMixin(ThreadFTPTestMixin, TestFtpStoreData):
+ pass
+
+class TestFtpRetrieveDataThreadMixin(ThreadFTPTestMixin,
TestFtpRetrieveData):
+ pass
+
+class TestFtpListingCmdsThreadMixin(ThreadFTPTestMixin,
TestFtpListingCmds):
+ pass
+
+class TestFtpAbortThreadMixin(ThreadFTPTestMixin, TestFtpAbort):
+ pass
+
#class TestTimeoutsThreadMixin(ThreadFTPTestMixin, TestTimeouts):
# def test_data_timeout_not_reached(self): pass
-#class TestConfigurableOptionsThreadMixin(ThreadFTPTestMixin,
TestConfigurableOptions): pass
-class TestCallbacksThreadMixin(ThreadFTPTestMixin, TestCallbacks): pass
-class TestIPv4EnvironmentThreadMixin(ThreadFTPTestMixin,
TestIPv4Environment): pass
-class TestIPv6EnvironmentThreadMixin(ThreadFTPTestMixin,
TestIPv6Environment): pass
-class TestCornerCasesThreadMixin(ThreadFTPTestMixin, TestCornerCases): pass
-class TestFTPServerThreadMixin(ThreadFTPTestMixin, TestFTPServer): pass
+#class TestConfOptsThreadMixin(ThreadFTPTestMixin,
TestConfigurableOptions):
+# pass
+
+class TestCallbacksThreadMixin(ThreadFTPTestMixin, TestCallbacks):
+ pass
+
+class TestIPv4EnvironmentThreadMixin(ThreadFTPTestMixin,
TestIPv4Environment):
+ pass
+
+class TestIPv6EnvironmentThreadMixin(ThreadFTPTestMixin,
TestIPv6Environment):
+ pass
+
+class TestCornerCasesThreadMixin(ThreadFTPTestMixin, TestCornerCases):
+ pass
+
+class TestFTPServerThreadMixin(ThreadFTPTestMixin, TestFTPServer):
+ pass


# =====================================================================
@@ -191,26 +262,49 @@
class MProcFTPTestMixin:
pass

-class TestFtpAuthenticationMProcMixin(MProcFTPTestMixin,
TestFtpAuthentication): pass
-class TestTFtpDummyCmdsMProcMixin(MProcFTPTestMixin, TestFtpDummyCmds):
pass
-class TestFtpCmdsSemanticMProcMixin(MProcFTPTestMixin,
TestFtpCmdsSemantic): pass
+class TestFtpAuthenticationMProcMixin(MProcFTPTestMixin,
+ TestFtpAuthentication):
+ pass
+
+class TestTFtpDummyCmdsMProcMixin(MProcFTPTestMixin, TestFtpDummyCmds):
+ pass
+
+class TestFtpCmdsSemanticMProcMixin(MProcFTPTestMixin,
TestFtpCmdsSemantic):
+ pass

class TestFtpFsOperationsMProcMixin(MProcFTPTestMixin,
TestFtpFsOperations):
def test_unforeseen_mdtm_event(self):
pass

-class TestFtpStoreDataMProcMixin(MProcFTPTestMixin, TestFtpStoreData): pass
-class TestFtpRetrieveDataMProcMixin(MProcFTPTestMixin,
TestFtpRetrieveData): pass
-class TestFtpListingCmdsMProcMixin(MProcFTPTestMixin, TestFtpListingCmds):
pass
-class TestFtpAbortMProcMixin(MProcFTPTestMixin, TestFtpAbort): pass
+class TestFtpStoreDataMProcMixin(MProcFTPTestMixin, TestFtpStoreData):
+ pass
+
+class TestFtpRetrieveDataMProcMixin(MProcFTPTestMixin,
TestFtpRetrieveData):
+ pass
+
+class TestFtpListingCmdsMProcMixin(MProcFTPTestMixin, TestFtpListingCmds):
+ pass
+
+class TestFtpAbortMProcMixin(MProcFTPTestMixin, TestFtpAbort):
+ pass
+
#class TestTimeoutsMProcMixin(MProcFTPTestMixin, TestTimeouts):
# def test_data_timeout_not_reached(self): pass
-#class TestConfigurableOptionsMProcMixin(MProcFTPTestMixin,
TestConfigurableOptions): pass
+#class TestConfiOptsMProcMixin(MProcFTPTestMixin, TestConfigurableOptions):
+# pass
#class TestCallbacksMProcMixin(MProcFTPTestMixin, TestCallbacks): pass
-class TestIPv4EnvironmentMProcMixin(MProcFTPTestMixin,
TestIPv4Environment): pass
-class TestIPv6EnvironmentMProcMixin(MProcFTPTestMixin,
TestIPv6Environment): pass
-class TestCornerCasesMProcMixin(MProcFTPTestMixin, TestCornerCases): pass
-class TestFTPServerMProcMixin(MProcFTPTestMixin, TestFTPServer): pass
+
+class TestIPv4EnvironmentMProcMixin(MProcFTPTestMixin,
TestIPv4Environment):
+ pass
+
+class TestIPv6EnvironmentMProcMixin(MProcFTPTestMixin,
TestIPv6Environment):
+ pass
+
+class TestCornerCasesMProcMixin(MProcFTPTestMixin, TestCornerCases):
+ pass
+
+class TestFTPServerMProcMixin(MProcFTPTestMixin, TestFTPServer):
+ pass


# =====================================================================
@@ -475,8 +569,9 @@
try:
if self.authorizer_class.__name__ == 'UnixAuthorizer':
auth.impersonate_user(self.get_current_user(), '')
- self.assertRaises(AuthorizerError,
- auth.impersonate_user,
nonexistent_user, 'pwd')
+ self.assertRaises(
+ AuthorizerError,
+ auth.impersonate_user, nonexistent_user, 'pwd')
else:
self.assertRaises(
Win32ExtError,
@@ -512,7 +607,8 @@
self.assertRaisesWithMsg(
AuthorizerError,
"rejected_users and allowed_users options are mutually
exclusive",
- self.authorizer_class, allowed_users=['foo'],
rejected_users=['bar'])
+ self.authorizer_class, allowed_users=['foo'],
+ rejected_users=['bar'])
self.assertRaisesWithMsg(
AuthorizerError,
'invalid username "anonymous"',
@@ -525,7 +621,8 @@
AuthorizerError,
'unknown user %s' % wrong_user,
self.authorizer_class, allowed_users=[wrong_user])
- self.assertRaisesWithMsg(AuthorizerError, 'unknown user %s' %
wrong_user,
+ self.assertRaisesWithMsg(AuthorizerError,
+ 'unknown user %s' % wrong_user,
self.authorizer_class,
rejected_users=[wrong_user])

=======================================
--- /trunk/test/test_ftpd.py Sun Dec 8 21:40:25 2013 UTC
+++ /trunk/test/test_ftpd.py Mon Dec 30 11:33:10 2013 UTC
@@ -409,55 +409,6 @@
# UNC paths must be collapsed
ae(fs.ftp2fs(u('//a')), join(root, u('a')))

- if os.sep == '\\':
- goforit(u(r'C:\dir'))
- goforit(u('C:\\'))
- # on DOS-derived filesystems (e.g. Windows) this is the same
- # as specifying the current drive directory (e.g. 'C:\\')
- goforit(u('\\'))
- elif os.sep == '/':
- goforit(u('/home/user'))
- goforit(u('/'))
- else:
- # os.sep == ':'? Don't know... let's try it anyway
- goforit(getcwdu())
-
- def test_ftp2fs(self):
- # Tests for ftp2fs method.
- ae = self.assertEqual
- fs = AbstractedFS(u('/'), None)
- join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
-
- def goforit(root):
- fs._root = root
- fs._cwd = u('/')
- ae(fs.ftp2fs(u('')), root)
- ae(fs.ftp2fs(u('/')), root)
- ae(fs.ftp2fs(u('.')), root)
- ae(fs.ftp2fs(u('..')), root)
- ae(fs.ftp2fs(u('a')), join(root, u('a')))
- ae(fs.ftp2fs(u('/a')), join(root, u('a')))
- ae(fs.ftp2fs(u('/a/')), join(root, u('a')))
- ae(fs.ftp2fs(u('a/..')), root)
- ae(fs.ftp2fs(u('a/b')), join(root, r'a/b'))
- ae(fs.ftp2fs(u('/a/b')), join(root, r'a/b'))
- ae(fs.ftp2fs(u('/a/b/..')), join(root, u('a')))
- ae(fs.ftp2fs(u('/a/b/../..')), root)
- fs._cwd = u('/sub')
- ae(fs.ftp2fs(u('')), join(root, u('sub')))
- ae(fs.ftp2fs(u('/')), root)
- ae(fs.ftp2fs(u('.')), join(root, u('sub')))
- ae(fs.ftp2fs(u('..')), root)
- ae(fs.ftp2fs(u('a')), join(root, u('sub/a')))
- ae(fs.ftp2fs(u('a/')), join(root, u('sub/a')))
- ae(fs.ftp2fs(u('a/..')), join(root, u('sub')))
- ae(fs.ftp2fs(u('a/b')), join(root, 'sub/a/b'))
- ae(fs.ftp2fs(u('a/b/..')), join(root, u('sub/a')))
- ae(fs.ftp2fs(u('a/b/../..')), join(root, u('sub')))
- ae(fs.ftp2fs(u('a/b/../../..')), root)
- # UNC paths must be collapsed
- ae(fs.ftp2fs(u('//a')), join(root, u('a')))
-
if os.sep == '\\':
goforit(u(r'C:\dir'))
goforit(u('C:\\'))
@@ -2829,24 +2780,6 @@
self.tearDown()
self.assertEqual(pair, [('foo', 'bar')])

- def test_on_login_failed(self):
- pair = []
-
- class TestHandler(FTPHandler):
- _auth_failed_timeout = 0
-
- def on_login(self, username):
- raise Exception
-
- def on_login_failed(self, username, password):
- pair.append((username, password))
-
- self._setUp(TestHandler, login=False)
- self.assertRaises(ftplib.error_perm,
self.client.login, 'foo', 'bar')
- # shut down the server to avoid race conditions
- self.tearDown()
- self.assertEqual(pair, [('foo', 'bar')])
-
def test_on_logout_quit(self):
user = []

@@ -3007,7 +2940,7 @@
self.assertIn('privileged port', resp)
# proto > 2
_cmd = 'eprt |3|%s|%s|' % (self.server.host, self.server.port)
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, _cmd)
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, _cmd)

if self.proto == '1':
# len(ip.octs) > 4
Reply all
Reply to author
Forward
0 new messages