from authlib.integrations.base_client.errors import MissingTokenError, InvalidTokenError
from base64 import b64encode, b64decode
from flask import Blueprint, redirect, request
from flask import current_app
from flask import jsonify, session
from flask import render_template, url_for
from flask_login import logout_user, current_user
from http import HTTPStatus
from werkzeug.utils import redirect
import logging
from datetime import timedelta
from flask.typing import ResponseReturnValue
from ory_hydra_client.api.o_auth_2 import list_o_auth_2_consent_sessions, revoke_o_auth_2_consent_sessions
from ory_hydra_client.models import GenericError
from urllib.parse import urlparse
from typing import Optional, Any
import jwt
from datetime import datetime, timedelta

from ..model import db, User, Totp, AppToken, PasskeyCredential
from ..form.frontend import ClientCertForm, TOTPForm, \
    TOTPDeleteForm, PasswordChangeForm, WebauthnRegisterForm, \
    AppTokenForm, AppTokenDeleteForm
from ..form.base import ButtonForm
from ..auth_providers import PasswordAuthProvider
from .auth import webauthn
from .oauth2 import redirect_login, oauth2
from ..hydra import hydra_service
from ..pki import pki
from ..lenticular_services import lenticular_services


frontend_views = Blueprint('frontend', __name__, url_prefix='')
logger = logging.getLogger(__name__)


def get_current_user() -> User:
    """Return flask-login's ``current_user`` proxy, typed as ``User``.

    The detour through ``Any`` only exists to satisfy the type checker;
    the object returned is ``current_user`` itself.
    """
    user_any: Any = current_user
    user: User = user_any
    return user


def before_request() -> Optional[ResponseReturnValue]:
    """Redirect to the login flow unless the request is authenticated.

    Registered below as the blueprint's ``before_request`` hook.

    Returns:
        ``None`` to let the request proceed, otherwise the response
        produced by ``redirect_login()``.
    """
    try:
        resp = oauth2.custom.get('/userinfo')
        if not get_current_user().is_authenticated or resp.status_code != 200:
            logger.info('user not logged in redirect')
            return redirect_login()
    except MissingTokenError:
        logger.info('MissingTokenError redirect user to login')
        return redirect_login()
    except InvalidTokenError:
        logger.info('InvalidTokenError redirect user to login')
        return redirect_login()

    return None


frontend_views.before_request(before_request)


@frontend_views.route('/logout')
def logout() -> ResponseReturnValue:
    """Log the user out locally and redirect to the Hydra logout URL."""
    logout_user()
    return redirect(
        f'{current_app.config["HYDRA_PUBLIC_URL"]}/oauth2/sessions/logout')


@frontend_views.route('/', methods=['GET'])
def index() -> ResponseReturnValue:
    """Render the landing page, or redirect to a pending ``next_url``.

    A ``next_url`` stored in the session is consumed (removed) when used.
    """
    next_url = session.pop('next_url', None)
    if next_url is not None:
        return redirect(next_url)
    return render_template('frontend/index.html.j2')


@frontend_views.route('/client_cert')
def client_cert() -> ResponseReturnValue:
    """Render the current user's client certificates grouped by service."""
    user = get_current_user()
    client_certs = {
        str(service.name): pki.get_client_certs(user, service)
        for service in lenticular_services.values()
    }
    return render_template(
        'frontend/client_cert.html.j2',
        services=lenticular_services,
        client_certs=client_certs)


# The URL variables must be declared in the rule, otherwise Flask cannot
# supply the view's ``service_name`` / ``serial_number`` arguments.
@frontend_views.route('/client_cert/<service_name>/<serial_number>')
def get_client_cert(service_name, serial_number) -> ResponseReturnValue:
    """Return one client certificate of the current user as PEM (JSON).

    Args:
        service_name: key into ``lenticular_services``.
        serial_number: serial number passed through to ``pki.get_client_cert``.

    Returns:
        JSON ``{'data': {'pem': ...}}``, or an empty 404 response when the
        service is unknown (same behaviour as ``client_cert_new``).
    """
    if service_name not in lenticular_services:
        return '', 404
    service = lenticular_services[service_name]
    cert = pki.get_client_cert(
        get_current_user(), service, serial_number)
    return jsonify({
        'data': {
            'pem': cert.pem()}
    })


