diff --git a/lenticular_cloud/cli.py b/lenticular_cloud/cli.py index 1e12701..a9713df 100644 --- a/lenticular_cloud/cli.py +++ b/lenticular_cloud/cli.py @@ -4,12 +4,13 @@ from .app import create_app from werkzeug.middleware.proxy_fix import ProxyFix from flask_migrate import upgrade from pathlib import Path +from flask import Flask import logging import os -def entry_point(): +def entry_point() -> None: logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) parser = argparse.ArgumentParser(description='lenticular_cloud cli') @@ -54,12 +55,12 @@ def entry_point(): args.func(args) -def cli_user(args): +def cli_user(args) -> None: for user in User.query.all(): print(f'{user.id} - Enabled: {user.enabled} - Name:`{user.username}`') pass -def cli_signup(args): +def cli_signup(args) -> None: if args.signup_id is not None: user = User.query.get(args.signup_id) @@ -76,12 +77,14 @@ def cli_signup(args): print(f'') -def cli_run(app, args): +def cli_run(app: Flask, args) -> None: + print("running in debug mode") logging.basicConfig(level=logging.DEBUG) - app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1) - app.run(debug=True, host='127.0.0.1', port=5000) + #app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1) + app.run(debug=False, host='127.0.0.1', port=5000) -def cli_db_upgrade(args): + +def cli_db_upgrade(args) -> None: app = create_app() migration_dir = Path(app.root_path) / 'migrations' upgrade( str(migration_dir) ) diff --git a/lenticular_cloud/model.py b/lenticular_cloud/model.py index 91a1e9e..a51b217 100644 --- a/lenticular_cloud/model.py +++ b/lenticular_cloud/model.py @@ -16,7 +16,7 @@ import uuid import pyotp from typing import Optional, Callable from cryptography.x509 import Certificate as CertificateObj -from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.ext.declarative import DeclarativeMeta logger = logging.getLogger(__name__) @@ -26,6 +26,9 @@ logger = logging.getLogger(__name__) db = SQLAlchemy() # type: SQLAlchemy migrate = Migrate() + +BaseModel: DeclarativeMeta = db.Model + class SecurityUser(UserMixin): def __init__(self, username): @@ -37,7 +40,7 @@ class SecurityUser(UserMixin): class Service(object): - def __init__(self, name): + def __init__(self, name: str): self._name = name self._client_cert = False self._pki_config = { @@ -46,7 +49,7 @@ class Service(object): } @staticmethod - def from_config(name, config): + def from_config(name, config) -> Service: """ """ service = Service(name) @@ -58,15 +61,15 @@ class Service(object): return service @property - def name(self): + def name(self) -> str: return self._name @property - def client_cert(self): + def client_cert(self) -> bool: return self._client_cert @property - def pki_config(self): + def pki_config(self) -> dict[str,str]: if not self._client_cert: raise Exception('invalid call') return self._pki_config @@ -74,7 +77,7 @@ class Service(object): class Certificate(object): - def __init__(self, cn, ca_name: str, cert_data: CertificateObj, revoked=False): + def __init__(self, cn: str, ca_name: str, cert_data: CertificateObj, revoked=False): self._cn = cn self._ca_name = ca_name self._cert_data = cert_data @@ -83,11 +86,11 @@ class Certificate(object): self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc()) @property - def cn(self): + def cn(self) -> str: return self._cn @property - def ca_name(self): + def ca_name(self) -> str: return self._ca_name @property @@ -128,7 +131,7 @@ def generate_uuid(): return str(uuid.uuid4()) -class User(db.Model): +class User(BaseModel): id = db.Column( db.String(length=36), primary_key=True, default=generate_uuid) username = db.Column( @@ -149,13 +152,13 @@ class User(db.Model): webauthn_credentials = db.relationship('WebauthnCredential', back_populates='user', cascade='delete,delete-orphan', passive_deletes=True) def __init__(self, **kwargs): - super(db.Model).__init__(**kwargs) + super().__init__(**kwargs) @property - def is_authenticated(self): + def is_authenticated(self) -> bool: return True # TODO - def get(self, key): + def get(self, key) -> None: print(f'getitem: {key}') # TODO @property @@ -174,30 +177,31 @@ class User(db.Model): password_hashed = crypt.crypt(password_new) return True -class AppToken(db.Model): +class AppToken(BaseModel): id = db.Column(db.Integer, primary_key=True) service_name = db.Column(db.String, nullable=False) token = db.Column(db.String, nullable=False) name = db.Column(db.String, nullable=False) -class Totp(db.Model): +class Totp(BaseModel): id = db.Column(db.Integer, primary_key=True) secret = db.Column(db.String, nullable=False) name = db.Column(db.String, nullable=False) created_at = db.Column(db.DateTime, default=datetime.now, nullable=False) + last_used = db.Column(db.DateTime, nullable=True) user_id = db.Column( db.String(length=36), db.ForeignKey(User.id), nullable=False) user = db.relationship(User) - def verify(self, token: str): + def verify(self, token: str) -> bool: totp = pyotp.TOTP(self.secret) return totp.verify(token) -class WebauthnCredential(db.Model): # pylint: disable=too-few-public-methods +class WebauthnCredential(BaseModel): # pylint: disable=too-few-public-methods """Webauthn credential model""" id = db.Column(db.Integer, primary_key=True) @@ -210,7 +214,7 @@ class WebauthnCredential(db.Model): # pylint: disable=too-few-public-methods user = db.relationship('User', back_populates='webauthn_credentials') -class Group(db.Model): +class Group(BaseModel): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(), nullable=False, unique=True) diff --git a/lenticular_cloud/template/frontend/app_token.html.j2 b/lenticular_cloud/template/frontend/app_token.html.j2 index 3bd09ef..9d8de52 100644 --- a/lenticular_cloud/template/frontend/app_token.html.j2 +++ b/lenticular_cloud/template/frontend/app_token.html.j2 @@ -1,10 +1,10 @@ {% extends 'frontend/base.html.j2' %} -{% block title %}{{ gettext('client certs') }}{% endblock %} +{% block title %}{{ gettext('app token') }}{% endblock %} {% block content %} - +{#