Revision: 1238
Author: g.rodola
Date: Tue Nov 26 15:27:59 2013 UTC
Log: pep8ify
http://code.google.com/p/pyftpdlib/source/detail?r=1238
Modified:
/trunk/pyftpdlib/authorizers.py
/trunk/pyftpdlib/handlers.py
/trunk/test/test_contrib.py
/trunk/test/test_ftpd.py
=======================================
--- /trunk/pyftpdlib/authorizers.py Tue Feb 19 10:13:09 2013 UTC
+++ /trunk/pyftpdlib/authorizers.py Tue Nov 26 15:27:59 2013 UTC
@@ -45,10 +45,10 @@
"""
+import errno
import os
+import sys
import warnings
-import errno
-import sys
from pyftpdlib._compat import PY3, unicode, getcwdu
@@ -56,7 +56,7 @@
__all__ = ['DummyAuthorizer',
#'BaseUnixAuthorizer', 'UnixAuthorizer',
#'BaseWindowsAuthorizer', 'WindowsAuthorizer',
- ]
+ ]
# ===================================================================
@@ -66,6 +66,7 @@
class AuthorizerError(Exception):
"""Base class for authorizer exceptions."""
+
class AuthenticationFailed(Exception):
"""Exception raised when authentication fails for any reason."""
@@ -97,7 +98,7 @@
self.user_table = {}
def add_user(self, username, password, homedir, perm='elr',
- msg_login="Login successful.", msg_quit="Goodbye."):
+ msg_login="Login successful.", msg_quit="Goodbye."):
"""Add a user to the virtual users table.
AuthorizerError exceptions raised on error conditions such as
@@ -234,8 +235,8 @@
if self._issubpath(path, dir):
if recursive:
return perm in operm
- if (path == dir) or (os.path.dirname(path) == dir \
- and not os.path.isdir(path)):
+ if (path == dir or os.path.dirname(path) == dir
+ and not os.path.isdir(path)):
return perm in operm
return perm in self.user_table[username]['perm']
@@ -257,7 +258,9 @@
for p in perm:
if p not in self.read_perms + self.write_perms:
raise ValueError('no such permission %r' % p)
- if (username == 'anonymous') and (p in self.write_perms) and
not warned:
+ if (username == 'anonymous'
+ and p in self.write_perms
+ and not warned):
warnings.warn("write permissions assigned to anonymous
user.",
RuntimeWarning)
warned = 1
@@ -274,6 +277,7 @@
methods as first argument with the actual user used to handle
anonymous sessions.
"""
+
def wrapper(self, username, *args, **kwargs):
if username == 'anonymous':
username = self.anonymous_user or username
@@ -322,9 +326,10 @@
"""Overrides the options specified in the class constructor
for a specific user.
"""
- if not password and not homedir and not perm and not msg_login \
- and not msg_quit:
- raise AuthorizerError("at least one keyword argument must be
specified")
+ if (not password and not homedir and not perm and not msg_login
+ and not msg_quit):
+ raise AuthorizerError(
+ "at least one keyword argument must be specified")
if self.allowed_users and username not in self.allowed_users:
raise AuthorizerError('%s is not an allowed user' % username)
if self.rejected_users and username in self.rejected_users:
@@ -339,11 +344,12 @@
if username in self._dummy_authorizer.user_table:
# re-set parameters
del self._dummy_authorizer.user_table[username]
- self._dummy_authorizer.add_user(username, password or "",
- homedir or getcwdu(),
- perm or "",
- msg_login or "",
- msg_quit or "")
+ self._dummy_authorizer.add_user(username,
+ password or "",
+ homedir or getcwdu(),
+ perm or "",
+ msg_login or "",
+ msg_quit or "")
if homedir is None:
self._dummy_authorizer.user_table[username]['home'] = ""
@@ -385,7 +391,9 @@
# Note: requires python >= 2.5
try:
- import pwd, spwd, crypt
+ import crypt
+ import pwd
+ import spwd
except ImportError:
pass
else:
@@ -489,7 +497,6 @@
def has_perm(self, username, perm, path=None):
return perm in self.get_perms(username)
-
class UnixAuthorizer(_Base, BaseUnixAuthorizer):
"""A wrapper on top of BaseUnixAuthorizer providing options
to specify what users should be allowed to login, per-user
@@ -514,12 +521,12 @@
# --- public API
def __init__(self, global_perm="elradfmw",
- allowed_users=None,
- rejected_users=None,
- require_valid_shell=True,
- anonymous_user=None,
- msg_login="Login successful.",
- msg_quit="Goodbye."):
+ allowed_users=None,
+ rejected_users=None,
+ require_valid_shell=True,
+ anonymous_user=None,
+ msg_login="Login successful.",
+ msg_quit="Goodbye."):
"""Parameters:
- (string) global_perm:
@@ -665,7 +672,11 @@
pass
# Note: requires pywin32 extension
try:
- import win32security, win32net, pywintypes, win32con, win32api
+ import pywintypes
+ import win32api
+ import win32con
+ import win32net
+ import win32security
except ImportError:
pass
else:
@@ -723,7 +734,7 @@
"""
try:
sid = win32security.ConvertSidToStringSid(
- win32security.LookupAccountName(None, username)[0])
+ win32security.LookupAccountName(None, username)[0])
except pywintypes.error:
err = sys.exc_info()[1]
raise AuthorizerError(err)
@@ -759,7 +770,6 @@
def has_perm(self, username, perm, path=None):
return perm in self.get_perms(username)
-
class WindowsAuthorizer(_Base, BaseWindowsAuthorizer):
"""A wrapper on top of BaseWindowsAuthorizer providing options
to specify what users should be allowed to login, per-user
@@ -780,13 +790,14 @@
# --- public API
- def __init__(self, global_perm="elradfmw",
- allowed_users=None,
- rejected_users=None,
- anonymous_user=None,
- anonymous_password=None,
- msg_login="Login successful.",
- msg_quit="Goodbye."):
+ def __init__(self,
+ global_perm="elradfmw",
+ allowed_users=None,
+ rejected_users=None,
+ anonymous_user=None,
+ anonymous_password=None,
+ msg_login="Login successful.",
+ msg_quit="Goodbye."):
"""Parameters:
- (string) global_perm:
=======================================
--- /trunk/pyftpdlib/handlers.py Tue Nov 26 14:11:59 2013 UTC
+++ /trunk/pyftpdlib/handlers.py Tue Nov 26 15:27:59 2013 UTC
@@ -30,16 +30,16 @@
# ======================================================================
import asynchat
-import time
-import sys
-import os
import errno
-import socket
-import traceback
import glob
+import logging
+import os
import random
+import socket
+import sys
+import time
+import traceback
import warnings
-import logging
try:
import pwd
import grp
@@ -47,12 +47,12 @@
pwd = grp = None
from pyftpdlib import __ver__
-from pyftpdlib.log import logger
+from pyftpdlib.authorizers import (DummyAuthorizer, AuthenticationFailed,
+ AuthorizerError)
+from pyftpdlib._compat import PY3, b, u, getcwdu, unicode, xrange, next
from pyftpdlib.filesystems import FilesystemError, AbstractedFS
-from pyftpdlib._compat import PY3, b, u, getcwdu, unicode, xrange, next
from pyftpdlib.ioloop import AsyncChat, Connector, Acceptor, timer,
_DISCONNECTED
-from pyftpdlib.authorizers import (DummyAuthorizer, AuthenticationFailed,
- AuthorizerError)
+from pyftpdlib.log import logger
def _import_sendfile():
=======================================
--- /trunk/test/test_contrib.py Fri Apr 19 14:35:41 2013 UTC
+++ /trunk/test/test_contrib.py Tue Nov 26 15:27:59 2013 UTC
@@ -242,7 +242,7 @@
return
raise self.failureException("%s != %s" % (str(why), msg))
else:
- if hasattr(excClass,'__name__'):
+ if hasattr(excClass, '__name__'):
excName = excClass.__name__
else:
excName = str(excClass)
@@ -423,8 +423,10 @@
return
raise self.failureException("%s != %s" % (str(why), msg))
else:
- if hasattr(excClass,'__name__'): excName = excClass.__name__
- else: excName = str(excClass)
+ if hasattr(excClass, '__name__'):
+ excName = excClass.__name__
+ else:
+ excName = str(excClass)
raise self.failureException("%s not raised" % excName)
# --- /utils
@@ -458,10 +460,12 @@
auth = self.authorizer_class()
current_user = self.get_current_user()
nonexistent_user = self.get_nonexistent_user()
- self.assertRaises(AuthenticationFailed,
- auth.validate_authentication, current_user, 'wrongpasswd',
None)
- self.assertRaises(AuthenticationFailed,
- auth.validate_authentication, nonexistent_user, 'bar',
None)
+ self.assertRaises(
+ AuthenticationFailed,
+ auth.validate_authentication, current_user, 'wrongpasswd',
None)
+ self.assertRaises(
+ AuthenticationFailed,
+ auth.validate_authentication, nonexistent_user, 'bar', None)
def test_impersonate_user(self):
auth = self.authorizer_class()
@@ -472,10 +476,12 @@
self.assertRaises(AuthorizerError,
auth.impersonate_user,
nonexistent_user, 'pwd')
else:
- self.assertRaises(Win32ExtError,
- auth.impersonate_user, nonexistent_user, 'pwd')
- self.assertRaises(Win32ExtError,
- auth.impersonate_user,
self.get_current_user(), '')
+ self.assertRaises(
+ Win32ExtError,
+ auth.impersonate_user, nonexistent_user, 'pwd')
+ self.assertRaises(
+ Win32ExtError,
+ auth.impersonate_user, self.get_current_user(), '')
finally:
auth.terminate_impersonation('')
@@ -501,20 +507,25 @@
def test_error_options(self):
wrong_user = self.get_nonexistent_user()
- self.assertRaisesWithMsg(AuthorizerError,
- "rejected_users and allowed_users options are mutually
exclusive",
- self.authorizer_class, allowed_users=['foo'],
rejected_users=['bar'])
- self.assertRaisesWithMsg(AuthorizerError,
- 'invalid username "anonymous"',
- self.authorizer_class,
allowed_users=['anonymous'])
- self.assertRaisesWithMsg(AuthorizerError,
- 'invalid username "anonymous"',
- self.authorizer_class,
rejected_users=['anonymous'])
- self.assertRaisesWithMsg(AuthorizerError,
- 'unknown user %s' % wrong_user,
- self.authorizer_class,
allowed_users=[wrong_user])
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ "rejected_users and allowed_users options are mutually
exclusive",
+ self.authorizer_class, allowed_users=['foo'],
rejected_users=['bar'])
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ 'invalid username "anonymous"',
+ self.authorizer_class, allowed_users=['anonymous'])
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ 'invalid username "anonymous"',
+ self.authorizer_class, rejected_users=['anonymous'])
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ 'unknown user %s' % wrong_user,
+ self.authorizer_class, allowed_users=[wrong_user])
self.assertRaisesWithMsg(AuthorizerError, 'unknown user %s' %
wrong_user,
- self.authorizer_class,
rejected_users=[wrong_user])
+ self.authorizer_class,
+ rejected_users=[wrong_user])
def test_override_user_password(self):
auth = self.authorizer_class()
@@ -575,9 +586,10 @@
another_user = x
break
nonexistent_user = self.get_nonexistent_user()
- self.assertRaisesWithMsg(AuthorizerError,
- "at least one keyword argument must be
specified",
- auth.override_user, this_user)
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ "at least one keyword argument must be specified",
+ auth.override_user, this_user)
self.assertRaisesWithMsg(AuthorizerError,
'no such user %s' % nonexistent_user,
auth.override_user, nonexistent_user,
perm='r')
@@ -650,8 +662,9 @@
require_valid_shell=False)
self.assertRaises(AuthenticationFailed,
auth.validate_authentication, 'foo', 'passwd',
None)
- self.assertRaises(AuthenticationFailed,
- auth.validate_authentication, current_user, 'passwd',
None)
+ self.assertRaises(
+ AuthenticationFailed,
+ auth.validate_authentication, current_user, 'passwd', None)
auth.validate_authentication('anonymous', 'passwd', None)
def test_require_valid_shell(self):
@@ -667,9 +680,10 @@
self.fail("no user found")
user = get_fake_shell_user()
- self.assertRaisesWithMsg(AuthorizerError,
- "user %s has not a valid shell" % user,
- authorizers.UnixAuthorizer,
allowed_users=[user])
+ self.assertRaisesWithMsg(
+ AuthorizerError,
+ "user %s has not a valid shell" % user,
+ authorizers.UnixAuthorizer, allowed_users=[user])
# commented as it first fails for invalid home
#self.assertRaisesWithMsg(ValueError,
# "user %s has not a valid shell" % user,
@@ -705,7 +719,8 @@
def test_wrong_anonymous_credentials(self):
user = self.get_current_user()
self.assertRaises(Win32ExtError, self.authorizer_class,
- anonymous_user=user, anonymous_password='$|
1wrongpasswd')
+ anonymous_user=user,
+ anonymous_password='$|1wrongpasswd')
# =====================================================================
@@ -735,20 +750,21 @@
# FTPS tests
if FTPS_SUPPORT:
- ftps_tests = [TestFTPS,
- TestFtpAuthenticationTLSMixin,
- TestTFtpDummyCmdsTLSMixin,
- TestFtpCmdsSemanticTLSMixin,
- TestFtpFsOperationsTLSMixin,
- TestFtpStoreDataTLSMixin,
- TestFtpRetrieveDataTLSMixin,
- TestFtpListingCmdsTLSMixin,
- TestFtpAbortTLSMixin,
- TestTimeoutsTLSMixin,
- TestConfigurableOptionsTLSMixin,
- TestCallbacksTLSMixin,
- TestCornerCasesTLSMixin,
- ]
+ ftps_tests = [
+ TestFTPS,
+ TestFtpAuthenticationTLSMixin,
+ TestTFtpDummyCmdsTLSMixin,
+ TestFtpCmdsSemanticTLSMixin,
+ TestFtpFsOperationsTLSMixin,
+ TestFtpStoreDataTLSMixin,
+ TestFtpRetrieveDataTLSMixin,
+ TestFtpListingCmdsTLSMixin,
+ TestFtpAbortTLSMixin,
+ TestTimeoutsTLSMixin,
+ TestConfigurableOptionsTLSMixin,
+ TestCallbacksTLSMixin,
+ TestCornerCasesTLSMixin,
+ ]
if SUPPORTS_IPV4:
ftps_tests.append(TestIPv4EnvironmentTLSMixin)
if SUPPORTS_IPV6:
@@ -809,7 +825,7 @@
try:
authorizers.UnixAuthorizer()
except AuthorizerError: # not root
- warn("UnixAuthorizer tests skipped (root privileges are " \
+ warn("UnixAuthorizer tests skipped (root privileges are "
"required)")
else:
tests.append(TestUnixAuthorizer)
@@ -829,7 +845,7 @@
try:
import win32api
except ImportError:
- warn("WindowsAuthorizer tests skipped (pywin32 extension "
\
+ warn("WindowsAuthorizer tests skipped (pywin32 extension "
"is required)")
else:
warn("WindowsAuthorizer tests skipped")
=======================================
--- /trunk/test/test_ftpd.py Sat Aug 24 17:48:45 2013 UTC
+++ /trunk/test/test_ftpd.py Tue Nov 26 15:27:59 2013 UTC
@@ -30,23 +30,23 @@
#
# ======================================================================
-import threading
-import unittest
-import socket
+import atexit
+import errno
+import ftplib
+import logging
import os
+import random
+import re
+import select
import shutil
-import time
-import re
+import socket
+import stat
+import sys
import tempfile
-import ftplib
-import random
+import threading
+import time
+import unittest
import warnings
-import sys
-import errno
-import atexit
-import stat
-import logging
-import select
try:
from StringIO import StringIO as BytesIO
except ImportError:
@@ -60,14 +60,14 @@
except ImportError:
sendfile = None
-import pyftpdlib.__main__
-from pyftpdlib.ioloop import IOLoop
+from pyftpdlib._compat import PY3, u, b, getcwdu, callable
+from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
+from pyftpdlib.filesystems import AbstractedFS
from pyftpdlib.handlers import (FTPHandler, DTPHandler,
ThrottledDTPHandler,
SUPPORTS_HYBRID_IPV6)
+from pyftpdlib.ioloop import IOLoop
from pyftpdlib.servers import FTPServer
-from pyftpdlib.filesystems import AbstractedFS
-from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
-from pyftpdlib._compat import PY3, u, b, getcwdu, callable
+import pyftpdlib.__main__
# Attempt to use IP rather than hostname (test suite will run a lot faster)
@@ -102,6 +102,7 @@
SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1',
family=socket.AF_INET6)
SUPPORTS_SENDFILE = hasattr(os, 'sendfile') or sendfile is not None
+
def safe_remove(*files):
"Convenience function for removing temporary test files"
for file in files:
@@ -112,6 +113,7 @@
if err.errno != errno.ENOENT:
raise
+
def safe_rmdir(dir):
"Convenience function for removing temporary test directories"
try:
@@ -121,6 +123,7 @@
if err.errno != errno.ENOENT:
raise
+
def safe_mkdir(dir):
"Convenience function for creating a directory"
try:
@@ -130,6 +133,7 @@
if err.errno != errno.EEXIST:
raise
+
def touch(name):
"""Create a file and return its name."""
f = open(name, 'w')
@@ -138,6 +142,7 @@
finally:
f.close()
+
def remove_test_files():
"""Remove files and directores created during tests."""
for name in os.listdir(u('.')):
@@ -147,13 +152,16 @@
else:
os.remove(name)
+
def warn(msg):
"""Add warning message to be executed on exit."""
atexit.register(warnings.warn, str(msg) + " - tests have been skipped",
RuntimeWarning)
+
def disable_log_warning(inst):
"""Temporarily set FTP server's logging level to ERROR."""
+
def wrapper(self, *args, **kwargs):
logger = logging.getLogger('pyftpdlib')
level = logger.getEffectiveLevel()
@@ -164,6 +172,7 @@
logger.setLevel(level)
return wrapper
+
def skip_other_tests():
"""Decorator which skips all tests except the decorated one.
http://mail.python.org/pipermail/python-ideas/2010-August/007992.html
@@ -171,6 +180,7 @@
from unittest import TextTestRunner as _TextTestRunner
class CustomTestRunner(_TextTestRunner):
+
def run(self, test):
if test._tests:
for t1 in test._tests:
@@ -192,6 +202,7 @@
return outer
+
def cleanup():
"""Cleanup function executed on interpreter exit."""
remove_test_files()
@@ -313,7 +324,7 @@
expected_regexp = re.compile(expected_regexp)
if not expected_regexp.search(msg):
raise self.failureException('"%s" does not
match "%s"' %
- (expected_regexp.pattern, msg))
+ (expected_regexp.pattern,
msg))
else:
if hasattr(expected_exception, '__name__'):
exc_name = expected_exception.__name__
@@ -395,7 +406,8 @@
ae(fs.ftp2fs(u('a/b/..')), join(root, u('sub/a')))
ae(fs.ftp2fs(u('a/b/../..')), join(root, u('sub')))
ae(fs.ftp2fs(u('a/b/../../..')), root)
- ae(fs.ftp2fs(u('//a')), join(root, u('a'))) # UNC paths must
be collapsed
+ # UNC paths must be collapsed
+ ae(fs.ftp2fs(u('//a')), join(root, u('a')))
if os.sep == '\\':
goforit(u(r'C:\dir'))
@@ -443,7 +455,8 @@
ae(fs.ftp2fs(u('a/b/..')), join(root, u('sub/a')))
ae(fs.ftp2fs(u('a/b/../..')), join(root, u('sub')))
ae(fs.ftp2fs(u('a/b/../../..')), root)
- ae(fs.ftp2fs(u('//a')), join(root, u('a'))) # UNC paths must
be collapsed
+ # UNC paths must be collapsed
+ ae(fs.ftp2fs(u('//a')), join(root, u('a')))
if os.sep == '\\':
goforit(u(r'C:\dir'))
@@ -607,14 +620,14 @@
# expect warning on write permissions assigned to anonymous user
for x in "adfmw":
self.assertRaisesRegex(RuntimeWarning,
- "write permissions assigned to anonymous
user.",
- auth.add_anonymous, HOME, perm=x)
+ "write permissions assigned to
anonymous user.",
+ auth.add_anonymous, HOME, perm=x)
def test_override_perm_interface(self):
auth = DummyAuthorizer()
auth.add_user(USER, PASSWD, HOME, perm='elr')
# raise exc if user does not exists
- self.assertRaises(KeyError, auth.override_perm, USER+'w',
HOME, 'elr')
+ self.assertRaises(KeyError, auth.override_perm, USER + 'w',
HOME, 'elr')
# raise exc if path does not exist or it's not a directory
self.assertRaisesRegex(ValueError,
'no such directory',
@@ -630,12 +643,12 @@
auth.add_anonymous(HOME)
for p in "adfmw":
self.assertRaisesRegex(RuntimeWarning,
- "write permissions assigned to anonymous
user.",
- auth.override_perm, 'anonymous', HOME, p)
+ "write permissions assigned to
anonymous user.",
+ auth.override_perm, 'anonymous', HOME,
p)
# raise on attempt to override home directory permissions
self.assertRaisesRegex(ValueError,
- "can't override home directory
permissions",
- auth.override_perm, USER, HOME, perm='w')
+ "can't override home directory permissions",
+ auth.override_perm, USER, HOME, perm='w')
# raise on attempt to override a path escaping home directory
if os.path.dirname(HOME) != HOME:
self.assertRaisesRegex(ValueError,
@@ -749,7 +762,8 @@
def test_errback(self):
l = []
- self.ioloop.call_later(0.0, lambda: 1//0, _errback=lambda:
l.append(True))
+ self.ioloop.call_later(
+ 0.0, lambda: 1 // 0, _errback=lambda: l.append(True))
self.scheduler()
self.assertEqual(l, [True])
@@ -828,11 +842,14 @@
def test_errback(self):
l = []
- self.ioloop.call_every(0.0, lambda: 1//0, _errback=lambda:
l.append(True))
+ self.ioloop.call_every(
+ 0.0, lambda: 1 // 0, _errback=lambda: l.append(True))
self.scheduler()
self.assertTrue(l)
+
class TestFtpAuthentication(TestCase):
+
"test: USER, PASS, REIN."
server_class = FTPd
client_class = ftplib.FTP
@@ -949,7 +966,7 @@
self.client.login(user=USER, passwd=PASSWD)
self.client.sendcmd('pwd')
self.dummyfile.seek(0)
- self.assertEqual(hash(data), hash (self.dummyfile.read()))
+ self.assertEqual(hash(data), hash(self.dummyfile.read()))
conn.close()
def test_user(self):
@@ -1001,7 +1018,7 @@
self.client.sendcmd('pass ' + PASSWD)
self.client.sendcmd('pwd')
self.dummyfile.seek(0)
- self.assertEqual(hash(data), hash (self.dummyfile.read()))
+ self.assertEqual(hash(data), hash(self.dummyfile.read()))
conn.close()
@@ -1091,25 +1108,31 @@
self.assertTrue('TVFS' in resp)
def test_opts_feat(self):
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts
mlst bad_fact')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts
mlst type ;')
+ self.assertRaises(
+ ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
+ self.assertRaises(
+ ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts
not_mlst')
# utility function which used for extracting the MLST "facts"
# string from the FEAT response
+
def mlst():
resp = self.client.sendcmd('feat')
return re.search(r'^\s*MLST\s+(\S+)$', resp,
re.MULTILINE).group(1)
# we rely on "type", "perm", "size", and "modify" facts which
# are those available on all platforms
self.assertTrue('type*;perm*;size*;modify*;' in mlst())
- self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST
OPTS type;')
- self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST
OPTS type;')
+ self.assertEqual(self.client.sendcmd(
+ 'opts mlst type;'), '200 MLST OPTS type;')
+ self.assertEqual(self.client.sendcmd(
+ 'opts mLSt TypE;'), '200 MLST OPTS type;')
self.assertTrue('type*;perm;size;modify;' in mlst())
self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST
OPTS ')
self.assertTrue(not '*' in mlst())
- self.assertEqual(self.client.sendcmd('opts mlst
fish;cakes;'), '200 MLST OPTS ')
+ self.assertEqual(
+ self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
self.assertTrue(not '*' in mlst())
self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'),
'200 MLST OPTS type;')
@@ -1119,9 +1142,9 @@
class TestFtpCmdsSemantic(TestCase):
server_class = FTPd
client_class = ftplib.FTP
- arg_cmds =
['allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
- 'rest','retr','rmd','rnfr','rnto','site','size','stor','stru',
- 'type','user','xmkd','xrmd','site chmod']
+ arg_cmds =
['allo', 'appe', 'dele', 'eprt', 'mdtm', 'mode', 'mkd', 'opts',
+ 'port', 'rest', 'retr', 'rmd', 'rnfr', 'rnto', 'site', 'size',
+ 'stor', 'stru', 'type', 'user', 'xmkd', 'xrmd', 'site
chmod']
def setUp(self):
self.server = self.server_class()
@@ -1146,8 +1169,9 @@
def test_no_arg_cmds(self):
# Test commands accepting no arguments.
expected = "501 Syntax error: command does not accept arguments."
- for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
- 'syst','xcup','xpwd'):
+ narg_cmds = ['abor', 'cdup', 'feat', 'noop', 'pasv', 'pwd', 'quit',
+ 'rein', 'syst', 'xcup', 'xpwd']
+ for cmd in narg_cmds:
self.client.putcmd(cmd + ' arg')
resp = self.client.getmultiline()
self.assertEqual(resp, expected)
@@ -1158,8 +1182,9 @@
self.client.sendcmd('rein')
for cmd in self.server.handler.proto_cmds:
cmd = cmd.lower()
- if cmd in
('feat','help','noop','user','pass','stat','syst','quit',
- 'site', 'site help', 'pbsz', 'auth', 'prot', 'ccc'):
+ if cmd in
('feat', 'help', 'noop', 'user', 'pass', 'stat', 'syst',
+ 'quit', 'site', 'site help', 'pbsz', 'auth', 'prot',
+ 'ccc'):
continue
if cmd in self.arg_cmds:
cmd = cmd + ' arg'
@@ -1170,7 +1195,7 @@
def test_no_auth_cmds(self):
# Test those commands that do not require client to be
authenticated.
self.client.sendcmd('rein')
- for cmd in ('feat','help','noop','stat','syst','site help'):
+ for cmd in ('feat', 'help', 'noop', 'stat', 'syst', 'site help'):
self.client.sendcmd(cmd)
# STAT provided with an argument is equal to LIST hence not allowed
# if not authenticated
@@ -1180,6 +1205,7 @@
class TestFtpFsOperations(TestCase):
+
"test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
server_class = FTPd
client_class = ftplib.FTP
@@ -1270,7 +1296,8 @@
# rnfr/rnto over non-existing paths
bogus = os.path.basename(tempfile.mktemp(dir=HOME))
self.assertRaises(ftplib.error_perm, self.client.rename,
bogus, '/x')
- self.assertRaises(ftplib.error_perm, self.client.rename,
self.tempfile, u('/'))
+ self.assertRaises(
+ ftplib.error_perm, self.client.rename, self.tempfile, u('/'))
# rnto sent without first specifying the source
self.assertRaises(ftplib.error_perm, self.client.sendcmd,
'rnto ' + self.tempfile)
@@ -1309,8 +1336,8 @@
try:
AbstractedFS.getmtime = lambda x, y: -9000000000
self.assertRaisesRegex(ftplib.error_perm,
- "550 Can't determine file's last modification
time",
- self.client.sendcmd, 'mdtm ' + self.tempfile)
+ "550 Can't determine file's last
modification time",
+ self.client.sendcmd, 'mdtm ' +
self.tempfile)
# make sure client hasn't been disconnected
self.client.sendcmd('noop')
finally:
@@ -1400,7 +1427,7 @@
self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
self.client.retrbinary('retr ' + TESTFN,
self.dummy_recvfile.write)
self.dummy_recvfile.seek(0)
- self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
+ self.assertEqual(hash(data), hash(self.dummy_recvfile.read()))
finally:
# We do not use os.remove() because file could still be
# locked by ftpd thread. If DELE through FTP fails try
@@ -1522,7 +1549,7 @@
self.assertEqual('226', self.client.voidresp()[:3])
self.client.retrbinary('retr ' + filename,
self.dummy_recvfile.write)
self.dummy_recvfile.seek(0)
- self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
+ self.assertEqual(hash(data), hash(self.dummy_recvfile.read()))
finally:
# We do not use os.remove() because file could still be
# locked by ftpd thread. If DELE through FTP fails try
@@ -1572,7 +1599,8 @@
self.client.retrbinary("retr " + TESTFN,
self.dummy_recvfile.write)
self.dummy_recvfile.seek(0)
- self.assertEqual(hash(data1 + data2), hash
(self.dummy_recvfile.read()))
+ self.assertEqual(
+ hash(data1 + data2), hash(self.dummy_recvfile.read()))
finally:
# We do not use os.remove() because file could still be
# locked by ftpd thread. If DELE through FTP fails try
@@ -1588,7 +1616,7 @@
self.client.sendcmd('type i')
self.client.sendcmd('rest 10')
self.assertRaisesRegex(ftplib.error_temp, "Can't APPE while REST",
- self.client.sendcmd, 'appe x')
+ self.client.sendcmd, 'appe x')
def test_rest_on_stor(self):
# Test STOR preceded by REST.
@@ -1686,6 +1714,7 @@
class TestFtpRetrieveData(TestCase):
+
"Test RETR, REST, TYPE"
server_class = FTPd
client_class = ftplib.FTP
@@ -1720,7 +1749,7 @@
# attempt to retrieve a file which doesn't exist
bogus = os.path.basename(tempfile.mktemp(dir=HOME))
self.assertRaises(ftplib.error_perm, self.client.retrbinary,
- "retr " + bogus, lambda x: x)
+ "retr " + bogus, lambda x: x)
def test_retr_ascii(self):
# Test RETR in ASCII mode.
@@ -1779,7 +1808,7 @@
self.client.sendcmd('rest %s' % received_bytes)
self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
self.dummyfile.seek(0)
- self.assertEqual(hash(data), hash (self.dummyfile.read()))
+ self.assertEqual(hash(data), hash(self.dummyfile.read()))
def test_retr_empty_file(self):
self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
@@ -1835,7 +1864,7 @@
# non-existent path, 550 response is expected
bogus = os.path.basename(tempfile.mktemp(dir=HOME))
self.assertRaises(ftplib.error_perm, self.client.retrlines,
- '%s ' %cmd + bogus, lambda x: x)
+ '%s ' % cmd + bogus, lambda x: x)
# for an empty directory we excpect that the data channel is
# opened anyway and that no data is received
x = []
@@ -1863,7 +1892,7 @@
self.client.retrlines('list -la', l5.append)
tot = (l1, l2, l3, l4, l5)
for x in range(len(tot) - 1):
- self.assertEqual(tot[x], tot[x+1])
+ self.assertEqual(tot[x], tot[x + 1])
def test_mlst(self):
# utility function for extracting the line of interest
@@ -1878,7 +1907,7 @@
self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
# non-existent path
bogus = os.path.basename(tempfile.mktemp(dir=HOME))
- self.assertRaises(ftplib.error_perm,
self.client.sendcmd, 'mlst '+bogus)
+ self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mlst '
+ bogus)
# test file/dir notations
self.assertTrue('type=dir' in mlstline('mlst'))
self.assertTrue('type=file' in mlstline('mlst ' + TESTFN))
@@ -1962,6 +1991,7 @@
class TestFtpAbort(TestCase):
+
"test: ABOR"
server_class = FTPd
client_class = ftplib.FTP
@@ -2067,6 +2097,7 @@
# overridden so that the "awake" callback is executed
# immediately; this way we won't introduce any slowdown
# and still test the code of interest
+
def _throttle_bandwidth(self, *args, **kwargs):
ThrottledDTPHandler._throttle_bandwidth(self, *args,
**kwargs)
if self._throttler is not None and not
self._throttler.cancelled:
@@ -2119,6 +2150,7 @@
file.close()
self.assertEqual(hash(data), hash(file_data))
+
class TestTimeouts(TestCase):
"""Test idle-timeout capabilities of control and data channels.
Some tests may fail on slow machines.
@@ -2401,7 +2433,7 @@
# Test FTPHandler.masquerade_address_map attribute
host, port = self.client.makepasv()
self.assertEqual(host, self.server.host)
- self.server.handler.masquerade_address_map = {self.server.host :
+ self.server.handler.masquerade_address_map = {self.server.host:
"128.128.128.128"}
host, port = self.client.makepasv()
self.assertEqual(host, "128.128.128.128")
@@ -2700,6 +2732,7 @@
flag = []
class TestHandler(FTPHandler):
+
def on_connect(self):
flag.append(None)
@@ -2713,6 +2746,7 @@
flag = []
class TestHandler(FTPHandler):
+
def on_disconnect(self):
flag.append(None)
@@ -2742,7 +2776,6 @@
def on_login_failed(self, username, password):
raise Exception
-
self._setUp(TestHandler)
# shut down the server to avoid race conditions
self.tearDown()
@@ -2934,10 +2967,10 @@
self.assertEqual(self.cmdresp('eprt ||'), msg)
# port > 65535
self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' % (self.proto,
- self.HOST)),
msg)
+ self.HOST)),
msg)
# port < 0
self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' % (self.proto,
- self.HOST)), msg)
+ self.HOST)),
msg)
# port < 1024
resp = self.cmdresp('eprt |%s|%s|222|' % (self.proto, self.HOST))
self.assertEqual(resp[:3], '501')
@@ -2946,7 +2979,6 @@
_cmd = 'eprt |3|%s|%s|' % (self.server.host, self.server.port)
self.assertRaises(ftplib.error_perm, self.client.sendcmd, _cmd)
-
if self.proto == '1':
# len(ip.octs) > 4
self.assertEqual(self.cmdresp('eprt |1|1.2.3.4.5|2048|'), msg)
@@ -2961,7 +2993,7 @@
sock.bind((self.client.sock.getsockname()[0], 0))
sock.listen(5)
sock.settimeout(TIMEOUT)
- ip, port = sock.getsockname()[:2]
+ ip, port = sock.getsockname()[:2]
self.client.sendcmd('eprt |%s|%s|%s|' % (self.proto, ip, port))
try:
try:
@@ -2988,7 +3020,7 @@
# test connection
for cmd in ('EPSV', 'EPSV ' + self.proto):
host, port = ftplib.parse229(self.client.sendcmd(cmd),
- self.client.sock.getpeername())
+ self.client.sock.getpeername())
s = socket.socket(
self.client.af, socket.SOCK_STREAM)
s.settimeout(TIMEOUT)
try:
@@ -3032,7 +3064,8 @@
ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
- resp = self.cmdresp('port %s,1,1' % self.HOST.replace('.',',')) #
port < 1024
+ # port < 1024
+ resp = self.cmdresp('port %s,1,1' % self.HOST.replace('.', ','))
self.assertEqual(resp[:3], '501')
self.assertIn('privileged port', resp)
if "1.2.3.4" != self.HOST:
@@ -3135,7 +3168,7 @@
sock.bind((self.client.sock.getsockname()[0], 0))
sock.listen(5)
sock.settimeout(2)
- ip, port = sock.getsockname()[:2]
+ ip, port = sock.getsockname()[:2]
self.client.sendcmd('eprt |1|%s|%s|' % (ip, port))
sock2 = None
try:
@@ -3155,7 +3188,7 @@
self.client.sock.settimeout(TIMEOUT)
self.client.login(USER, PASSWD)
host, port = ftplib.parse229(self.client.sendcmd('EPSV'),
- self.client.sock.getpeername())
+ self.client.sock.getpeername())
self.assertEqual('127.0.0.1', host)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(TIMEOUT)
@@ -3163,6 +3196,7 @@
self.assertTrue(mlstline('mlst /').endswith('/'))
s.close()
+
class TestCornerCases(TestCase):
"""Tests for any kind of strange situation for the server to be in,
mainly referring to bugs signaled on the bug tracker.
@@ -3193,7 +3227,7 @@
sock.bind((self.client.sock.getsockname()[0], 0))
sock.listen(5)
sock.settimeout(TIMEOUT)
- host, port = sock.getsockname()[:2]
+ host, port = sock.getsockname()[:2]
hbytes = host.split('.')
pbytes = [repr(port // 256), repr(port % 256)]
@@ -3406,7 +3440,6 @@
self.assertRaises(ftplib.error_perm, self.client.sendcmd,
'site chmod 777 ' + TESTFN_UNICODE)
-
# --- listing cmds
def _test_listing_cmds(self, cmd):
@@ -3436,19 +3469,18 @@
# utility function for extracting the line of interest
mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
if self.utf8fs:
- self.assertTrue('type=dir' in \
+ self.assertTrue('type=dir' in
mlstline('mlst ' + TESTFN_UNICODE))
- self.assertTrue('/' + TESTFN_UNICODE in \
+ self.assertTrue('/' + TESTFN_UNICODE in
mlstline('mlst ' + TESTFN_UNICODE))
- self.assertTrue('type=file' in \
+ self.assertTrue('type=file' in
mlstline('mlst ' + TESTFN_UNICODE_2))
- self.assertTrue('/' + TESTFN_UNICODE_2 in \
+ self.assertTrue('/' + TESTFN_UNICODE_2 in
mlstline('mlst ' + TESTFN_UNICODE_2))
else:
self.assertRaises(ftplib.error_perm,
mlstline, 'mlst ' + TESTFN_UNICODE)
-
# --- file transfer
def test_stor(self):
@@ -3608,31 +3640,32 @@
logging.basicConfig(level=logging.WARNING)
remove_test_files()
+
def test_main(tests=None):
test_suite = unittest.TestSuite()
if tests is None:
tests = [
- TestAbstractedFS,
- TestDummyAuthorizer,
- TestCallLater,
- TestCallEvery,
- TestFtpAuthentication,
- TestFtpDummyCmds,
- TestFtpCmdsSemantic,
- TestFtpFsOperations,
- TestFtpStoreData,
- TestFtpRetrieveData,
- TestFtpListingCmds,
- TestFtpAbort,
- TestThrottleBandwidth,
- TestTimeouts,
- TestConfigurableOptions,
- TestCallbacks,
- TestFTPServer,
- TestCornerCases,
- #TestUnicodePathNames, # TODO: fix errors and re-enable
- TestCommandLineParser,
- ]
+ TestAbstractedFS,
+ TestDummyAuthorizer,
+ TestCallLater,
+ TestCallEvery,
+ TestFtpAuthentication,
+ TestFtpDummyCmds,
+ TestFtpCmdsSemantic,
+ TestFtpFsOperations,
+ TestFtpStoreData,
+ TestFtpRetrieveData,
+ TestFtpListingCmds,
+ TestFtpAbort,
+ TestThrottleBandwidth,
+ TestTimeouts,
+ TestConfigurableOptions,
+ TestCallbacks,
+ TestFTPServer,
+ TestCornerCases,
+ # TestUnicodePathNames, # TODO: fix errors and re-enable
+ TestCommandLineParser,
+ ]
if SUPPORTS_IPV4:
tests.append(TestIPv4Environment)
else: