lenticular_cloud2/lenticular_cloud/pki.py

216 lines
8.5 KiB
Python

from flask import current_app
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
from cryptography.x509 import ObjectIdentifier
from pathlib import Path
import os
import string
import re
import datetime
import logging
from .model import Service, User, Certificate
logger = logging.getLogger(__name__)

# partial source from https://github.com/Snawoot/quickcerts
# MIT (C) Snawoot

# One day; certificate validity periods are written as multiples of it.
DAY = datetime.timedelta(days=1)

# NOTE(review): CA_FILENAME, KEY_EXT and CERT_EXT are not referenced by the
# part of this file that was reviewed -- confirm they are still needed.
CA_FILENAME = 'ca'
KEY_EXT = 'key'
CERT_EXT = 'pem'

# Public exponent passed to the RSA key generator.
E = 65537

# Characters that safe_filename() keeps; everything else becomes '_'.
safe_symbols = set(string.ascii_letters) | set(string.digits) | {'-', '.'}
def safe_filename(name):
    '''
    Return ``name`` with every character that is not in ``safe_symbols``
    replaced by an underscore.
    '''
    sanitized = []
    for char in name:
        if char in safe_symbols:
            sanitized.append(char)
        else:
            sanitized.append('_')
    return "".join(sanitized)
