from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from werkzeug.security import check_password_hash, generate_password_hash
__all__ = (
'SHA1Password',
'SHA256Password',
'SHA512Password',
'MD5Password',
'PlainPassword',
'PasswordThreadPoolExecutor'
)
MAXIMUM_PASSWORD_LENGTH = 64
MINIMUM_PASSWORD_LENGTH = 8
def validate_password(length):
def decorator(func):
@wraps(func)
def wrapper(self, password, *args, **kwargs):
password_len = len(password)
if password_len < length or password_len > MAXIMUM_PASSWORD_LENGTH:
raise ValueError(
'The "password" must be greater than or equal to %s '
'or less than or equal to %s'
% (length, MAXIMUM_PASSWORD_LENGTH)
)
return func(self, password, *args, **kwargs)
return wrapper
return decorator
class _PasswordHasher(object):
_algorithm = None
@validate_password(MINIMUM_PASSWORD_LENGTH)
def make(self, password, salt_length=8):
return generate_password_hash(password, self._algorithm, salt_length)
@validate_password(MINIMUM_PASSWORD_LENGTH)
def verify(self, password, password_hash):
return check_password_hash(password_hash, password)
class SHA1Password(_PasswordHasher):
_algorithm = 'pbkdf2:sha1'
class SHA256Password(_PasswordHasher):
_algorithm = 'pbkdf2:sha256'
class SHA512Password(_PasswordHasher):
_algorithm = 'pbkdf2:sha512'
class MD5Password(_PasswordHasher):
_algorithm = 'pbkdf2:md5'
class PlainPassword(_PasswordHasher):
_algorithm = 'plain'
class PasswordThreadPoolExecutor(ThreadPoolExecutor):
def _get_client(self, algorithm='sha256'):
client = {
'sha1': SHA1Password,
'sha256': SHA256Password,
'sha512': SHA512Password,
'md5': MD5Password,
'plain': PlainPassword
}[algorithm.lower()]
return client()
def make(self, password, salt_length=8, algorithm='sha256'):
client = self._get_client(algorithm)
return self.submit(client.make, password, salt_length)
def verify(self, password, password_hash, algorithm='sha256'):
client = self._get_client(algorithm)
return self.submit(client.verify, password, password_hash)