[pycopia] r754 committed - Unicode strings cause problems here....

2 views
Skip to first unread message

pyc...@googlecode.com

unread,
Dec 20, 2014, 6:00:30 AM (12/20/14)
to pyc...@googlegroups.com
Revision: 754
Author: keith.dart
Date: Sat Dec 20 11:00:08 2014 UTC
Log: Unicode strings cause problems here.

Also pep8-ified.

https://code.google.com/p/pycopia/source/detail?r=754

Modified:
/trunk/net/pycopia/ssl/certs.py

=======================================
--- /trunk/net/pycopia/ssl/certs.py Wed May 15 01:38:51 2013 UTC
+++ /trunk/net/pycopia/ssl/certs.py Sat Dec 20 11:00:08 2014 UTC
@@ -33,7 +33,6 @@
"""
from __future__ import absolute_import
from __future__ import print_function
-from __future__ import unicode_literals
from __future__ import division

import re
@@ -44,10 +43,10 @@


_FILETYPES = {
- b"pem": crypto.FILETYPE_PEM,
- b"crt": crypto.FILETYPE_PEM,
- b"der": crypto.FILETYPE_ASN1,
- b"asn1": crypto.FILETYPE_ASN1,
+ "pem": crypto.FILETYPE_PEM,
+ "crt": crypto.FILETYPE_PEM,
+ "der": crypto.FILETYPE_ASN1,
+ "asn1": crypto.FILETYPE_ASN1,
}

VERSION_1 = 1
@@ -58,15 +57,16 @@
class CertError(Exception):
pass

+
class ExtensionEncodingError(CertError):
pass


class CertificateRequest(object):

- def __init__(self, country=None, state=None, locality=None,
organization=None,
- organization_unit=None, name=None, email=None, digest=b"sha1",
- filename=None):
+ def __init__(self, country=None, state=None, locality=None,
+ organization=None, organization_unit=None,
+ name=None, email=None, digest="sha1", filename=None):
if filename is None:
req = crypto.X509Req()
subject = req.get_subject()
@@ -134,7 +134,8 @@
return self._req.verify(pubkey._key)

def emit(self, fo, filetype="pem"):
- fo.write(crypto.dump_certificate_request(_FILETYPES[filetype],
self._req))
+ fo.write(crypto.dump_certificate_request(_FILETYPES[filetype],
+ self._req))

def get_pem(self):
return crypto.dump_certificate_request(crypto.FILETYPE_PEM,
self._req)
@@ -142,8 +143,8 @@

class PrivateKey(object):
def __init__(self, filename=None, text=None, passphrase=None,
- filetype="pem", bits=2048, _key=None):
- self.__passphrase = passphrase # can also be a callable
+ filetype="pem", bits=2048, _key=None):
+ self.__passphrase = passphrase # can also be a callable
if _key is not None:
key = _key
else:
@@ -162,7 +163,7 @@
self._key = key

bits = property(lambda self: self._key.bits())
- type = property(lambda self: {6:"RSA", 116:"DSA"}[self._key.type()])
+ type = property(lambda self: {6: "RSA", 116: "DSA"}[self._key.type()])

def check(self):
self._key.check()
@@ -174,12 +175,12 @@
self.__passphrase = None

passphrase = property(None, _set_pw, _del_pw,
- doc="The passphrase (write only attribute).")
+ doc="The passphrase (write only attribute).")

def encrypt(self, passphrase):
self._key.check()
text = crypto.dump_privatekey(crypto.FILETYPE_PEM, self._key,
- b'aes-256-cbc', passphrase)
+ 'aes-256-cbc', passphrase)
return text

def emit(self, fo, filetype="pem"):
@@ -282,7 +283,8 @@
pubkey = property(_get_pubkey, _set_pubkey)

expired = property(lambda self: bool(self._cert.has_expired()))
- signature_algorithm = property(lambda self:
self._cert.get_signature_algorithm())
+ signature_algorithm = property(
+ lambda self: self._cert.get_signature_algorithm())
subject_name_hash = property(lambda self:
self._cert.subject_name_hash())

def digest(self, digest_name):
@@ -301,8 +303,9 @@


class DistinguishedName(object):
- def __init__(self, country=None, state=None, locality=None,
organization=None,
- organization_unit=None, name=None, email=None, _dn=None):
+ def __init__(self, country=None, state=None, locality=None,
+ organization=None, organization_unit=None,
+ name=None, email=None, _dn=None):
if _dn is None:
dn = crypto.X509Req().get_subject()
if country:
@@ -321,14 +324,14 @@
dn.emailAddress = email
else:
dn = _dn
- self.__dict__[b"_dn"] = dn
+ self.__dict__["_dn"] = dn

der = property(lambda self: self._dn.der())
hash = property(lambda self: self._dn.hash())
componenents = property(lambda self: self._dn.get_components())

def __getattr__(self, name):
- return getattr(self.__dict__[b"_dn"], name)
+ return getattr(self.__dict__["_dn"], name)

def __setattr__(self, name, value):
return setattr(self._dn, name, value)
@@ -354,24 +357,25 @@
more informative string representation.
"""
def __init__(self, typename=None, critical=None, value=None,
- subject=None, issuer=None, _ext=None):
+ subject=None, issuer=None, _ext=None):
if _ext is not None:
ext = _ext
- # The following is necessary due to the nature of the underlying C
implementation.
+ # The following is necessary due to the nature of the
+ # underlying C implementation.
elif subject is None and issuer is None:
ext = crypto.X509Extension(typename, critical, value)
elif subject is not None and issuer is None:
subject = subject._cert
ext = crypto.X509Extension(typename, critical, value,
- subject=subject)
+ subject=subject)
elif subject is None and issuer is not None:
issuer = issuer._cert
ext = crypto.X509Extension(typename, critical, value,
- issuer=issuer)
+ issuer=issuer)
elif subject is not None and issuer is not None:
issuer = issuer._cert
ext = crypto.X509Extension(typename, critical, value,
- subject=subject, issuer=issuer)
+ subject=subject, issuer=issuer)
self._ext = ext

