code cleanup, mypy

master
TuxCoder 2022-06-18 13:05:18 +02:00
parent 45c61a06ef
commit 927562fecb
7 changed files with 41 additions and 36 deletions

View File

@ -4,12 +4,13 @@ from .app import create_app
from werkzeug.middleware.proxy_fix import ProxyFix
from flask_migrate import upgrade
from pathlib import Path
from flask import Flask
import logging
import os
def entry_point():
def entry_point() -> None:
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
parser = argparse.ArgumentParser(description='lenticular_cloud cli')
@ -54,12 +55,12 @@ def entry_point():
args.func(args)
def cli_user(args):
def cli_user(args) -> None:
for user in User.query.all():
print(f'{user.id} - Enabled: {user.enabled} - Name:`{user.username}`')
pass
def cli_signup(args):
def cli_signup(args) -> None:
if args.signup_id is not None:
user = User.query.get(args.signup_id)
@ -76,12 +77,14 @@ def cli_signup(args):
print(f'<Signup id={user.id}, username={user.username}>')
def cli_run(app, args):
def cli_run(app: Flask, args) -> None:
print("running in debug mode")
logging.basicConfig(level=logging.DEBUG)
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
app.run(debug=True, host='127.0.0.1', port=5000)
#app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
app.run(debug=False, host='127.0.0.1', port=5000)
def cli_db_upgrade(args):
def cli_db_upgrade(args) -> None:
app = create_app()
migration_dir = Path(app.root_path) / 'migrations'
upgrade( str(migration_dir) )

View File

@ -16,7 +16,7 @@ import uuid
import pyotp
from typing import Optional, Callable
from cryptography.x509 import Certificate as CertificateObj
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import DeclarativeMeta
logger = logging.getLogger(__name__)
@ -26,6 +26,9 @@ logger = logging.getLogger(__name__)
db = SQLAlchemy() # type: SQLAlchemy
migrate = Migrate()
BaseModel: DeclarativeMeta = db.Model
class SecurityUser(UserMixin):
def __init__(self, username):
@ -37,7 +40,7 @@ class SecurityUser(UserMixin):
class Service(object):
def __init__(self, name):
def __init__(self, name: str):
self._name = name
self._client_cert = False
self._pki_config = {
@ -46,7 +49,7 @@ class Service(object):
}
@staticmethod
def from_config(name, config):
def from_config(name, config) -> Service:
"""
"""
service = Service(name)
@ -58,15 +61,15 @@ class Service(object):
return service
@property
def name(self):
def name(self) -> str:
return self._name
@property
def client_cert(self):
def client_cert(self) -> bool:
return self._client_cert
@property
def pki_config(self):
def pki_config(self) -> dict[str,str]:
if not self._client_cert:
raise Exception('invalid call')
return self._pki_config
@ -74,7 +77,7 @@ class Service(object):
class Certificate(object):
def __init__(self, cn, ca_name: str, cert_data: CertificateObj, revoked=False):
def __init__(self, cn: str, ca_name: str, cert_data: CertificateObj, revoked=False):
self._cn = cn
self._ca_name = ca_name
self._cert_data = cert_data
@ -83,11 +86,11 @@ class Certificate(object):
self._cert_data.not_valid_before.replace(tzinfo=tz.tzutc())
@property
def cn(self):
def cn(self) -> str:
return self._cn
@property
def ca_name(self):
def ca_name(self) -> str:
return self._ca_name
@property
@ -128,7 +131,7 @@ def generate_uuid():
return str(uuid.uuid4())
class User(db.Model):
class User(BaseModel):
id = db.Column(
db.String(length=36), primary_key=True, default=generate_uuid)
username = db.Column(
@ -149,13 +152,13 @@ class User(db.Model):
webauthn_credentials = db.relationship('WebauthnCredential', back_populates='user', cascade='delete,delete-orphan', passive_deletes=True)
def __init__(self, **kwargs):
super(db.Model).__init__(**kwargs)
super().__init__(**kwargs)
@property
def is_authenticated(self):
def is_authenticated(self) -> bool:
return True # TODO
def get(self, key):
def get(self, key) -> None:
print(f'getitem: {key}') # TODO
@property
@ -174,30 +177,31 @@ class User(db.Model):
password_hashed = crypt.crypt(password_new)
return True
class AppToken(db.Model):
class AppToken(BaseModel):
id = db.Column(db.Integer, primary_key=True)
service_name = db.Column(db.String, nullable=False)
token = db.Column(db.String, nullable=False)
name = db.Column(db.String, nullable=False)
class Totp(db.Model):
class Totp(BaseModel):
id = db.Column(db.Integer, primary_key=True)
secret = db.Column(db.String, nullable=False)
name = db.Column(db.String, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)
last_used = db.Column(db.DateTime, nullable=True)
user_id = db.Column(
db.String(length=36),
db.ForeignKey(User.id), nullable=False)
user = db.relationship(User)
def verify(self, token: str):
def verify(self, token: str) -> bool:
totp = pyotp.TOTP(self.secret)
return totp.verify(token)
class WebauthnCredential(db.Model): # pylint: disable=too-few-public-methods
class WebauthnCredential(BaseModel): # pylint: disable=too-few-public-methods
"""Webauthn credential model"""
id = db.Column(db.Integer, primary_key=True)
@ -210,7 +214,7 @@ class WebauthnCredential(db.Model): # pylint: disable=too-few-public-methods
user = db.relationship('User', back_populates='webauthn_credentials')
class Group(db.Model):
class Group(BaseModel):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(), nullable=False, unique=True)

