lenticular_cloud2/lenticular_cloud/views/frontend.py

310 lines
11 KiB
Python
Raw Normal View History

2020-05-09 18:00:07 +00:00
from authlib.integrations.base_client.errors import MissingTokenError, InvalidTokenError
2022-04-08 19:28:22 +00:00
from base64 import b64encode, b64decode
from fido2 import cbor
from fido2.client import ClientData
from fido2.ctap2 import AttestationObject, AttestedCredentialData, AuthenticatorData
from flask import Blueprint, Response, redirect, request
from flask import current_app
2022-04-08 19:28:22 +00:00
from flask import jsonify, session, flash
from flask import render_template, url_for
from flask_login import login_user, logout_user, current_user
2022-04-08 19:28:22 +00:00
from http import HTTPStatus
2020-05-09 18:00:07 +00:00
from werkzeug.utils import redirect
import logging
from datetime import timedelta
from base64 import b64decode
from flask.typing import ResponseReturnValue
from oauthlib.oauth2.rfc6749.errors import TokenExpiredError
from ory_hydra_client.api.admin import list_subject_consent_sessions, revoke_consent_sessions
from ory_hydra_client.models import GenericError
2022-04-08 19:28:22 +00:00
from urllib.parse import urlencode, parse_qs
from random import SystemRandom
import string
from typing import Optional
2020-05-09 18:00:07 +00:00
2022-04-08 19:28:22 +00:00
from ..model import db, User, SecurityUser, Totp, WebauthnCredential
from ..form.frontend import ClientCertForm, TOTPForm, \
2022-04-08 19:28:22 +00:00
TOTPDeleteForm, PasswordChangeForm, WebauthnRegisterForm
from ..form.base import ButtonForm
2022-06-18 17:35:05 +00:00
from ..auth_providers import PasswordAuthProvider
2022-04-08 19:28:22 +00:00
from .auth import webauthn
from .oauth2 import redirect_login, oauth2
from ..hydra import hydra_service
2022-06-17 11:38:49 +00:00
from ..pki import pki
from ..lenticular_services import lenticular_services
2020-05-09 18:00:07 +00:00
frontend_views = Blueprint('frontend', __name__, url_prefix='')
logger = logging.getLogger(__name__)
def before_request() -> Optional[ResponseReturnValue]:
try:
resp = oauth2.custom.get('/userinfo')
2022-02-06 22:57:01 +00:00
if not current_user.is_authenticated or resp.status_code != 200:
2020-06-21 09:52:37 +00:00
logger.info('user not logged in redirect')
2020-06-02 17:09:32 +00:00
return redirect_login()
except MissingTokenError:
2022-06-17 11:38:49 +00:00
logger.info('MissingTokenError redirect user to login')
return redirect_login()
except InvalidTokenError:
2022-06-17 11:38:49 +00:00
logger.info('InvalidTokenError redirect user to login')
2020-06-02 17:09:32 +00:00
return redirect_login()
return None
2020-05-09 18:00:07 +00:00
frontend_views.before_request(before_request)
2020-05-21 11:20:27 +00:00
2020-06-02 17:09:32 +00:00
@frontend_views.route('/logout')
def logout() -> ResponseReturnValue:
2020-06-02 17:09:32 +00:00
logout_user()
return redirect(
f'{current_app.config["HYDRA_PUBLIC_URL"]}/oauth2/sessions/logout')
2020-05-21 11:20:27 +00:00
2020-05-09 18:00:07 +00:00
@frontend_views.route('/', methods=['GET'])
def index() -> ResponseReturnValue:
2020-06-02 17:09:32 +00:00
if 'next_url' in session:
next_url = session['next_url']
del session['next_url']
return redirect(next_url)
2020-05-09 18:00:07 +00:00
return render_template('frontend/index.html.j2')
@frontend_views.route('/client_cert')
def client_cert() -> ResponseReturnValue:
2020-05-09 18:00:07 +00:00
client_certs = {}
2022-06-17 11:38:49 +00:00
for service in lenticular_services.values():
client_certs[str(service.name)] = \
2022-06-17 11:38:49 +00:00
pki.get_client_certs(current_user, service)
2020-05-09 18:00:07 +00:00
return render_template(
'frontend/client_cert.html.j2',
2022-06-17 11:38:49 +00:00
services=lenticular_services,
client_certs=client_certs)
2020-05-09 18:00:07 +00:00
@frontend_views.route('/client_cert/<service_name>/<serial_number>')
def get_client_cert(service_name, serial_number) -> ResponseReturnValue:
2022-06-17 11:38:49 +00:00
service = lenticular_services[service_name]
cert = pki.get_client_cert(
current_user, service, serial_number)
return jsonify({
'data': {
'pem': cert.pem()}
})
@frontend_views.route(
'/client_cert/<service_name>/<serial_number>', methods=['DELETE'])
def revoke_client_cert(service_name, serial_number) -> ResponseReturnValue:
2022-06-17 11:38:49 +00:00
service = lenticular_services[service_name]
cert = pki.get_client_cert(
current_user, service, serial_number)
2022-06-17 11:38:49 +00:00
pki.revoke_certificate(cert)
return jsonify({})
2020-05-09 18:00:07 +00:00
@frontend_views.route(
'/client_cert/<service_name>/new',
methods=['GET', 'POST'])
def client_cert_new(service_name) -> ResponseReturnValue:
2022-06-17 11:38:49 +00:00
service = lenticular_services[service_name]
2020-05-09 18:00:07 +00:00
form = ClientCertForm()
if form.validate_on_submit():
valid_time = int(form.data['valid_time']) * timedelta(1, 0, 0)
2022-06-17 11:38:49 +00:00
cert = pki.signing_publickey(
2020-05-09 18:00:07 +00:00
current_user,
service,
form.data['publickey'],
valid_time=valid_time)
2020-05-10 12:34:28 +00:00
return jsonify({
2020-05-09 18:00:07 +00:00
'status': 'ok',
'data': {
'cert': cert.pem(),
2022-06-17 11:38:49 +00:00
'ca_cert': pki.get_ca_cert_pem(service)
2020-05-09 18:00:07 +00:00
}})
elif form.is_submitted():
return jsonify({
'status': 'error',
'errors': form.errors
})
return render_template(
'frontend/client_cert_new.html.j2',
2020-05-09 18:00:07 +00:00
service=service,
form=form)
2022-06-17 07:44:46 +00:00
@frontend_views.route('/app_token')
def app_token() -> ResponseReturnValue:
2022-06-18 11:05:18 +00:00
delete_form = TOTPDeleteForm()
return render_template('frontend/app_token.html.j2', delete_form=delete_form)
2022-06-17 07:44:46 +00:00
@frontend_views.route('/app_token/<service_name>/new')
def app_token_new(service_name: str) -> ResponseReturnValue:
return
@frontend_views.route('/app_token/<service_name>/<token_name>')
def app_token_delete(service_name: str, token_name: str) -> ResponseReturnValue:
return
2020-05-09 18:00:07 +00:00
@frontend_views.route('/totp')
def totp() -> ResponseReturnValue:
2020-05-10 12:34:28 +00:00
delete_form = TOTPDeleteForm()
return render_template('frontend/totp.html.j2', delete_form=delete_form)
2020-05-09 18:00:07 +00:00
@frontend_views.route('/totp/new', methods=['GET', 'POST'])
def totp_new() -> ResponseReturnValue:
2020-05-10 12:34:28 +00:00
form = TOTPForm()
if form.validate_on_submit():
totp = Totp(name=form.data['name'], secret=form.data['secret'])
if totp.verify(form.data['token']):
current_user.totps.append(totp)
db.session.commit()
2020-05-10 12:34:28 +00:00
return jsonify({
'status': 'ok'})
else:
return jsonify({
'status': 'error',
'errors': [
'TOTP Token invalid'
]})
return render_template('frontend/totp_new.html.j2', form=form)
@frontend_views.route('/totp/<totp_name>/delete', methods=['GET', 'POST'])
def totp_delete(totp_name) -> ResponseReturnValue:
totp = Totp.query.filter(Totp.name == totp_name).first()
db.session.delete(totp)
db.session.commit()
2020-05-09 18:00:07 +00:00
2020-05-10 12:34:28 +00:00
return jsonify({
'status': 'ok'})
2022-04-08 19:28:22 +00:00
@frontend_views.route('/webauthn/list', methods=['GET'])
def webauthn_list_route() -> ResponseReturnValue:
"""list registered credentials for current user"""
creds = WebauthnCredential.query.all()
2022-04-08 19:29:23 +00:00
return render_template('frontend/webauthn_list.html', creds=creds, button_form=ButtonForm())
2022-04-08 19:28:22 +00:00
@frontend_views.route('/webauthn/delete/<webauthn_id>', methods=['POST'])
def webauthn_delete_route(webauthn_id: str) -> ResponseReturnValue:
"""delete registered credential"""
form = ButtonForm()
if form.validate_on_submit():
cred = WebauthnCredential.query.filter(WebauthnCredential.id == webauthn_id).one()
db.session.delete(cred)
db.session.commit()
return redirect(url_for('app.webauthn_list_route'))
return '', HTTPStatus.BAD_REQUEST
def webauthn_credentials(user: User) -> list[AttestedCredentialData]:
"""get and decode all credentials for given user"""
return [AttestedCredentialData.create(**cbor.decode(cred.credential_data)) for cred in user.webauthn_credentials]
def random_string(length=32) -> str:
"""generates random string"""
return ''.join([SystemRandom().choice(string.ascii_letters + string.digits) for i in range(length)])
@frontend_views.route('/webauthn/pkcco', methods=['POST'])
def webauthn_pkcco_route() -> ResponseReturnValue:
"""get publicKeyCredentialCreationOptions"""
2022-04-08 19:29:23 +00:00
user = User.query.get(current_user.id)
user_handle = random_string()
exclude_credentials = webauthn_credentials(user)
pkcco, state = webauthn.register_begin(
{'id': user_handle.encode('utf-8'), 'name': user.username, 'displayName': user.username},
exclude_credentials)
session['webauthn_register_user_handle'] = user_handle
session['webauthn_register_state'] = state
return Response(b64encode(cbor.encode(pkcco)).decode('utf-8'), mimetype='text/plain')
2022-04-08 19:28:22 +00:00
@frontend_views.route('/webauthn/register', methods=['GET', 'POST'])
def webauthn_register_route() -> ResponseReturnValue:
"""register credential for current user"""
user = User.query.get(current_user.id)
form = WebauthnRegisterForm()
if form.validate_on_submit():
try:
attestation = cbor.decode(b64decode(form.attestation.data))
auth_data = webauthn.register_complete(
session.pop('webauthn_register_state'),
ClientData(attestation['clientDataJSON']),
AttestationObject(attestation['attestationObject']))
db.session.add(WebauthnCredential(
user_id=user.id,
user_handle=session.pop('webauthn_register_user_handle'),
credential_data=cbor.encode(auth_data.credential_data.__dict__),
name=form.name.data))
db.session.commit()
return redirect(url_for('app.webauthn_list_route'))
except (KeyError, ValueError) as e:
2022-06-17 11:38:49 +00:00
logger.exception(e)
2022-04-08 19:28:22 +00:00
flash('Error during registration.', 'error')
2022-04-08 19:29:23 +00:00
return render_template('frontend/webauthn_register.html', form=form)
2022-04-08 19:28:22 +00:00
@frontend_views.route('/password_change')
def password_change() -> ResponseReturnValue:
form = PasswordChangeForm()
return render_template('frontend/password_change.html.j2', form=form)
@frontend_views.route('/password_change', methods=['POST'])
def password_change_post() -> ResponseReturnValue:
form = PasswordChangeForm()
if form.validate():
password_old = str(form.data['password_old'])
password_new = str(form.data['password_new'])
2022-06-18 17:35:05 +00:00
if not PasswordAuthProvider.check_auth_internal(
current_user, password_old):
return jsonify(
{'errors': {'password_old': 'Old Password is invalid'}})
resp = current_user.change_password(password_new)
if resp:
return jsonify({})
else:
return jsonify({'errors': {'internal': 'internal server errror'}})
return jsonify({'errors': form.errors})
@frontend_views.route('/oauth2_token')
2022-04-08 19:28:22 +00:00
async def oauth2_tokens() -> ResponseReturnValue:
subject = oauth2.custom.get('/userinfo').json()['sub']
2022-04-08 19:28:22 +00:00
consent_sessions = await list_subject_consent_sessions.asyncio(subject=subject, _client=hydra_service.hydra_client)
if consent_sessions is None or isinstance( consent_sessions, GenericError):
return 'internal error, could not fetch sessions', 500
return render_template(
'frontend/oauth2_tokens.html.j2',
consent_sessions=consent_sessions)
@frontend_views.route('/oauth2_token/<client_id>', methods=['DELETE'])
2022-04-08 19:28:22 +00:00
async def oauth2_token_revoke(client_id: str) -> ResponseReturnValue:
subject = oauth2.session.get('/userinfo').json()['sub']
2022-04-08 19:28:22 +00:00
await revoke_consent_sessions.asyncio( _client=hydra_service.hydra_client,
subject=subject,
client=client_id)
return jsonify({})