lenticular_cloud2/lenticular_cloud/model.py

263 lines
8.2 KiB
Python

from flask import current_app
from flask_login import UserMixin
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from collections.abc import MutableSequence
from datetime import datetime
from dateutil import tz
import pyotp
import json
import logging
import crypt
import secrets
import string
from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass, Mapped, mapped_column, relationship, declarative_base
from flask_sqlalchemy import SQLAlchemy
from flask_sqlalchemy.model import Model, DefaultMeta
from flask_sqlalchemy.extension import _FSAModel
from flask_migrate import Migrate
from datetime import datetime
import uuid
from typing import Optional, List, Dict, Tuple, Any, Type, TYPE_CHECKING
from cryptography.x509 import Certificate as CertificateObj
from sqlalchemy.ext.declarative import DeclarativeMeta
logger = logging.getLogger(__name__)
db = SQLAlchemy()
migrate = Migrate()
class BaseModelIntern(MappedAsDataclass, DeclarativeBase):
pass
if TYPE_CHECKING:
class BaseModel (_FSAModel,BaseModelIntern):
pass
else:
BaseModel: Type[_FSAModel] = db.Model
class ModelUpdatedMixin:
created_at: Mapped[datetime] = mapped_column(db.DateTime, default=datetime.now())
last_update: Mapped[datetime] = mapped_column(db.DateTime, default=datetime.now(), onupdate=datetime.now)
class SecurityUser(UserMixin):
def __init__(self, username):
self._username = username
def get_id(self):
return self._username
class Service(object):
def __init__(self, name: str):
self._name = name
self._app_token = False
self._client_cert = False
self._pki_config = {
'cn': '{username}',
'email': '{username}@{domain}'
}
@staticmethod
def from_config(name, config) -> 'Service':
"""
"""
service = Service(name)
if 'app_token' in config:
service._app_token = bool(config['app_token'])
if 'client_cert' in config:
service._client_cert = bool(config['client_cert'])
if 'pki_config' in config:
service._pki_config.update(config['pki_config'])
return service
@property
def name(self) -> str:
return self._name
@property
def client_cert(self) -> bool:
return self._client_cert
@property
def app_token(self) -> bool:
return self._app_token
@property
def pki_config(self) -> dict[str,str]:
if not self._client_cert:
raise Exception('invalid call')
return self._pki_config
class Certificate(object):
def __init__(self, cn: str, ca_name: str, cert_data: CertificateObj, revoked=False):
self._cn = cn
self._ca_name = ca_name
self._cert_data = cert_data
self._revoked = revoked
self._cert_data.not_valid_after.replace(tzinfo=tz.tzutc())
self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc())
@property
def cn(self) -> str:
return self._cn
@property
def ca_name(self) -> str:
return self._ca_name
@property
def not_valid_before(self) -> datetime:
return self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
@property
def not_valid_after(self) -> datetime:
return self._cert_data.not_valid_after.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).replace(tzinfo=None)
@property
def serial_number(self) -> int:
return self._cert_data.serial_number
@property
def serial_number_hex(self) -> str:
return f'{self._cert_data.serial_number:X}'
def fingerprint(self, algorithm=hashes.SHA256()) -> bytes:
return self._cert_data.fingerprint(algorithm)
@property
def is_valid(self) -> bool:
return self.not_valid_after > datetime.now() and not self._revoked
def pem(self) -> str:
return self._cert_data.public_bytes(encoding=serialization.Encoding.PEM).decode()
@property
def raw(self):
return self._cert_data
def __str__(self):
return f'Certificate(cn={self._cn}, ca_name={self._ca_name}, not_valid_before={self.not_valid_before}, not_valid_after={self.not_valid_after})'
def generate_uuid():
return str(uuid.uuid4())
class User(BaseModel):
id = db.Column(
db.String(length=36), primary_key=True, default=generate_uuid)
username = db.Column(
db.String, unique=True, nullable=False)
password_hashed = db.Column(
db.String, nullable=False)
alternative_email = db.Column(
db.String, nullable=True)
created_at = db.Column(db.DateTime, nullable=False,
default=datetime.now)
modified_at = db.Column(db.DateTime, nullable=False,
default=datetime.now, onupdate=datetime.now)
last_login = db.Column(db.DateTime, nullable=True)
enabled = db.Column(db.Boolean, nullable=False, default=False)
app_tokens = db.relationship('AppToken', back_populates='user')
totps = db.relationship('Totp', back_populates='user')
webauthn_credentials = db.relationship('WebauthnCredential', back_populates='user', cascade='delete,delete-orphan', passive_deletes=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
@property
def is_authenticated(self) -> bool:
return True # TODO
def get(self, key) -> None:
print(f'getitem: {key}') # TODO
@property
def groups(self) -> list['Group']:
if self.username == 'tuxcoder':
return [Group(name='admin')]
else:
return []
@property
def email(self) -> str:
domain = current_app.config['DOMAIN']
return f'{self.username}@{domain}'
def change_password(self, password_new: str) -> None:
self.password_hashed = crypt.crypt(password_new)
def get_tokens_by_service(self, service: Service) -> list['AppToken']:
return [ token for token in self.app_tokens if token.service_name == service.name ]
def get_token(self, service: Service, name: str) -> Optional['AppToken']:
for token in self.app_tokens:
if token.service_name == service.name and token.name == name:
return token # type: ignore
return None
class AppToken(BaseModel):
id = db.Column(db.Integer, primary_key=True)
service_name = db.Column(db.String, nullable=False)
user_id = db.Column(
db.String(length=36),
db.ForeignKey(User.id), nullable=False)
user = db.relationship(User)
token = db.Column(db.String, nullable=False)
name = db.Column(db.String, nullable=False)
last_used = db.Column(db.DateTime, nullable=True)
@staticmethod
def new(service: Service):
app_token = AppToken()
app_token.service_name = service.name
alphabet = string.ascii_letters + string.digits
app_token.token = ''.join(secrets.choice(alphabet) for i in range(12))
return app_token
class Totp(BaseModel):
id = db.Column(db.Integer, primary_key=True)
secret = db.Column(db.String, nullable=False)
name = db.Column(db.String, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
last_used = db.Column(db.DateTime, nullable=True)
user_id = db.Column(
db.String(length=36),
db.ForeignKey(User.id), nullable=False)
user = db.relationship(User)
def verify(self, token: str) -> bool:
totp = pyotp.TOTP(self.secret)
return totp.verify(token)
class WebauthnCredential(BaseModel): # pylint: disable=too-few-public-methods
"""Webauthn credential model"""
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(length=36), db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
user_handle = db.Column(db.String(64), nullable=False)
credential_data = db.Column(db.LargeBinary, nullable=False)
name = db.Column(db.String(250))
registered = db.Column(db.DateTime, default=datetime.utcnow)
user = db.relationship('User', back_populates='webauthn_credentials')
class Group(BaseModel):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(), nullable=False, unique=True)