critical = property(lambda self: self._ext.get_critical())
@@ -382,9 +386,9 @@
def __str__(self):
ext = self._ext
return "{} = {}{}".format(
- ext.get_short_name(),
- "critical," if self._ext.get_critical() else "",
- str(ext).strip())
+ ext.get_short_name(),
+ "critical," if self._ext.get_critical() else "",
+ str(ext).strip())

def _der_(self):
return self._ext.get_data()
@@ -400,7 +404,8 @@
value = "CA:FALSE"
if pathlen is not None:
value += ", pathlen:{:d}".format(pathlen)
- super(BasicConstraints, self).__init__("basicConstraints",
critical, value)
+ super(BasicConstraints, self).__init__("basicConstraints",
critical,
+ value)


class KeyUsage(Extension):
@@ -442,24 +447,26 @@
"""
def __init__(self, values, critical=False):
assert len(values) > 0, "Need at least one value."
- super(ExtendedKeyUsage, self).__init__("extendedKeyUsage",
critical, ", ".join(values))
+ super(ExtendedKeyUsage, self).__init__("extendedKeyUsage",
critical,
+ ", ".join(values))


class AuthorityKeyIdentifier(Extension):
def __init__(self, issuer, critical=False):
super(AuthorityKeyIdentifier, self).__init__(
- "authorityKeyIdentifier", critical, "keyid,issuer",
issuer=issuer)
+ "authorityKeyIdentifier", critical, "keyid,issuer",
+ issuer=issuer)


class SubjectKeyIdentifier(Extension):
def __init__(self, subject, critical=False):
super(SubjectKeyIdentifier, self).__init__(
- "subjectKeyIdentifier", critical, "hash", subject=subject)
+ "subjectKeyIdentifier", critical, "hash", subject=subject)


class SubjectAltName(Extension):
- """The subject alternative name extension allows various literal
values to be
- included in the certificate.
+ """The subject alternative name extension allows various literal
values to
+ be included in the certificate.

These include email (an email address) URI a uniform resource
indicator,
DNS (a DNS domain name), RID (a registered ID: OBJECT IDENTIFIER), IP
(an
@@ -479,7 +486,8 @@
for value in values:
pre = guess_altname_prefix(value)
new.append("{}:{}".format(pre, value))
- super(SubjectAltName, self).__init__("subjectAltName",
critical, ",".join(new))
+ super(SubjectAltName, self).__init__("subjectAltName", critical,
+ ",".join(new))


class IssuerAltName(Extension):
@@ -489,17 +497,18 @@
for value in values:
if isinstance(value, Certificate):
super(IssuerAltName, self).__init__(
- "issuerAltName", critical, "issuer:copy",
issuer=value)
+ "issuerAltName", critical, "issuer:copy", issuer=value)
break
elif isinstance(value, crypto.X509):
super(IssuerAltName, self).__init__(
- "issuerAltName", critical, "issuer:copy",
- issuer=Certificate(_cert=value))
+ "issuerAltName", critical, "issuer:copy",
+ issuer=Certificate(_cert=value))
break
pre = guess_altname_prefix(value)
new.append("{}:{}".format(pre, value))
else:
- super(IssuerAltName, self).__init__("issuerAltName",
critical, ",".join(new))
+ super(IssuerAltName, self).__init__("issuerAltName", critical,
+ ",".join(new))


class AuthorityInfoAccess(Extension):
@@ -510,7 +519,8 @@
--------
AuthorityInfoAccess(ocsp="http://ocsp.my.host/")
AuthorityInfoAccess(caissuers="http://my.ca/ca.html")
- AuthorityInfoAccess(ocsp="http://ocsp.my.host/",
caissuers="http://my.ca/ca.html")
+ AuthorityInfoAccess(ocsp="http://ocsp.my.host/",
+ caissuers="http://my.ca/ca.html")
"""
def __init__(self, ocsp=None, caissuers=None, critical=False):
new = []
@@ -523,7 +533,7 @@
if not new:
raise ValueError("You must supply at least ocsp or caissuers")
super(AuthorityInfoAccess, self).__init__(
- "authorityInfoAccess", critical, ",".join(new))
+ "authorityInfoAccess", critical, ",".join(new))