View File

@ -1,10 +1,10 @@
{% extends 'frontend/base.html.j2' %}
{% block title %}{{ gettext('client certs') }}{% endblock %}
{% block title %}{{ gettext('app token') }}{% endblock %}
{% block content %}
{#
<ul class="nav nav-tabs" id="myTab" role="tablist">
{% for service in services.values() %}
<li class="nav-item">
@ -47,14 +47,11 @@
</div>
{% endfor %}
</div>
#}
{% endblock %}
{% block script_js %}
client_cert.init_list();
{% endblock %}

View File

@ -48,7 +48,7 @@ def init_babel(app: Flask) -> None:
@app.context_processor
def get_locale_jinja() -> dict:
def get_locale_() -> str:
return get_locale()
return get_locale() # type: ignore
return dict(get_locale=get_locale_)
return None

View File

@ -67,6 +67,8 @@ def email_login() -> ResponseReturnValue:
return jsonify({}), 400
req_payload = request.get_json()
logger.error(f'{req_payload}')
if not isinstance(req_payload, dict):
return 'bad request', 400
password = req_payload["password"]
username = req_payload["username"]

View File

@ -139,7 +139,8 @@ def client_cert_new(service_name) -> ResponseReturnValue:
@frontend_views.route('/app_token')
def app_token() -> ResponseReturnValue:
return
delete_form = TOTPDeleteForm()
return render_template('frontend/app_token.html.j2', delete_form=delete_form)
@frontend_views.route('/app_token/<service_name>/new')
def app_token_new(service_name: str) -> ResponseReturnValue:

View File

@ -44,13 +44,11 @@ def authorized() -> ResponseReturnValue:
return 'bad request', 400
session['token'] = token
userinfo = oauth2.custom.get('/userinfo').json()
logger.info(f"userinfo `{userinfo}`")
user = User.query.get(str(userinfo["sub"]))
if user is None:
return "user not found", 404
logger.info(f"login user `{user.username}`")
logger.info(f"user `{user.username}` successfully logged in")
login_user(SecurityUser(user.username))
logger.info(f"session user `{session}`")
next_url = request.args.get('next_url')
if next_url is None: