add nix flake
make a restart
This commit is contained in:
tuxcoder 2023-10-09 21:58:44 +02:00
parent 536668d8b9
commit eee18c1785
24 changed files with 509 additions and 231 deletions

View file

@ -46,7 +46,7 @@ async def index() -> ResponseReturnValue:
@admin_views.route('/user', methods=['GET'])
async def users():
async def users() -> ResponseReturnValue:
users = User.query.all() # type: Iterable[User]
return render_template('admin/users.html.j2', users=users)

View file

@ -21,6 +21,7 @@ from ory_hydra_client.api.o_auth_2 import get_o_auth_2_consent_request, accept_o
from ory_hydra_client import models as ory_hydra_m
from ory_hydra_client.models import TheRequestPayloadUsedToAcceptALoginOrConsentRequest, TheRequestPayloadUsedToAcceptAConsentRequest, GenericError
from typing import Optional
from uuid import uuid4
from ..model import db, User, SecurityUser
from ..form.auth import ConsentForm, LoginForm, RegistrationForm
@ -136,7 +137,7 @@ async def login_auth() -> ResponseReturnValue:
if 'username' not in session:
return redirect(url_for('auth.login'))
auth_forms = {}
user = User.query.filter_by(username=session['username']).first() # Optional[User]
user = User.query.filter_by(username=session['username']).first_or_404()
for auth_provider in AUTH_PROVIDER_LIST:
form = auth_provider.get_form()
if auth_provider.get_name() not in session['auth_providers'] and\
@ -154,7 +155,7 @@ async def login_auth() -> ResponseReturnValue:
# db.session.add(db_user)
# db.session.commit()
subject = user.id
subject = str(user.id)
user.last_login = datetime.now()
db.session.commit()
resp = await accept_o_auth_2_login_request.asyncio(_client=hydra_service.hydra_client,
@ -170,8 +171,9 @@ async def login_auth() -> ResponseReturnValue:
@auth_views.route('/webauthn/pkcro', methods=['POST'])
def webauthn_pkcro_route():
def webauthn_pkcro_route() -> ResponseReturnValue:
"""login webauthn pkcro route"""
return '', 404
user = User.query.filter(User.id == session.get('webauthn_login_user_id')).one() #type: User
form = ButtonForm()
@ -213,6 +215,7 @@ def sign_up_submit():
form = RegistrationForm()
if form.validate_on_submit():
user = User()
user.id = uuid4()
user.username = form.data['username']
user.password_hashed = crypt.crypt(form.data['password'])
user.alternative_email = form.data['alternative_email']

View file

@ -21,7 +21,7 @@ from urllib.parse import urlencode, parse_qs
from random import SystemRandom
import string
from collections.abc import Iterable
from typing import Optional, Mapping, Iterator, List
from typing import Optional, Mapping, Iterator, List, Any
from ..model import db, User, SecurityUser, Totp, AppToken, WebauthnCredential
from ..form.frontend import ClientCertForm, TOTPForm, \
@ -38,11 +38,16 @@ from ..lenticular_services import lenticular_services
frontend_views = Blueprint('frontend', __name__, url_prefix='')
logger = logging.getLogger(__name__)
def get_current_user() -> User:
user_any: Any = current_user
user: User = user_any
return user
def before_request() -> Optional[ResponseReturnValue]:
try:
resp = oauth2.custom.get('/userinfo')
if not current_user.is_authenticated or resp.status_code != 200:
if not get_current_user().is_authenticated or resp.status_code != 200:
logger.info('user not logged in redirect')
return redirect_login()
except MissingTokenError:
@ -79,7 +84,7 @@ def client_cert() -> ResponseReturnValue:
client_certs = {}
for service in lenticular_services.values():
client_certs[str(service.name)] = \
pki.get_client_certs(current_user, service)
pki.get_client_certs(get_current_user(), service)
return render_template(
'frontend/client_cert.html.j2',
@ -91,7 +96,7 @@ def client_cert() -> ResponseReturnValue:
def get_client_cert(service_name, serial_number) -> ResponseReturnValue:
service = lenticular_services[service_name]
cert = pki.get_client_cert(
current_user, service, serial_number)
get_current_user(), service, serial_number)
return jsonify({
'data': {
'pem': cert.pem()}
@ -103,7 +108,7 @@ def get_client_cert(service_name, serial_number) -> ResponseReturnValue:
def revoke_client_cert(service_name, serial_number) -> ResponseReturnValue:
service = lenticular_services[service_name]
cert = pki.get_client_cert(
current_user, service, serial_number)
get_current_user(), service, serial_number)
pki.revoke_certificate(cert)
return jsonify({})
@ -119,7 +124,7 @@ def client_cert_new(service_name) -> ResponseReturnValue:
if form.validate_on_submit():
valid_time = int(form.data['valid_time']) * timedelta(1, 0, 0)
cert = pki.signing_publickey(
current_user,
get_current_user(),
service,
form.data['publickey'],
valid_time=valid_time)
@ -156,13 +161,15 @@ def app_token_new(service_name: str) -> ResponseReturnValue:
form = AppTokenForm()
if form.validate_on_submit():
app_token = AppToken.new(service)
user_any = get_current_user() # type: Any
user = user_any # type: User
app_token = AppToken.new(user, service, "")
form.populate_obj(app_token)
# check for duplicate names
for user_app_token in current_user.app_tokens:
for user_app_token in user.app_tokens:
if user_app_token.name == app_token.name:
return 'name already exist', 400
current_user.app_tokens.append(app_token)
user.app_tokens.append(app_token)
db.session.commit()
return render_template('frontend/app_token_new_show.html.j2', service=service, app_token=app_token)
@ -180,7 +187,7 @@ def app_token_delete(service_name: str, app_token_name: str) -> ResponseReturnVa
service = lenticular_services[service_name]
if form.validate_on_submit():
app_token = current_user.get_token(service, app_token_name)
app_token = get_current_user().get_token(service, app_token_name)
if app_token is None:
return 'not found', 404
db.session.delete(app_token)
@ -199,9 +206,9 @@ def totp_new() -> ResponseReturnValue:
form = TOTPForm()
if form.validate_on_submit():
totp = Totp(name=form.data['name'], secret=form.data['secret'])
totp = Totp(name=form.data['name'], secret=form.data['secret'], user=get_current_user())
if totp.verify(form.data['token']):
current_user.totps.append(totp)
get_current_user().totps.append(totp)
db.session.commit()
return jsonify({
'status': 'ok'})
@ -269,7 +276,7 @@ def random_string(length=32) -> str:
def webauthn_pkcco_route() -> ResponseReturnValue:
"""get publicKeyCredentialCreationOptions"""
user = User.query.get(current_user.id) #type: Optional[User]
user = User.query.get(get_current_user().id) #type: Optional[User]
if user is None:
return 'internal error', 500
user_handle = random_string()
@ -287,7 +294,7 @@ def webauthn_pkcco_route() -> ResponseReturnValue:
def webauthn_register_route() -> ResponseReturnValue:
"""register credential for current user"""
user = current_user # type: User
user = get_current_user() # type: User
form = WebauthnRegisterForm()
if form.validate_on_submit():
try:
@ -300,7 +307,7 @@ def webauthn_register_route() -> ResponseReturnValue:
AttestationObject(attestation['attestationObject']))
db.session.add(WebauthnCredential(
user_id=user.id,
user=user,
user_handle=session.pop('webauthn_register_user_handle'),
credential_data=cbor.encode(auth_data.credential_data.__dict__),
name=form.name.data))
@ -327,12 +334,12 @@ def password_change_post() -> ResponseReturnValue:
password_old = str(form.data['password_old'])
password_new = str(form.data['password_new'])
if not PasswordAuthProvider.check_auth_internal(
current_user, password_old):
get_current_user(), password_old):
return jsonify(
{'errors': {'password_old': 'Old Password is invalid'}})
current_user.change_password(password_new)
logger.info(f"user {current_user.username} changed password")
get_current_user().change_password(password_new)
logger.info(f"user {get_current_user().username} changed password")
db.session.commit()
return jsonify({})
return jsonify({'errors': form.errors})

View file

@ -9,6 +9,7 @@ from werkzeug.wrappers.response import Response as WerkzeugResponse
import logging
from ..model import User, SecurityUser
from ..hydra import hydra_service
logger = logging.getLogger(__name__)
@ -90,7 +91,7 @@ def init_login_manager(app: Flask) -> None:
oauth2.register(
name="custom",
client_id=app.config['OAUTH_ID'],
client_id=hydra_service.client_id,
client_secret=app.config['OAUTH_SECRET'],
server_metadata_url=f'{base_url}/.well-known/openid-configuration',
access_token_url=f"{base_url}/oauth2/token",