add basic passkey management

This commit is contained in:
tuxcoder 2023-12-25 17:28:09 +01:00
parent 5759cb1e4f
commit f858a1a78c
10 changed files with 258 additions and 273 deletions

View file

@ -1,29 +1,24 @@
from authlib.integrations.base_client.errors import MissingTokenError, InvalidTokenError
from base64 import b64encode, b64decode
from fido2 import cbor
from fido2.webauthn import CollectedClientData, AttestationObject, AttestedCredentialData, AuthenticatorData, PublicKeyCredentialUserEntity
from flask import Blueprint, Response, redirect, request
from flask import Blueprint, redirect, request
from flask import current_app
from flask import jsonify, session, flash
from flask import jsonify, session
from flask import render_template, url_for
from flask_login import login_user, logout_user, current_user
from flask_login import logout_user, current_user
from http import HTTPStatus
from werkzeug.utils import redirect
import logging
from datetime import timedelta
from base64 import b64decode
from flask.typing import ResponseReturnValue
from oauthlib.oauth2.rfc6749.errors import TokenExpiredError
from ory_hydra_client.api.o_auth_2 import list_o_auth_2_consent_sessions, revoke_o_auth_2_consent_sessions
from ory_hydra_client.models import GenericError
from urllib.parse import urlencode, parse_qs
from random import SystemRandom
import string
from collections.abc import Iterable
from typing import Optional, Mapping, Iterator, List, Any
from urllib.parse import urlparse
from typing import Optional, Any
import jwt
from datetime import datetime, timedelta
from ..model import db, User, SecurityUser, Totp, AppToken, WebauthnCredential
from ..model import db, User, Totp, AppToken, PasskeyCredential
from ..form.frontend import ClientCertForm, TOTPForm, \
TOTPDeleteForm, PasswordChangeForm, WebauthnRegisterForm, \
AppTokenForm, AppTokenDeleteForm
@ -223,96 +218,118 @@ def totp_delete(totp_name) -> ResponseReturnValue:
'status': 'ok'})
@frontend_views.route('/webauthn/list', methods=['GET'])
def webauthn_list_route() -> ResponseReturnValue:
## Passkey
import webauthn
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
@frontend_views.route('/passkey/list', methods=['GET'])
def passkey() -> ResponseReturnValue:
"""list registered credentials for current user"""
creds = WebauthnCredential.query.all() # type: Iterable[WebauthnCredential]
return render_template('frontend/webauthn_list.html', creds=creds, button_form=ButtonForm())
credentials = PasskeyCredential.query.all()
return render_template('frontend/passkey_list.html.j2', credentials=credentials, button_form=ButtonForm())
@frontend_views.route('/webauthn/delete/<webauthn_id>', methods=['POST'])
def webauthn_delete_route(webauthn_id: str) -> ResponseReturnValue:
@frontend_views.route('/passkey/new', methods=['GET'])
def passkey_new() -> ResponseReturnValue:
"""register credential for current user"""
public_url = urlparse(current_app.config['PUBLIC_URL'])
user = get_current_user() # type: User
form = WebauthnRegisterForm()
options = webauthn.generate_registration_options(
rp_name="Lenticluar Cloud",
rp_id=public_url.hostname,
user_id=str(user.id),
user_name=user.username,
authenticator_selection=AuthenticatorSelectionCriteria(
user_verification=UserVerificationRequirement.REQUIRED,
resident_key=ResidentKeyRequirement.REQUIRED,
),
exclude_credentials = list(map(lambda x: PublicKeyCredentialDescriptor(id=x.credential_id), user.passkey_credentials))
)
secret_key = current_app.config['SECRET_KEY']
token = jwt.encode({
'challenge': b64encode(options.challenge).decode(),
'iat': datetime.utcnow() - timedelta(minutes=1),
'nbf': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(minutes=15),
}, secret_key, algorithm="HS256"
)
return render_template(
'frontend/passkey_new.html.j2',
form=form,
options=webauthn.options_to_json(options),
token=token,
)
@frontend_views.route('/passkey/new', methods=['POST'])
def passkey_new_process() -> ResponseReturnValue:
secret_key = current_app.config['SECRET_KEY']
public_url = urlparse(current_app.config['PUBLIC_URL'])
user = get_current_user()
data = request.get_json()
try:
token = jwt.decode(
data['token'], secret_key, algorithms=['HS256'],
options = {
'require': ["challenge", "exp", "iat", "nbf"],
})
except jwt.exceptions.MissingRequiredClaimError:
return jsonify({'message': "invalid token"}), 400
challenge = b64decode(token['challenge'])
credential = data['credential']
name = data['name']
result = webauthn.verify_registration_response(
credential = credential,
expected_rp_id = public_url.hostname,
expected_challenge = challenge,
expected_origin = [ public_url.geturl() ],
)
if not result.user_verified:
return jsonify({ "message": "invalid auth" }), 403
db.session.add(PasskeyCredential(
id=None,
user_id=user.id,
credential_id=result.credential_id,
credential_public_key=result.credential_public_key,
name=name,
))
db.session.commit()
logger.info(f"add new passkey for user {user.username}")
return jsonify({})
@frontend_views.route('/passkey/delete/<id>', methods=['POST'])
def passkey_delete(id: str) -> ResponseReturnValue:
"""delete registered credential"""
form = ButtonForm()
if form.validate_on_submit():
cred = WebauthnCredential.query.filter(WebauthnCredential.id == webauthn_id).one() # type: WebauthnCredential
cred = PasskeyCredential.query.filter(PasskeyCredential.id == id).first_or_404()
db.session.delete(cred)
db.session.commit()
return redirect(url_for('app.webauthn_list_route'))
return redirect(url_for('.passkey'))
return '', HTTPStatus.BAD_REQUEST
def webauthn_credentials(user: User) -> list[AttestedCredentialData]:
"""get and decode all credentials for given user"""
def decode(creds: List[WebauthnCredential]) -> Iterator[AttestedCredentialData]:
for cred in creds:
data = cbor.decode(cred.credential_data)
if isinstance(data, Mapping):
yield AttestedCredentialData.create(**data)
return list(decode(user.webauthn_credentials))
def random_string(length=32) -> str:
"""generates random string"""
return ''.join([SystemRandom().choice(string.ascii_letters + string.digits) for i in range(length)])
@frontend_views.route('/webauthn/pkcco', methods=['POST'])
def webauthn_pkcco_route() -> ResponseReturnValue:
"""get publicKeyCredentialCreationOptions"""
user = User.query.get(get_current_user().id) #type: Optional[User]
if user is None:
return 'internal error', 500
user_handle = random_string()
exclude_credentials = webauthn_credentials(user)
pkcco, state = webauthn.register_begin(
user=PublicKeyCredentialUserEntity(id=user_handle.encode('utf-8'), name=user.username, display_name=user.username),
credentials=exclude_credentials
)
session['webauthn_register_user_handle'] = user_handle
session['webauthn_register_state'] = state
return Response(b64encode(cbor.encode(pkcco)).decode('utf-8'), mimetype='text/plain')
@frontend_views.route('/webauthn/register', methods=['GET', 'POST'])
def webauthn_register_route() -> ResponseReturnValue:
"""register credential for current user"""
user = get_current_user() # type: User
form = WebauthnRegisterForm()
if form.validate_on_submit():
try:
attestation = cbor.decode(b64decode(form.attestation.data))
if not isinstance(attestation, Mapping) or 'clientDataJSON' not in attestation or 'attestationObject' not in attestation:
return 'invalid attestion data', 400
auth_data = webauthn.register_complete(
session.pop('webauthn_register_state'),
CollectedClientData(attestation['clientDataJSON']),
AttestationObject(attestation['attestationObject']))
db.session.add(WebauthnCredential(
user=user,
user_handle=session.pop('webauthn_register_user_handle'),
credential_data=cbor.encode(auth_data.credential_data.__dict__),
name=form.name.data))
db.session.commit()
return redirect(url_for('app.webauthn_list_route'))
except (KeyError, ValueError) as e:
logger.exception(e)
flash('Error during registration.', 'error')
return render_template('frontend/webauthn_register.html', form=form)
@frontend_views.route('/password_change')
def password_change() -> ResponseReturnValue:
form = PasswordChangeForm()