implement basic passkey login flow

This commit is contained in:
tuxcoder 2023-12-25 18:55:20 +01:00
parent 926afee5c5
commit 0a1da35d84
8 changed files with 150 additions and 96 deletions

View file

@ -1,39 +1,35 @@
from urllib.parse import urlencode, parse_qs
import flask
from flask import Blueprint, redirect, flash, current_app, session
from flask.templating import render_template
from flask_babel import gettext
from flask.typing import ResponseReturnValue
from flask import request, url_for, jsonify
from flask_login import login_required, login_user, logout_user, current_user
import logging
from urllib.parse import urlparse
from base64 import b64decode, b64encode
import http
from base64 import b64encode, b64decode, urlsafe_b64decode
import crypt
from datetime import datetime
from datetime import datetime, timedelta
import jwt
from flask import request, url_for, jsonify, Blueprint, redirect, current_app, session
from flask.templating import render_template
from flask.typing import ResponseReturnValue
import logging
import json
from ory_hydra_client.api.o_auth_2 import get_o_auth_2_consent_request, accept_o_auth_2_consent_request, accept_o_auth_2_login_request, get_o_auth_2_login_request, accept_o_auth_2_login_request, accept_o_auth_2_logout_request, get_o_auth_2_login_request
from ory_hydra_client import models as ory_hydra_m
from ory_hydra_client.models import TheRequestPayloadUsedToAcceptALoginOrConsentRequest, TheRequestPayloadUsedToAcceptAConsentRequest, GenericError
from ory_hydra_client.api.o_auth_2 import get_o_auth_2_consent_request, accept_o_auth_2_consent_request, accept_o_auth_2_login_request, get_o_auth_2_login_request, accept_o_auth_2_login_request, accept_o_auth_2_logout_request, get_o_auth_2_login_request
from ory_hydra_client.models import TheRequestPayloadUsedToAcceptAConsentRequest, GenericError
from typing import Optional
from urllib.parse import urlparse
from uuid import uuid4, UUID
import webauthn
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
from ..model import db, User, SecurityUser
from ..model import db, User, PasskeyCredential
from ..form.auth import ConsentForm, LoginForm, RegistrationForm
from ..auth_providers import AUTH_PROVIDER_LIST
from ..hydra import hydra_service
from ..wrapped_fido2_server import WrappedFido2Server
logger = logging.getLogger(__name__)
auth_views = Blueprint('auth', __name__, url_prefix='/auth')
webauthn = WrappedFido2Server()
@auth_views.route('/consent', methods=['GET', 'POST'])
@ -98,6 +94,9 @@ async def consent() -> ResponseReturnValue:
@auth_views.route('/login', methods=['GET', 'POST'])
async def login() -> ResponseReturnValue:
secret_key = current_app.config['SECRET_KEY']
public_url = urlparse(current_app.config['PUBLIC_URL'])
login_challenge = request.args.get('login_challenge')
if login_challenge is None:
return 'login_challenge missing', 400
@ -106,6 +105,21 @@ async def login() -> ResponseReturnValue:
logger.exception("could not fetch login request")
return redirect(url_for('frontend.index'))
## passkey
options = webauthn.generate_authentication_options(
rp_id = public_url.hostname,
user_verification = UserVerificationRequirement.REQUIRED,
challenge=webauthn.helpers.generate_challenge(32)
)
token = jwt.encode({
'challenge': b64encode(options.challenge).decode(),
'iat': datetime.utcnow() - timedelta(minutes=1),
'nbf': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(minutes=15),
}, secret_key, algorithm="HS256"
)
##
if login_request.skip:
resp = await accept_o_auth_2_login_request.asyncio(_client=hydra_service.hydra_client,
login_challenge=login_challenge,
@ -124,7 +138,13 @@ async def login() -> ResponseReturnValue:
session['auth_providers'] = []
return redirect(
url_for('auth.login_auth', login_challenge=login_challenge))
return render_template('auth/login.html.j2', form=form)
return render_template(
'auth/login.html.j2',
form=form,
options=webauthn.options_to_json(options),
token=token,
login_challenge=login_challenge,
)
@auth_views.route('/login/auth', methods=['GET', 'POST'])
@ -171,21 +191,54 @@ async def login_auth() -> ResponseReturnValue:
return render_template('auth/login_auth.html.j2', forms=auth_forms)
@auth_views.route('/passkey/verify', methods=['POST'])
async def passkey_verify() -> ResponseReturnValue:
secret_key = current_app.config['SECRET_KEY']
public_url = current_app.config['PUBLIC_URL']
@auth_views.route('/webauthn/pkcro', methods=['POST'])
def webauthn_pkcro_route() -> ResponseReturnValue:
"""login webauthn pkcro route"""
return '', 404
user = User.query.filter(User.id == session.get('webauthn_login_user_id')).one() #type: User
form = ButtonForm()
if user and form.validate_on_submit():
pkcro, state = webauthn.authenticate_begin(webauthn_credentials(user))
session['webauthn_login_state'] = state
return Response(b64encode(cbor.encode(pkcro)).decode('utf-8'), mimetype='text/plain')
data = request.get_json()
token = jwt.decode(data['token'], secret_key, algorithms=['HS256'])
challenge = urlsafe_b64decode(token['challenge'])
credential = data['credential']
credential_id = urlsafe_b64decode(credential['id'])
return '', HTTPStatus.BAD_REQUEST
login_challenge = data['login_challenge']
if login_challenge is None:
return 'missing login_challenge, bad request', 400
login_request = await get_o_auth_2_login_request.asyncio(_client=hydra_service.hydra_client, login_challenge=login_challenge)
if login_request is None:
return redirect(url_for('frontend.index'))
passkey = PasskeyCredential.query.filter(PasskeyCredential.credential_id == credential_id).first_or_404()
result = webauthn.verify_authentication_response(
credential = credential,
expected_rp_id = "localhost",
expected_challenge = challenge,
expected_origin = [ public_url ],
credential_public_key = passkey.credential_public_key,
credential_current_sign_count = passkey.sign_count,
)
logger.error(f"DEBUG: {passkey}")
logger.error(f"DEBUG: {result}")
passkey.sign_count = result.new_sign_count
passkey.last_used = datetime.utcnow()
user = passkey.user
user.last_login = datetime.now()
subject = str(user.id)
resp = await accept_o_auth_2_login_request.asyncio(_client=hydra_service.hydra_client,
login_challenge=login_challenge, json_body=ory_hydra_m.HandledLoginRequestIsTheRequestPayloadUsedToAcceptALoginRequest(
subject=subject,
remember=True,
))
if resp is None or isinstance( resp, GenericError):
return 'internal error, could not forward request', 503
db.session.commit()
return jsonify({'redirect': resp.redirect_to})
@auth_views.route("/logout")
async def logout() -> ResponseReturnValue:

View file

@ -17,6 +17,13 @@ from urllib.parse import urlparse
from typing import Optional, Any
import jwt
from datetime import datetime, timedelta
import webauthn
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
from ..model import db, User, Totp, AppToken, PasskeyCredential
from ..form.frontend import ClientCertForm, TOTPForm, \
@ -24,7 +31,6 @@ from ..form.frontend import ClientCertForm, TOTPForm, \
AppTokenForm, AppTokenDeleteForm
from ..form.base import ButtonForm
from ..auth_providers import PasswordAuthProvider
from .auth import webauthn
from .oauth2 import redirect_login, oauth2
from ..hydra import hydra_service
from ..pki import pki
@ -221,14 +227,6 @@ def totp_delete(totp_name) -> ResponseReturnValue:
## Passkey
import webauthn
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
@frontend_views.route('/passkey/list', methods=['GET'])
def passkey() -> ResponseReturnValue:
"""list registered credentials for current user"""