class CrlDistributionPoints(Extension):
@@ -535,7 +545,8 @@
for value in values:
pre = guess_altname_prefix(value)
new.append("{}:{}".format(pre, value))
- super(CrlDistributionPoints,
self).__init__("crlDistributionPoints", critical, ",".join(new))
+ super(CrlDistributionPoints,
self).__init__("crlDistributionPoints",
+
critical, ",".join(new))


class PolicyConstraints(Extension):
@@ -543,16 +554,21 @@
requireExplicitPolicy or inhibitPolicyMapping

"""
- def __init__(self, requireexplicitpolicy=None,
inhibitpolicymapping=None, critical=False):
+ def __init__(self, requireexplicitpolicy=None,
inhibitpolicymapping=None,
+ critical=False):
new = []
if requireexplicitpolicy is not None:
-
new.append("requireExplicitPolicy:{:d}".format(requireexplicitpolicy))
+ new.append("requireExplicitPolicy:{:d}".format(
+ requireexplicitpolicy))
if inhibitpolicymapping is not None:
-
new.append("inhibitPolicyMapping:{:d}".format(inhibitpolicymapping))
+ new.append("inhibitPolicyMapping:{:d}".format(
+ inhibitpolicymapping))
if not new:
raise ValueError(
- "You must supply at least one of inhibitpolicymapping
requireexplicitpolicy")
- super(PolicyConstraints, self).__init__("policyConstraints",
critical, ",".join(new))
+ "You must supply at least one of "
+ "inhibitpolicymapping or requireexplicitpolicy")
+ super(PolicyConstraints, self).__init__("policyConstraints",
critical,
+ ",".join(new))


class InhibitAnyPolicy(Extension):
@@ -561,21 +577,23 @@
Value is a non-negative integer.
"""
def __init__(self, value, critical=False):
- super(InhibitAnyPolicy, self).__init__("inhibitAnyPolicy",
critical, str(value))
+ super(InhibitAnyPolicy, self).__init__("inhibitAnyPolicy",
critical,
+ str(value))


class NameConstraints(Extension):
"""Named constraints.
"""
def __init__(self, permitted=None, excluded=None, critical=False):
- new=[]
+ new = []
if permitted is not None:
pre = guess_altname_prefix(permitted)
new.append("permitted;{}:{}".format(pre, permitted))
if excluded is not None:
pre = guess_altname_prefix(excluded)
new.append("excluded;{}:{}".format(pre, excluded))
- super(NameConstraints, self).__init__("nameConstraints",
critical, ",".join(new))
+ super(NameConstraints, self).__init__("nameConstraints", critical,
+ ",".join(new))


