from authlib.integrations.base_client.errors import MissingTokenError, InvalidTokenError from base64 import b64encode, b64decode from flask import Blueprint, redirect, request from flask import current_app from flask import jsonify, session from flask import render_template, url_for from flask_login import logout_user, current_user from http import HTTPStatus from werkzeug.utils import redirect import logging from datetime import timedelta from flask.typing import ResponseReturnValue from ory_hydra_client.api.o_auth_2 import list_o_auth_2_consent_sessions, revoke_o_auth_2_consent_sessions from ory_hydra_client.models import GenericError from urllib.parse import urlparse from typing import Optional, Any import jwt from datetime import datetime, timedelta import webauthn from webauthn.helpers.structs import ( AuthenticatorSelectionCriteria, PublicKeyCredentialDescriptor, ResidentKeyRequirement, UserVerificationRequirement, ) from ..model import db, User, AppToken, PasskeyCredential from ..form.frontend import ClientCertForm, PasswordChangeForm, \ AppTokenForm, AppTokenDeleteForm, PasskeyRegisterForm from ..form.base import ButtonForm from ..auth_providers import PasswordAuthProvider from .oauth2 import redirect_login, oauth2 from ..hydra import hydra_service from ..pki import pki from ..lenticular_services import lenticular_services frontend_views = Blueprint('frontend', __name__, url_prefix='') logger = logging.getLogger(__name__) def get_current_user() -> User: user_any: Any = current_user user: User = user_any return user def before_request() -> Optional[ResponseReturnValue]: try: resp = oauth2.custom.get('/userinfo') if not get_current_user().is_authenticated or resp.status_code != 200: logger.info('user not logged in redirect') return redirect_login() except MissingTokenError: logger.info('MissingTokenError redirect user to login') return redirect_login() except InvalidTokenError: logger.info('InvalidTokenError redirect user to login') return redirect_login() return None frontend_views.before_request(before_request) @frontend_views.route('/logout') def logout() -> ResponseReturnValue: logout_user() return redirect( f'{current_app.config["HYDRA_PUBLIC_URL"]}/oauth2/sessions/logout') @frontend_views.route('/', methods=['GET']) def index() -> ResponseReturnValue: if 'next_url' in session: next_url = session['next_url'] del session['next_url'] return redirect(next_url) return render_template( 'frontend/index.html.j2', lenticular_services=lenticular_services, ) @frontend_views.route('/client_cert') def client_cert() -> ResponseReturnValue: client_certs = {} for service in lenticular_services.values(): client_certs[str(service.name)] = \ pki.get_client_certs(get_current_user(), service) return render_template( 'frontend/client_cert.html.j2', services=lenticular_services, client_certs=client_certs) @frontend_views.route('/client_cert//') def get_client_cert(service_name, serial_number) -> ResponseReturnValue: service = lenticular_services[service_name] cert = pki.get_client_cert( get_current_user(), service, serial_number) return jsonify({ 'data': { 'pem': cert.pem()} }) @frontend_views.route( '/client_cert//', methods=['DELETE']) def revoke_client_cert(service_name, serial_number) -> ResponseReturnValue: service = lenticular_services[service_name] cert = pki.get_client_cert( get_current_user(), service, serial_number) pki.revoke_certificate(cert) return jsonify({}) @frontend_views.route( '/client_cert//new', methods=['GET', 'POST']) def client_cert_new(service_name) -> ResponseReturnValue: if service_name not in lenticular_services: return '', 404 service = lenticular_services[service_name] form = ClientCertForm() if form.validate_on_submit(): valid_time = int(form.data['valid_time']) * timedelta(1, 0, 0) cert = pki.signing_publickey( get_current_user(), service, form.data['publickey'], valid_time=valid_time) return jsonify({ 'status': 'ok', 'data': { 'cert': cert.pem(), 'ca_cert': pki.get_ca_cert_pem(service) }}) elif form.is_submitted(): return jsonify({ 'status': 'error', 'errors': form.errors }) return render_template( 'frontend/client_cert_new.html.j2', service=service, form=form) @frontend_views.route('/app_token') def app_token() -> ResponseReturnValue: delete_form = AppTokenDeleteForm() form = ClientCertForm() return render_template('frontend/app_token.html.j2', delete_form=delete_form, services=lenticular_services) @frontend_views.route('/app_token/new', methods=['GET','POST']) def app_token_new() -> ResponseReturnValue: form = AppTokenForm() if form.validate_on_submit(): user_any = get_current_user() # type: Any user = user_any # type: User app_token = AppToken.new(user, name="",scopes="") form.populate_obj(app_token) # check for duplicate names for user_app_token in user.app_tokens: if user_app_token.name == app_token.name: return 'name already exist', 400 user.app_tokens.append(app_token) db.session.commit() return render_template('frontend/app_token_new_show.html.j2', app_token=app_token) return render_template('frontend/app_token_new.html.j2', form=form) @frontend_views.route('/app_token/', methods=["POST"]) def app_token_delete(app_token_name: str) -> ResponseReturnValue: form = AppTokenDeleteForm() if form.validate_on_submit(): app_token = get_current_user().get_token_by_name(app_token_name) if app_token is None: return 'not found', 404 db.session.delete(app_token) db.session.commit() return redirect(url_for('frontend.app_token')) ## Passkey @frontend_views.route('/passkey/list', methods=['GET']) def passkey() -> ResponseReturnValue: """list registered credentials for current user""" user = get_current_user() return render_template( 'frontend/passkey_list.html.j2', credentials=user.passkey_credentials, button_form=ButtonForm() ) @frontend_views.route('/passkey/new', methods=['GET']) def passkey_new() -> ResponseReturnValue: """register credential for current user""" public_url = urlparse(current_app.config['PUBLIC_URL']) user = get_current_user() form = PasskeyRegisterForm() options = webauthn.generate_registration_options( rp_name="Lenticluar Cloud", rp_id=public_url.hostname, user_id=str(user.id), user_name=user.username, authenticator_selection=AuthenticatorSelectionCriteria( user_verification=UserVerificationRequirement.REQUIRED, resident_key=ResidentKeyRequirement.REQUIRED, ), exclude_credentials = list(map(lambda x: PublicKeyCredentialDescriptor(id=x.credential_id), user.passkey_credentials)) ) secret_key = current_app.config['SECRET_KEY'] token = jwt.encode({ 'challenge': b64encode(options.challenge).decode(), 'iat': datetime.utcnow() - timedelta(minutes=1), 'nbf': datetime.utcnow(), 'exp': datetime.utcnow() + timedelta(minutes=15), }, secret_key, algorithm="HS256" ) return render_template( 'frontend/passkey_new.html.j2', form=form, options=webauthn.options_to_json(options), token=token, ) @frontend_views.route('/passkey/new', methods=['POST']) def passkey_new_process() -> ResponseReturnValue: secret_key = current_app.config['SECRET_KEY'] public_url = urlparse(current_app.config['PUBLIC_URL']) user = get_current_user() data = request.get_json() try: token = jwt.decode( data['token'], secret_key, algorithms=['HS256'], options = { 'require': ["challenge", "exp", "iat", "nbf"], }) except jwt.exceptions.MissingRequiredClaimError: return jsonify({'message': "invalid token"}), 400 challenge = b64decode(token['challenge']) credential = data['credential'] name = data['name'] result = webauthn.verify_registration_response( credential = credential, expected_rp_id = public_url.hostname, expected_challenge = challenge, expected_origin = [ public_url.geturl() ], ) if not result.user_verified: return jsonify({ "message": "invalid auth" }), 403 db.session.add(PasskeyCredential( id=None, user_id=user.id, credential_id=result.credential_id, credential_public_key=result.credential_public_key, name=name, )) db.session.commit() logger.info(f"add new passkey for user {user.username}") return jsonify({}) @frontend_views.route('/passkey/delete/', methods=['POST']) def passkey_delete(id: str) -> ResponseReturnValue: """delete registered credential""" user = get_current_user() form = ButtonForm() if form.validate_on_submit(): cred = PasskeyCredential.query.filter(PasskeyCredential.id == id).first_or_404() if cred.user_id != user.id: return '', 404 db.session.delete(cred) db.session.commit() return redirect(url_for('.passkey')) return '', HTTPStatus.BAD_REQUEST @frontend_views.route('/password_change') def password_change() -> ResponseReturnValue: form = PasswordChangeForm() return render_template('frontend/password_change.html.j2', form=form) @frontend_views.route('/password_change', methods=['POST']) def password_change_post() -> ResponseReturnValue: form = PasswordChangeForm() if form.validate(): password_old = str(form.data['password_old']) password_new = str(form.data['password_new']) if not PasswordAuthProvider.check_auth_internal( get_current_user(), password_old): return jsonify( {'errors': {'password_old': 'Old Password is invalid'}}) get_current_user().change_password(password_new) logger.info(f"user {get_current_user().username} changed password") db.session.commit() return jsonify({}) return jsonify({'errors': form.errors}) @frontend_views.route('/oauth2_token') async def oauth2_tokens() -> ResponseReturnValue: subject = oauth2.custom.get('/userinfo').json()['sub'] consent_sessions = await list_o_auth_2_consent_sessions.asyncio(subject=subject, _client=hydra_service.hydra_client) if consent_sessions is None or isinstance( consent_sessions, GenericError): return 'internal error, could not fetch sessions', 500 return render_template( 'frontend/oauth2_tokens.html.j2', consent_sessions=consent_sessions) @frontend_views.route('/oauth2_token/', methods=['DELETE']) async def oauth2_token_revoke(client_id: str) -> ResponseReturnValue: subject = oauth2.session.get('/userinfo').json()['sub'] await revoke_o_auth_2_consent_sessions.asyncio_detailed( _client=hydra_service.hydra_client, subject=subject, client=client_id) return jsonify({})