227 lines
9.2 KiB
Python
227 lines
9.2 KiB
Python
from flask import current_app
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
|
|
from cryptography.x509 import ObjectIdentifier
|
|
from pathlib import Path
|
|
import os
|
|
import string
|
|
import re
|
|
import datetime
|
|
import logging
|
|
|
|
from .model import Service, User, Certificate
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# partial source from https://github.com/Snawoot/quickcerts
|
|
# MIT (C) Snawoot
|
|
|
|
DAY = datetime.timedelta(1, 0, 0)
|
|
CA_FILENAME = 'ca'
|
|
KEY_EXT = 'key'
|
|
CERT_EXT = 'pem'
|
|
E = 65537
|
|
|
|
|
|
safe_symbols = set(string.ascii_letters + string.digits + '-.')
|
|
|
|
|
|
def safe_filename(name):
|
|
return "".join(c if c in safe_symbols else '_' for c in name)
|
|
|
|
|
|
class Pki(object):
|
|
|
|
def __init__(self, pki_path: str, domain: str):
|
|
'''
|
|
pki_path: str base path from the pkis
|
|
'''
|
|
self._pki_path = Path(pki_path)
|
|
self._domain = domain
|
|
|
|
|
|
def _init_ca(self, service: Service):
|
|
'''
|
|
'''
|
|
ca_name = service.name
|
|
|
|
ca_private_key = self._ensure_private_key(ca_name)
|
|
ca_cert = self._ensure_ca_cert(ca_name, ca_private_key)
|
|
|
|
pki_path = self._pki_path / ca_name
|
|
if not pki_path.exists():
|
|
pki_path.mkdir()
|
|
|
|
return (ca_private_key, ca_cert)
|
|
|
|
def get_client_certs(self, user: User, service: Service):
|
|
pki_path = self._pki_path / service.name
|
|
certs = []
|
|
for cert_path in pki_path.glob(f'{user.username}*.crt.pem'):
|
|
print(cert_path)
|
|
with cert_path.open('rb') as cert_fd:
|
|
cert_data = x509.load_pem_x509_certificate(
|
|
cert_fd.read(),
|
|
backend=default_backend())
|
|
cert = Certificate(user.username, service.name, cert_data)
|
|
certs.append(cert)
|
|
return certs
|
|
|
|
def signing_publickey(self, user: User, service: Service, publickey: str, valid_time=DAY*365):
|
|
_public_key = serialization.load_pem_public_key(
|
|
publickey.encode(), backend=default_backend())
|
|
|
|
ca_private_key, ca_cert = self._init_ca(service)
|
|
ca_name = service.name
|
|
username = str(user.username)
|
|
config = service.pki_config #TODO use this config
|
|
domain = self._domain
|
|
not_valid_before = datetime.datetime.now()
|
|
|
|
ca_public_key = ca_private_key.public_key()
|
|
end_entity_cert_builder = x509.CertificateBuilder().\
|
|
subject_name(x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, config['cn'].format(username=username, domain=domain)),
|
|
x509.NameAttribute(NameOID.EMAIL_ADDRESS, config['email'].format(username=username, domain=domain)),
|
|
])).\
|
|
issuer_name(ca_cert.subject).\
|
|
not_valid_before(not_valid_before).\
|
|
not_valid_after(not_valid_before + valid_time).\
|
|
serial_number(x509.random_serial_number()).\
|
|
public_key(_public_key).\
|
|
add_extension(
|
|
x509.SubjectAlternativeName([
|
|
x509.DNSName(f'{username}'),
|
|
]),
|
|
critical=False).\
|
|
add_extension(
|
|
x509.BasicConstraints(ca=False, path_length=None),
|
|
critical=True).\
|
|
add_extension(
|
|
x509.KeyUsage(digital_signature=True,
|
|
content_commitment=True, # False
|
|
key_encipherment=True,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
key_cert_sign=False,
|
|
crl_sign=False,
|
|
encipher_only=False,
|
|
decipher_only=False),
|
|
critical=True).\
|
|
add_extension(
|
|
x509.ExtendedKeyUsage([
|
|
ExtendedKeyUsageOID.CLIENT_AUTH,
|
|
ExtendedKeyUsageOID.SERVER_AUTH,
|
|
]), critical=False).\
|
|
add_extension(
|
|
x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_public_key),
|
|
critical=False).\
|
|
add_extension(
|
|
x509.SubjectKeyIdentifier.from_public_key(_public_key),
|
|
critical=False).\
|
|
add_extension(
|
|
x509.AuthorityInformationAccess([
|
|
x509.AccessDescription(
|
|
access_method=x509.AuthorityInformationAccessOID.CA_ISSUERS,
|
|
access_location=x509.UniformResourceIdentifier(f'https://www.{self._domain}')),
|
|
x509.AccessDescription(
|
|
access_method=x509.AuthorityInformationAccessOID.OCSP,
|
|
access_location=x509.UniformResourceIdentifier(f'http://ocsp.{self._domain}/{ca_name}/'))
|
|
]),
|
|
critical=False)
|
|
|
|
end_entity_cert = end_entity_cert_builder.\
|
|
sign(
|
|
private_key=ca_private_key,
|
|
algorithm=hashes.SHA256(),
|
|
backend=default_backend()
|
|
)
|
|
|
|
fingerprint =end_entity_cert.fingerprint(hashes.SHA256()).hex()
|
|
end_entity_cert_filename = self._pki_path / ca_name / \
|
|
f'{safe_filename(username)}-{fingerprint}.crt.pem'
|
|
# save cert
|
|
with end_entity_cert_filename.open("wb") as end_entity_cert_file:
|
|
end_entity_cert_file.write(
|
|
end_entity_cert.public_bytes(encoding=serialization.Encoding.PEM))
|
|
|
|
return Certificate(user.username, service.name, end_entity_cert)
|
|
|
|
def get_ca_cert_pem(self, service: Service):
|
|
ca_private_key, ca_cert = self._init_ca(service)
|
|
return ca_cert.public_bytes(encoding=serialization.Encoding.PEM).decode()
|
|
|
|
def _ensure_private_key(self, name, key_size=4096):
|
|
key_filename = self._pki_path / f'{safe_filename(name)}.key.pem'
|
|
if key_filename.exists():
|
|
with open(key_filename, "rb") as key_file:
|
|
private_key = serialization.load_pem_private_key(key_file.read(),
|
|
password=None, backend=default_backend())
|
|
else:
|
|
logger.info(f'Generate new Private key for {name}')
|
|
private_key = rsa.generate_private_key(public_exponent=E,
|
|
key_size=key_size, backend=default_backend())
|
|
with key_filename.open('wb') as key_file:
|
|
key_file.write(private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=serialization.NoEncryption()))
|
|
return private_key
|
|
|
|
def _ensure_ca_cert(self, ca_name, ca_private_key):
|
|
ca_cert_filename =self._pki_path / f'{ca_name}.crt.pem'
|
|
ca_public_key = ca_private_key.public_key()
|
|
if ca_cert_filename.exists():
|
|
with ca_cert_filename.open("rb") as ca_cert_file:
|
|
ca_cert = x509.load_pem_x509_certificate(
|
|
ca_cert_file.read(),
|
|
backend=default_backend())
|
|
else:
|
|
logger.info(f'Generate new Certificate key for {ca_name}')
|
|
iname = x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, f'Lenticular Cloud CA - {ca_name}'),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME,
|
|
'Lenticluar Cloud'),
|
|
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME,
|
|
ca_name),
|
|
])
|
|
ca_cert = x509.CertificateBuilder().\
|
|
subject_name(iname).\
|
|
issuer_name(iname).\
|
|
not_valid_before(datetime.datetime.today() - DAY).\
|
|
not_valid_after(datetime.datetime.today() + 3650 * DAY).\
|
|
serial_number(x509.random_serial_number()).\
|
|
public_key(ca_public_key).\
|
|
add_extension(
|
|
x509.BasicConstraints(ca=True, path_length=None),
|
|
critical=True).\
|
|
add_extension(
|
|
x509.KeyUsage(digital_signature=False,
|
|
content_commitment=False,
|
|
key_encipherment=False,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
key_cert_sign=True,
|
|
crl_sign=True,
|
|
encipher_only=False,
|
|
decipher_only=False),
|
|
critical=True).\
|
|
add_extension(
|
|
x509.SubjectKeyIdentifier.from_public_key(ca_public_key),
|
|
critical=False).\
|
|
sign(
|
|
private_key=ca_private_key,
|
|
algorithm=hashes.SHA256(),
|
|
backend=default_backend()
|
|
)
|
|
with open(ca_cert_filename, "wb") as ca_cert_file:
|
|
ca_cert_file.write(
|
|
ca_cert.public_bytes(encoding=serialization.Encoding.PEM))
|
|
assert isinstance(ca_cert, x509.Certificate)
|
|
return ca_cert
|