class OCSPNoCheck(Extension):
@@ -591,7 +609,7 @@
elif "://" in string:
return "URI"
elif "::" in string:
- return "IP" # ipv6
+ return "IP" # ipv6
elif re.search(br"(?:\D+\.)+\D{1,4}", string, re.I):
return "DNS"
elif re.search(br"(?:[0-9]{1,3}\.)+\d{1,3}", string):
@@ -605,16 +623,21 @@
pkey.generate_key(crypto.TYPE_RSA, bits)
return PrivateKey(_key=pkey)

+
def create_dsa_keypair(bits=2048):
pkey = crypto.PKey()
pkey.generate_key(crypto.TYPE_DSA, bits)
return PrivateKey(_key=pkey)

+
def now_utc():
return datetime.now(pytz.utc)

+
def get_asn1time(when):
- """Return an ASN1 normalized time from a datetime object or ISO 8601
string."""
+ """Return an ASN1 normalized time from a datetime object or ISO 8601
+ string.
+ """
if when is None:
when = now_utc()
if isinstance(when, str):
@@ -623,11 +646,13 @@
assert type(when) is datetime
return when.strftime("%Y%m%d%H%M%S%z")

+
def get_datetime(when):
"""Return datetime object (UTC) from ASN1 time string."""
dt = datetime.strptime(when, "%Y%m%d%H%M%SZ")
return dt.replace(tzinfo=pytz.utc)

+
def get_type_and_text(filename):
ftype = _FILETYPES[filename[-3:]]
text = open(filename).read()
@@ -638,7 +663,8 @@
try:
return obj._der_()
except AttributeError:
- raise ExtensionEncodingError("Not an extension object:
{!r}".format(obj))
+ raise ExtensionEncodingError(
+ "Not an extension object: {!r}".format(obj))


if __name__ == "__main__":
@@ -646,17 +672,17 @@
from pycopia import autodebug

req = CertificateRequest(country="US",
- state="California",
- locality="Mountain View",
- organization="Acme Labs Inc.",
- name="www.acmeacme.com")
+ state="California",
+ locality="Mountain View",
+ organization="Acme Labs Inc.",
+ name="www.acmeacme.com")
req.emit(sys.stdout)

pw = PrivateKey()
- ektext = pw.encrypt(b"secret")
+ ektext = pw.encrypt("secret")
print(repr(ektext))
- npw = PrivateKey(text=ektext, passphrase=b"secret")
- #npw.emit(sys.stdout)
+ npw = PrivateKey(text=ektext, passphrase="secret")
+ # npw.emit(sys.stdout)

cert = Certificate()
dt = now_utc()
@@ -668,64 +694,67 @@
print(gext)
print('----------')

- #cacert = Certificate(filename="/etc/pycopia/ssl/CA/cacert.pem")
+ # cacert = Certificate(filename="/etc/pycopia/ssl/CA/cacert.pem")

cert = Certificate(filename="/var/tmp/github.pem")
print(cert.subject)
print(cert.issuer)
print(cert.notafter)