@frontend_views.route(
    '/client_cert/<service_name>/<serial_number>', methods=['DELETE'])
def revoke_client_cert(service_name, serial_number) -> ResponseReturnValue:
    """Revoke one client certificate of the current user.

    Args:
        service_name: key into ``lenticular_services``.
        serial_number: serial number passed through to ``pki.get_client_cert``.

    Returns:
        An empty JSON object, or an empty 404 response when the service is
        unknown.
    """
    if service_name not in lenticular_services:
        return '', 404
    service = lenticular_services[service_name]
    cert = pki.get_client_cert(
        get_current_user(), service, serial_number)
    pki.revoke_certificate(cert)
    return jsonify({})


@frontend_views.route(
    '/client_cert/<service_name>/new', methods=['GET', 'POST'])
def client_cert_new(service_name) -> ResponseReturnValue:
    """Show the certificate request form and sign submitted public keys.

    Args:
        service_name: key into ``lenticular_services``.

    Returns:
        * empty 404 response for an unknown service,
        * JSON ``{'status': 'ok', 'data': {...}}`` after a valid submit,
        * JSON ``{'status': 'error', 'errors': ...}`` after an invalid submit,
        * the rendered form otherwise.
    """
    if service_name not in lenticular_services:
        return '', 404
    service = lenticular_services[service_name]
    form = ClientCertForm()
    if form.validate_on_submit():
        # the form's ``valid_time`` is a number of days
        valid_time = timedelta(days=int(form.data['valid_time']))
        cert = pki.signing_publickey(
            get_current_user(),
            service,
            form.data['publickey'],
            valid_time=valid_time)
        return jsonify({
            'status': 'ok',
            'data': {
                'cert': cert.pem(),
                'ca_cert': pki.get_ca_cert_pem(service)
            }})
    elif form.is_submitted():
        return jsonify({
            'status': 'error',
            'errors': form.errors
        })

    return render_template(
        'frontend/client_cert_new.html.j2',
        service=service,
        form=form)


@frontend_views.route('/app_token')
def app_token() -> ResponseReturnValue:
    """Render the app token overview page."""
    delete_form = AppTokenDeleteForm()
    return render_template(
        'frontend/app_token.html.j2',
        delete_form=delete_form,
        services=lenticular_services)


@frontend_views.route('/app_token/new', methods=['GET', 'POST'])
def app_token_new() -> ResponseReturnValue:
    """Create a new app token for the current user.

    Returns:
        The page showing the freshly created token after a valid submit,
        ``('name already exist', 400)`` when the user already has a token
        with that name, and the creation form otherwise.
    """
    form = AppTokenForm()
    if form.validate_on_submit():
        user = get_current_user()
        app_token = AppToken.new(user, name="", scopes="")
        form.populate_obj(app_token)
        # check for duplicate names
        if any(existing.name == app_token.name for existing in user.app_tokens):
            return 'name already exist', 400
        user.app_tokens.append(app_token)
        db.session.commit()
        return render_template(
            'frontend/app_token_new_show.html.j2', app_token=app_token)

    return render_template('frontend/app_token_new.html.j2', form=form)


@frontend_views.route('/app_token/<app_token_name>', methods=["POST"])
def app_token_delete(app_token_name: str) -> ResponseReturnValue:
    """Delete the current user's app token named ``app_token_name``.

    Returns:
        ``('not found', 404)`` when the form validates but no such token
        exists; otherwise a redirect to the app token overview (also when
        the form does not validate, in which case nothing is deleted).
    """
    form = AppTokenDeleteForm()

    if form.validate_on_submit():
        app_token = get_current_user().get_token_by_name(app_token_name)
        if app_token is None:
            return 'not found', 404
        db.session.delete(app_token)
        db.session.commit()

    return redirect(url_for('frontend.app_token'))


@frontend_views.route('/totp')
def totp() -> ResponseReturnValue:
    """Render the TOTP overview page."""
    delete_form = TOTPDeleteForm()
    return render_template('frontend/totp.html.j2', delete_form=delete_form)


