lenticular_cloud2/lenticular_cloud/model.py

342 lines
8.8 KiB
Python

from flask import current_app
from ldap3_orm import AttrDef, EntryBase as _EntryBase, ObjectDef, EntryType
from ldap3_orm import Reader
from ldap3 import Entry
from ldap3.utils.conv import escape_filter_chars
from flask_login import UserMixin
from ldap3.core.exceptions import LDAPSessionTerminatedByServerError
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from collections.abc import MutableSequence
from datetime import datetime
from dateutil import tz
import pyotp
import json
ldap_conn = None # type: Connection
base_dn = ''
class SecurityUser(UserMixin):
def __init__(self, username):
self._username = username
def get_id(self):
return self._username
class LambdaStr:
def __init__(self, lam):
self.lam = lam
def __str__(self):
return self.lam()
class EntryBase(object):
_type = None # will get replaced by the local type
_query_object = None # will get replaced by the local type
_base_dn = LambdaStr(lambda: base_dn)
def __init__(self, ldap_object=None, **kwargs):
if ldap_object is None:
self._ldap_object = self.get_type()(**kwargs)
else:
self._ldap_object = ldap_object
def __str__(self):
return str(self._ldap_object)
@classmethod
def get_object_def(cls):
return ObjectDef(cls.object_classes, ldap_conn)
@classmethod
def get_base(cls):
return cls.base_dn.format(_base_dn=base_dn)
@classmethod
def get_type(cls):
if cls._type is None:
cls._type = EntryType(cls.dn.replace('{base_dn}',cls.get_base()), cls.object_classes, ldap_conn)
return cls._type
def commit(self):
print(self._ldap_object.entry_attributes_as_dict)
ret = ldap_conn.add(
self.dn, self.object_classes, self._ldap_object.entry_attributes_as_dict)
print(ret)
pass
@classmethod
def query(cls):
if cls._query_object is None:
cls._query_object = cls._query(cls)
return cls._query_object
class _query(object):
def __init__(self, clazz):
self._class = clazz
def _mapping(self, ldap_object):
return ldap_object
def _query(self, ldap_filter: str):
reader = Reader(ldap_conn, self._class.get_object_def(), self._class.get_base(), ldap_filter)
try:
reader.search()
except LDAPSessionTerminatedByServerError:
ldap_conn.bind()
reader.search()
return [self._mapping(entry) for entry in reader]
def all(self):
return self._query(None)
class Service(object):
def __init__(self, name):
self._name = name
self._client_cert = False
self._pki_config = {
'cn': '{username}',
'email': '{username}@{domain}'
}
@staticmethod
def from_config(name, config):
"""
"""
service = Service(name)
if 'client_cert' in config:
service._client_cert = bool(config['client_cert'])
if 'pki_config' in config:
service._pki_config.update(config['pki_config'])
return service
@property
def name(self):
return self._name
@property
def client_cert(self):
return self._client_cert
@property
def pki_config(self):
if not self._client_cert:
raise Exception('invalid call')
return self._pki_config
class Certificate(object):
def __init__(self, cn, ca_name: str, cert_data, revoked=False):
self._cn = cn
self._ca_name = ca_name
self._cert_data = cert_data
self._revoked = revoked
self._cert_data.not_valid_after.replace(tzinfo=tz.tzutc())
self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc())
@property
def cn(self):
return self._cn
@property
def ca_name(self):
return self._ca_name
@property
def not_valid_before(self) -> datetime:
return self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
@property
def not_valid_after(self) -> datetime:
return self._cert_data.not_valid_after.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
@property
def serial_number(self) -> int:
return self._cert_data.serial_number
@property
def serial_number_hex(self) -> str:
return f'{self._cert_data.serial_number:X}'
def fingerprint(self, algorithm=hashes.SHA256()) -> bytes:
return self._cert_data.fingerprint(algorithm)
@property
def is_valid(self) -> bool:
return self.not_valid_after > datetime.now() and not self._revoked
def pem(self) -> str:
return self._cert_data.public_bytes(encoding=serialization.Encoding.PEM).decode()
@property
def raw(self):
return self._cert_data
def __str__(self):
return f'Certificate(cn={self._cn}, ca_name={self._ca_name}, not_valid_before={self.not_valid_before}, not_valid_after={self.not_valid_after})'
class Totp(object):
def __init__(self, name, secret, created_at=datetime.now()):
self._secret = secret
self._name = name
self._created_at = created_at
@property
def name(self):
return self._name
@property
def created_at(self):
return self._created_at
def verify(self, token: str):
totp = pyotp.TOTP(self._secret)
return totp.verify(token)
def to_dict(self):
return {
'secret': self._secret,
'name': self._name,
'created_at': int(self._created_at.timestamp())}
@staticmethod
def from_dict(data):
return Totp(
name=data['name'],
secret=data['secret'],
created_at=datetime.fromtimestamp(data['created_at']))
class TotpList(MutableSequence):
def __init__(self, ldap_attr):
super().__init__()
self._ldap_attr = ldap_attr
def __getitem__(self, ii):
return Totp.from_dict(json.loads(self._ldap_attr[ii]))
def __setitem__(self, ii, val: Totp):
self._ldap_attr[ii] = json.dumps(val.to_dict()).encode()
def __len__(self):
return len(self._ldap_attr)
def __delitem__(self, ii):
del self._ldap_attr[ii]
def delete(self, totp_name):
for i in range(len(self)):
if self[i].name == totp_name:
self._ldap_attr.delete(self._ldap_attr[i])
def insert(self, ii, val):
self.append(val)
def append(self, val):
self._ldap_attr.add(json.dumps(val.to_dict()).encode())
class User(EntryBase):
dn = "uid={uid},{base_dn}"
base_dn = "ou=users,{_base_dn}"
object_classes = ["top", "inetOrgPerson", "LenticularUser"]
def __init__(self, ldap_object=None, **kwargs):
super().__init__(ldap_object, **kwargs)
self._totp_list = TotpList(ldap_object.totpSecret)
@property
def is_authenticated(self):
return True # TODO
def get(self, key):
print(f'getitem: {key}')
def make_writeable(self):
self._ldap_object = self._ldap_object.entry_writable()
self._totp_list = TotpList(self._ldap_object.totpSecret)
@property
def entry_dn(self):
return self._ldap_object.entry_dn
@property
def username(self):
return self._ldap_object.uid
@username.setter
def username(self, value):
self._ldap_object.uid = value
@property
def userPassword(self):
return self._ldap_object.userPassword
@property
def fullname(self):
return self._ldap_object.fullname
@property
def givenname(self):
return self._ldap_object.givenname
@property
def surname(self):
return self._ldap_object.surname
@property
def email(self):
return self._ldap_object.mail
@property
def alternative_email(self):
return self._ldap_object.altMail
@property
def auth_role(self):
return self._ldap_object.authRole
@property
def gpg_public_key(self):
return self._ldap_object.gpgPublicKey
@property
def totps(self):
return self._totp_list
class _query(EntryBase._query):
def _mapping(self, ldap_object):
return User(ldap_object=ldap_object)
def by_username(self, username) -> 'User':
result = self._query('(uid={username:s})'.format(username=escape_filter_chars(username)))
if len(result) > 0:
return result[0]
else:
return None
class Group(EntryBase):
dn = "cn={cn},{base_dn}"
base_dn = "ou=Users,{_base_dn}"
object_classes = ["top"]
fullname = AttrDef("cn")