- print(b"\n== Extensions :")
- rawext = crypto.X509Extension("keyUsage", False, "digitalSignature,
nonRepudiation")
+ print("\n== Extensions :")
+ rawext = crypto.X509Extension("keyUsage", False,
+ "digitalSignature, nonRepudiation")
ext = Extension(_ext=rawext)
print(ext)
- print( KeyUsage(["digitalSignature", "keyEncipherment"]) )
- print( BasicConstraints(is_ca=True) )
- print( BasicConstraints(is_ca=True, critical=True) )
- print( BasicConstraints(is_ca=True, critical=True, pathlen=0) )
- print( BasicConstraints(is_ca=False) )
- print( ExtendedKeyUsage( ["serverAuth", "clientAuth"] ) )
- print( SubjectKeyIdentifier(cert) )
- #print( AuthorityKeyIdentifier(cacert) )
- print( SubjectAltName(["m...@other.address", "http://my.url.here/"]) )
- print( SubjectAltName(["192.168.7.1"]) )
- print( SubjectAltName(["13::17"]) )
- print( SubjectAltName(["www.mydomain.com", "*.mydomain.com"]) )
- print( IssuerAltName([cert]) )
- print( IssuerAltName(["issuer.com"]) )
- print( AuthorityInfoAccess(ocsp="http://ocsp.my.host/") )
- print( AuthorityInfoAccess(caissuers="http://my.ca/ca.html") )
- print( AuthorityInfoAccess(ocsp="http://ocsp.my.host/",
caissuers="http://my.ca/ca.html") )
- print( CrlDistributionPoints(["http://myhost.com/myca.crl"]) )
- print( PolicyConstraints(requireexplicitpolicy=3) )
- print( PolicyConstraints(inhibitpolicymapping=3) )
- print( InhibitAnyPolicy(3) )
- print( NameConstraints(permitted="192.168.0.0/255.255.0.0") )
- print( NameConstraints(excluded="172.16.0.0/255.255.0.0") )
- print( OCSPNoCheck() )
- print( OCSPNoCheck(critical=True) )
+ print(KeyUsage(["digitalSignature", "keyEncipherment"]))
+ print(BasicConstraints(is_ca=True))
+ print(BasicConstraints(is_ca=True, critical=True))
+ print(BasicConstraints(is_ca=True, critical=True, pathlen=0))
+ print(BasicConstraints(is_ca=False))
+ print(ExtendedKeyUsage(["serverAuth", "clientAuth"]))
+ print(SubjectKeyIdentifier(cert))
+ # print( AuthorityKeyIdentifier(cacert))
+ print(SubjectAltName(["m...@other.address", "http://my.url.here/"]))
+ print(SubjectAltName(["192.168.7.1"]))
+ print(SubjectAltName(["13::17"]))
+ print(SubjectAltName(["www.mydomain.com", "*.mydomain.com"]))
+ print(IssuerAltName([cert]))
+ print(IssuerAltName(["issuer.com"]))
+ print(AuthorityInfoAccess(ocsp="http://ocsp.my.host/"))
+ print(AuthorityInfoAccess(caissuers="http://my.ca/ca.html"))
+ print(AuthorityInfoAccess(ocsp="http://ocsp.my.host/",
+ caissuers="http://my.ca/ca.html"))
+ print(CrlDistributionPoints(["http://myhost.com/myca.crl"]))
+ print(PolicyConstraints(requireexplicitpolicy=3))
+ print(PolicyConstraints(inhibitpolicymapping=3))
+ print(InhibitAnyPolicy(3))
+ print(NameConstraints(permitted="192.168.0.0/255.255.0.0"))
+ print(NameConstraints(excluded="172.16.0.0/255.255.0.0"))
+ print(OCSPNoCheck())
+ print(OCSPNoCheck(critical=True))

- dn = DistinguishedName(country="US", state="California",
organization="myCA")
+ dn = DistinguishedName(country="US", state="California",
+ organization="myCA")
print(dn)

# example of creating a more complex req with extensions.
req = CertificateRequest(
- country="US",
- state="California",
- locality="Mountain View",
- organization="Acme Labs Inc.",
- organization_unit="Slaves",
- name="www.foo.com",
- )
+ country="US",
+ state="California",
+ locality="Mountain View",
+ organization="Acme Labs Inc.",
+ organization_unit="Slaves",
+ name="www.foo.com",
+ )
req.extensions = [
- BasicConstraints(is_ca=False),
-
KeyUsage(["nonRepudiation", "digitalSignature", "keyEncipherment"]),
- SubjectAltName(["www.foo.com", "www.bar.com"]),
- ]
+ BasicConstraints(is_ca=False),
+ KeyUsage(["nonRepudiation", "digitalSignature",
+ "keyEncipherment"]),
+ SubjectAltName(["www.foo.com", "www.bar.com"]),
+ ]
req.pubkey = npw
req.sign(npw)
with open("/tmp/testreq.pem", "w+") as fo:
req.emit(fo)
with open("/tmp/testreq.key", "w+") as fo:
npw.emit(fo)
-
Reply all
Reply to author
Forward
0 new messages