@frontend_views.route('/totp/new', methods=['GET', 'POST'])
def totp_new() -> ResponseReturnValue:
    """Register a new TOTP device for the current user.

    The submitted secret is only stored when the accompanying token
    verifies against it.

    Returns:
        JSON ``{'status': 'ok'}`` on success, JSON with ``'status': 'error'``
        when the token does not verify, and the rendered form otherwise.
    """
    form = TOTPForm()
    if form.validate_on_submit():
        totp = Totp(name=form.data['name'], secret=form.data['secret'], user=get_current_user())
        if totp.verify(form.data['token']):
            get_current_user().totps.append(totp)
            db.session.commit()
            return jsonify({
                'status': 'ok'})
        else:
            return jsonify({
                'status': 'error',
                'errors': [
                    'TOTP Token invalid'
                ]})
    return render_template('frontend/totp_new.html.j2', form=form)
@frontend_views.route('/totp/<totp_name>/delete', methods=['GET', 'POST'])
def totp_delete(totp_name) -> ResponseReturnValue:
    """Delete the current user's TOTP device named ``totp_name``.

    Only the current user's own devices are searched, so one user cannot
    remove another user's second factor by guessing its name.

    NOTE(review): this destructive action is reachable via GET; the allowed
    methods are left unchanged so existing callers keep working.

    Returns:
        JSON ``{'status': 'ok'}`` on success, or a 404 JSON error when the
        user has no device with that name.
    """
    user = get_current_user()
    totp = next((t for t in user.totps if t.name == totp_name), None)
    if totp is None:
        return jsonify({
            'status': 'error',
            'errors': [
                'TOTP not found'
            ]}), 404
    db.session.delete(totp)
    db.session.commit()

    return jsonify({
        'status': 'ok'})


## Passkey

import webauthn
from webauthn.helpers.structs import (
    AuthenticatorSelectionCriteria,
    PublicKeyCredentialDescriptor,
    ResidentKeyRequirement,
    UserVerificationRequirement,
)


@frontend_views.route('/passkey/list', methods=['GET'])
def passkey() -> ResponseReturnValue:
    """list registered credentials for current user"""
    # restricted to the current user's credentials, as the docstring states
    credentials = get_current_user().passkey_credentials
    return render_template(
        'frontend/passkey_list.html.j2',
        credentials=credentials,
        button_form=ButtonForm())


@frontend_views.route('/passkey/new', methods=['GET'])
def passkey_new() -> ResponseReturnValue:
    """register credential for current user

    Generates WebAuthn registration options and hands the challenge to the
    client inside a signed, short-lived JWT, which ``passkey_new_process``
    decodes again when the client posts its response.
    """
    public_url = urlparse(current_app.config['PUBLIC_URL'])
    user = get_current_user()

    form = WebauthnRegisterForm()
    options = webauthn.generate_registration_options(
        rp_name="Lenticluar Cloud",
        rp_id=public_url.hostname,
        user_id=str(user.id),
        user_name=user.username,
        authenticator_selection=AuthenticatorSelectionCriteria(
            user_verification=UserVerificationRequirement.REQUIRED,
            resident_key=ResidentKeyRequirement.REQUIRED,
        ),
        # credentials the user already registered must not be enrolled twice
        exclude_credentials=[
            PublicKeyCredentialDescriptor(id=cred.credential_id)
            for cred in user.passkey_credentials
        ],
    )
    secret_key = current_app.config['SECRET_KEY']
    # read the clock once so all time claims are relative to the same instant
    now = datetime.utcnow()
    token = jwt.encode(
        {
            'challenge': b64encode(options.challenge).decode(),
            'iat': now - timedelta(minutes=1),
            'nbf': now,
            'exp': now + timedelta(minutes=15),
        },
        secret_key,
        algorithm="HS256",
    )

    return render_template(
        'frontend/passkey_new.html.j2',
        form=form,
        options=webauthn.options_to_json(options),
        token=token,
    )