class Pki:
    '''
    File-backed PKI that keeps one self-signed CA per service.

    Files written below ``pki_path``:

    * ``<safe service name>.key.pem`` -- CA private key (unencrypted PEM)
    * ``<service name>.crt.pem`` -- self-signed CA certificate
    * ``<service name>/<safe username>-<sha256 fingerprint>.crt.pem``
      -- certificates issued to users

    "safe" means the name was passed through ``safe_filename``.
    '''

    def __init__(self, pki_path: str, domain: str):
        '''
        pki_path: str base path from the pkis
        domain: str domain used to build the e-mail address that is put
            into the subject of issued certificates
        '''
        self._pki_path = Path(pki_path)
        self._domain = domain

    @staticmethod
    def _utc_now() -> datetime.datetime:
        '''
        Return the current time as a naive datetime expressed in UTC.

        The x509 certificate builder treats naive datetimes as UTC, so
        local wall-clock time must not be handed to it: east of UTC a
        freshly issued certificate would not be valid yet.
        '''
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def _init_ca(self, service: Service):
        '''
        Make sure the CA belonging to ``service`` exists on disk.

        Creates the directory for issued certificates, the CA private key
        and the self-signed CA certificate if they are missing.

        Returns the tuple ``(ca_private_key, ca_cert)``.
        '''
        ca_name = service.name
        # NOTE(review): the directory and the CA certificate file use the raw
        # service name while the key file uses safe_filename(); kept as is so
        # an existing on-disk layout keeps working -- confirm service names
        # never contain path separators.
        pki_path = self._pki_path / ca_name
        # Create the directories before anything is written into them;
        # exist_ok avoids failing when two callers get here at the same time.
        pki_path.mkdir(parents=True, exist_ok=True)
        ca_private_key = self._ensure_private_key(ca_name)
        ca_cert = self._ensure_ca_cert(ca_name, ca_private_key)
        return (ca_private_key, ca_cert)

    def get_client_certs(self, user: User, service: Service):
        '''
        Return the stored certificates that were issued to ``user`` for
        ``service``.

        Only files named exactly ``<safe username>-<64 hex digits>.crt.pem``
        are loaded, which is the name ``signing_publickey`` writes. A plain
        prefix match would also return certificates of other users whose
        name merely starts with this username.

        Returns a list of ``Certificate`` objects (empty if there are none).
        '''
        username = str(user.username)
        file_prefix = safe_filename(username)
        wanted_name = re.compile(
            re.escape(file_prefix) + r'-[0-9a-f]{64}\.crt\.pem')
        pki_path = self._pki_path / service.name
        certs = []
        # file_prefix only contains characters kept by safe_filename(), so
        # it cannot inject glob wildcards into the pattern.
        for cert_path in sorted(pki_path.glob(f'{file_prefix}-*.crt.pem')):
            if wanted_name.fullmatch(cert_path.name) is None:
                continue
            logger.debug('Loading client certificate %s', cert_path)
            cert_data = x509.load_pem_x509_certificate(
                cert_path.read_bytes(),
                backend=default_backend())
            certs.append(Certificate(user.username, service.name, cert_data))
        return certs

    def signing_publickey(self, user: User, service: Service, publickey: str, valid_time=DAY*365):
        '''
        Issue a certificate for ``publickey`` signed by the CA of ``service``.

        user: owner of the certificate; the username becomes the common name
        service: service whose CA signs the certificate
        publickey: PEM encoded public key
        valid_time: timedelta the certificate is valid for, counted from now

        The certificate is stored as
        ``<pki_path>/<service name>/<safe username>-<fingerprint>.crt.pem``
        and returned wrapped in a ``Certificate``.

        Errors raised by the PEM loader for a malformed ``publickey``
        propagate to the caller.
        '''
        _public_key = serialization.load_pem_public_key(
            publickey.encode(), backend=default_backend())
        ca_private_key, ca_cert = self._init_ca(service)
        ca_name = service.name
        username = str(user.username)
        config = service.pki_config  # TODO use this config
        domain = self._domain
        not_valid_before = self._utc_now()
        ca_public_key = ca_private_key.public_key()

        subject = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, username),
            x509.NameAttribute(NameOID.EMAIL_ADDRESS, f'{username}@jabber.{domain}'),
        ])
        end_entity_cert_builder = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(ca_cert.subject)
            .not_valid_before(not_valid_before)
            .not_valid_after(not_valid_before + valid_time)
            .serial_number(x509.random_serial_number())
            .public_key(_public_key)
            .add_extension(
                x509.SubjectAlternativeName([
                    x509.DNSName(f'{username}'),
                ]),
                critical=False)
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True)
            .add_extension(
                x509.KeyUsage(digital_signature=True,
                              content_commitment=True,  # False
                              key_encipherment=True,
                              data_encipherment=False,
                              key_agreement=False,
                              key_cert_sign=False,
                              crl_sign=False,
                              encipher_only=False,
                              decipher_only=False),
                critical=True)
            .add_extension(
                x509.ExtendedKeyUsage([
                    ExtendedKeyUsageOID.CLIENT_AUTH,
                    ExtendedKeyUsageOID.SERVER_AUTH,
                ]),
                critical=False)
            .add_extension(
                x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_public_key),
                critical=False)
            .add_extension(
                x509.SubjectKeyIdentifier.from_public_key(_public_key),
                critical=False)
        )
        end_entity_cert = end_entity_cert_builder.sign(
            private_key=ca_private_key,
            algorithm=hashes.SHA256(),
            backend=default_backend()
        )

        # The fingerprint in the file name keeps several certificates of
        # the same user apart.
        fingerprint = end_entity_cert.fingerprint(hashes.SHA256()).hex()
        end_entity_cert_filename = self._pki_path / ca_name / \
            f'{safe_filename(username)}-{fingerprint}.crt.pem'
        # save cert
        with end_entity_cert_filename.open("wb") as end_entity_cert_file:
            end_entity_cert_file.write(
                end_entity_cert.public_bytes(encoding=serialization.Encoding.PEM))
        return Certificate(user.username, service.name, end_entity_cert)

    def get_ca_cert_pem(self, service: Service):
        '''
        Return the CA certificate of ``service`` as PEM text, creating the
        CA first if it does not exist yet.
        '''
        _, ca_cert = self._init_ca(service)
        return ca_cert.public_bytes(encoding=serialization.Encoding.PEM).decode()

    def _ensure_private_key(self, name, key_size=4096):
        '''
        Load the RSA private key stored for ``name`` or generate and store
        a new one with ``key_size`` bits.

        The key is written unencrypted, therefore the file is created
        readable and writable by the owner only.
        '''
        key_filename = self._pki_path / f'{safe_filename(name)}.key.pem'
        if key_filename.exists():
            return serialization.load_pem_private_key(
                key_filename.read_bytes(),
                password=None, backend=default_backend())

        logger.info('Generate new Private key for %s', name)
        private_key = rsa.generate_private_key(
            public_exponent=E,
            key_size=key_size, backend=default_backend())
        key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        # Request mode 0600 at creation time so the key is never readable
        # by other users, not even for a moment.
        key_fd = os.open(key_filename,
                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(key_fd, 'wb') as key_file:
            key_file.write(key_pem)
        return private_key

    def _ensure_ca_cert(self, ca_name, ca_private_key):
        '''
        Load the CA certificate stored for ``ca_name`` or create and store
        a self-signed one for ``ca_private_key``.

        A new certificate is valid from one day before now (UTC) for
        3650 days from now.

        NOTE(review): an existing certificate is returned without checking
        that it belongs to ``ca_private_key``.
        '''
        ca_cert_filename = self._pki_path / f'{ca_name}.crt.pem'
        ca_public_key = ca_private_key.public_key()
        if ca_cert_filename.exists():
            ca_cert = x509.load_pem_x509_certificate(
                ca_cert_filename.read_bytes(),
                backend=default_backend())
        else:
            logger.info('Generate new Certificate key for %s', ca_name)
            # NOTE(review): 'Lenticluar' looks like a typo, but the string
            # ends up in the subject of every new CA, so it is left as is.
            iname = x509.Name([
                x509.NameAttribute(NameOID.COMMON_NAME, f'Lenticular Cloud CA - {ca_name}'),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME,
                                   'Lenticluar Cloud'),
                x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME,
                                   ca_name),
            ])
            now = self._utc_now()
            ca_cert = (
                x509.CertificateBuilder()
                .subject_name(iname)
                .issuer_name(iname)
                .not_valid_before(now - DAY)
                .not_valid_after(now + 3650 * DAY)
                .serial_number(x509.random_serial_number())
                .public_key(ca_public_key)
                .add_extension(
                    x509.BasicConstraints(ca=True, path_length=None),
                    critical=True)
                .add_extension(
                    x509.KeyUsage(digital_signature=False,
                                  content_commitment=False,
                                  key_encipherment=False,
                                  data_encipherment=False,
                                  key_agreement=False,
                                  key_cert_sign=True,
                                  crl_sign=True,
                                  encipher_only=False,
                                  decipher_only=False),
                    critical=True)
                .add_extension(
                    x509.SubjectKeyIdentifier.from_public_key(ca_public_key),
                    critical=False)
                .sign(
                    private_key=ca_private_key,
                    algorithm=hashes.SHA256(),
                    backend=default_backend()
                )
            )
            with ca_cert_filename.open("wb") as ca_cert_file:
                ca_cert_file.write(
                    ca_cert.public_bytes(encoding=serialization.Encoding.PEM))
        assert isinstance(ca_cert, x509.Certificate)
        return ca_cert