@frontend_views.route('/passkey/new', methods=['POST'])
def passkey_new_process() -> ResponseReturnValue:
    """Verify a WebAuthn registration response and store the credential.

    Expects a JSON object with the keys ``token`` (the JWT issued by
    ``passkey_new``), ``credential`` and ``name``.

    Returns:
        * 400 JSON error when the body is not an object, a key is missing,
          or the token is invalid (bad signature, expired, missing claim),
        * 403 JSON error when the authenticator did not verify the user,
        * an empty JSON object after the credential has been stored.
    """
    secret_key = current_app.config['SECRET_KEY']
    public_url = urlparse(current_app.config['PUBLIC_URL'])
    user = get_current_user()

    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'message': "invalid request"}), 400
    try:
        raw_token = data['token']
        credential = data['credential']
        name = data['name']
    except KeyError:
        return jsonify({'message': "invalid request"}), 400

    try:
        token = jwt.decode(
            raw_token, secret_key, algorithms=['HS256'],
            options={
                'require': ["challenge", "exp", "iat", "nbf"],
            })
    except jwt.exceptions.InvalidTokenError:
        # base class: also covers expired, immature and tampered tokens,
        # not just a missing required claim
        return jsonify({'message': "invalid token"}), 400

    challenge = b64decode(token['challenge'])

    result = webauthn.verify_registration_response(
        credential=credential,
        expected_rp_id=public_url.hostname,
        expected_challenge=challenge,
        expected_origin=[public_url.geturl()],
    )
    if not result.user_verified:
        return jsonify({"message": "invalid auth"}), 403

    db.session.add(PasskeyCredential(
        id=None,
        user_id=user.id,
        credential_id=result.credential_id,
        credential_public_key=result.credential_public_key,
        name=name,
    ))
    db.session.commit()
    logger.info("add new passkey for user %s", user.username)

    return jsonify({})


@frontend_views.route('/passkey/delete/<id>', methods=['POST'])
def passkey_delete(id: str) -> ResponseReturnValue:
    """delete registered credential

    Only credentials owned by the current user can be deleted; any other
    id results in a 404.

    Returns:
        A redirect to the passkey list after deletion, or an empty
        ``400 Bad Request`` response when the form does not validate.
    """
    form = ButtonForm()
    if form.validate_on_submit():
        cred = PasskeyCredential.query.filter(
            PasskeyCredential.id == id,
            PasskeyCredential.user_id == get_current_user().id,
        ).first_or_404()
        db.session.delete(cred)
        db.session.commit()
        return redirect(url_for('.passkey'))

    return '', HTTPStatus.BAD_REQUEST


@frontend_views.route('/password_change')
def password_change() -> ResponseReturnValue:
    """Render the password change form."""
    form = PasswordChangeForm()
    return render_template('frontend/password_change.html.j2', form=form)


@frontend_views.route('/password_change', methods=['POST'])
def password_change_post() -> ResponseReturnValue:
    """Change the current user's password after checking the old one.

    Returns:
        An empty JSON object on success, otherwise JSON with an ``errors``
        mapping (form validation errors or a wrong old password).
    """
    form = PasswordChangeForm()
    if not form.validate():
        return jsonify({'errors': form.errors})

    user = get_current_user()
    password_old = str(form.data['password_old'])
    password_new = str(form.data['password_new'])
    if not PasswordAuthProvider.check_auth_internal(user, password_old):
        return jsonify(
            {'errors': {'password_old': 'Old Password is invalid'}})

    user.change_password(password_new)
    logger.info("user %s changed password", user.username)
    db.session.commit()
    return jsonify({})


@frontend_views.route('/oauth2_token')
async def oauth2_tokens() -> ResponseReturnValue:
    """Render the OAuth2 consent sessions of the current subject.

    Returns:
        The rendered session list, or a plain-text 500 response when the
        sessions could not be fetched from Hydra.
    """
    subject = oauth2.custom.get('/userinfo').json()['sub']
    consent_sessions = await list_o_auth_2_consent_sessions.asyncio(
        subject=subject, _client=hydra_service.hydra_client)
    if consent_sessions is None or isinstance(consent_sessions, GenericError):
        return 'internal error, could not fetch sessions', 500
    return render_template(
        'frontend/oauth2_tokens.html.j2',
        consent_sessions=consent_sessions)


@frontend_views.route('/oauth2_token/<client_id>', methods=['DELETE'])
async def oauth2_token_revoke(client_id: str) -> ResponseReturnValue:
    """Revoke the current subject's consent sessions for ``client_id``.

    Returns:
        An empty JSON object.
    """
    # use the same ``custom`` client as the other views in this module
    subject = oauth2.custom.get('/userinfo').json()['sub']
    await revoke_o_auth_2_consent_sessions.asyncio_detailed(
        _client=hydra_service.hydra_client,
        subject=subject,
        client=client_id)

    return jsonify({})