add 2fa totp
This commit is contained in:
parent
06e99be868
commit
90e4e80ede
11 changed files with 233 additions and 14 deletions
|
@ -89,7 +89,7 @@ def init_oidc_provider(app):
|
|||
|
||||
def get_claims_for(self, user_id, requested_claims):
|
||||
user = self[user_id]
|
||||
print(f'user {user.username}')
|
||||
print(f'user {user.username} {requested_claims}')
|
||||
claims = {}
|
||||
for claim in requested_claims:
|
||||
if claim == 'name':
|
||||
|
|
|
@ -61,8 +61,10 @@ class TotpAuthProvider(AuthProvider):
|
|||
data = form.data['totp']
|
||||
if data is not None:
|
||||
print(f'data totp: {data}')
|
||||
if len(user.totps) == 0: # migration, TODO remove
|
||||
return True
|
||||
for totp in user.totps:
|
||||
if pyotp.TOTP(totp).verify(data):
|
||||
if totp.verify(data):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
@ -3,11 +3,10 @@ from flask_wtf import FlaskForm
|
|||
from wtforms import StringField, SubmitField, TextField, \
|
||||
TextAreaField, PasswordField, IntegerField, FloatField, \
|
||||
DateTimeField, DateField, FormField, BooleanField, \
|
||||
SelectField, Form as NoCsrfForm
|
||||
SelectField, Form as NoCsrfForm, HiddenField
|
||||
from wtforms.widgets.html5 import NumberInput, DateInput
|
||||
from wtforms.validators import DataRequired, NumberRange, Optional, NoneOf, Length
|
||||
from datetime import datetime
|
||||
|
||||
from wtforms.validators import DataRequired, NumberRange, \
|
||||
Optional, NoneOf, Length
|
||||
|
||||
|
||||
class ClientCertForm(FlaskForm):
|
||||
|
@ -22,3 +21,14 @@ class ClientCertForm(FlaskForm):
|
|||
NumberRange(min=1, max=365*2)
|
||||
])
|
||||
submit = SubmitField(gettext('Submit'))
|
||||
|
||||
|
||||
class TOTPForm(FlaskForm):
|
||||
secret = HiddenField(gettext('totp-Secret'))
|
||||
token = TextField(gettext('totp-verify token'))
|
||||
name = TextField(gettext('name'))
|
||||
submit = SubmitField(gettext('Activate'))
|
||||
|
||||
|
||||
class TOTPDeleteForm(FlaskForm):
|
||||
submit = SubmitField(gettext('Delete'))
|
||||
|
|
|
@ -7,6 +7,10 @@ from flask_login import UserMixin
|
|||
from ldap3.core.exceptions import LDAPSessionTerminatedByServerError
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from collections.abc import MutableSequence
|
||||
from datetime import datetime
|
||||
import pyotp
|
||||
import json
|
||||
|
||||
ldap_conn = None # type: Connection
|
||||
base_dn = ''
|
||||
|
@ -160,12 +164,79 @@ class Certificate(object):
|
|||
def __str__(self):
|
||||
return f'Certificate(cn={self._cn}, ca_name={self._ca_name}, not_valid_before={self.not_valid_before}, not_valid_after={self.not_valid_after})'
|
||||
|
||||
|
||||
class Totp(object):
|
||||
|
||||
def __init__(self, name, secret, created_at=datetime.now()):
|
||||
self._secret = secret
|
||||
self._name = name
|
||||
self._created_at = created_at
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def created_at(self):
|
||||
return self._created_at
|
||||
|
||||
def verify(self, token: str):
|
||||
totp = pyotp.TOTP(self._secret)
|
||||
return totp.verify(token)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'secret': self._secret,
|
||||
'name': self._name,
|
||||
'created_at': int(self._created_at.timestamp())}
|
||||
|
||||
@staticmethod
|
||||
def from_dict(data):
|
||||
return Totp(
|
||||
name=data['name'],
|
||||
secret=data['secret'],
|
||||
created_at=datetime.fromtimestamp(data['created_at']))
|
||||
|
||||
|
||||
class TotpList(MutableSequence):
|
||||
def __init__(self, ldap_attr):
|
||||
super().__init__()
|
||||
self._ldap_attr = ldap_attr
|
||||
|
||||
def __getitem__(self, ii):
|
||||
return Totp.from_dict(json.loads(self._ldap_attr[ii]))
|
||||
|
||||
def __setitem__(self, ii, val: Totp):
|
||||
self._ldap_attr[ii] = json.dumps(val.to_dict()).encode()
|
||||
|
||||
def __len__(self):
|
||||
return len(self._ldap_attr)
|
||||
|
||||
def __delitem__(self, ii):
|
||||
del self._ldap_attr[ii]
|
||||
|
||||
def delete(self, totp_name):
|
||||
for i in range(len(self)):
|
||||
if self[i].name == totp_name:
|
||||
self._ldap_attr.delete(self._ldap_attr[i])
|
||||
|
||||
def insert(self, ii, val):
|
||||
self.append(val)
|
||||
|
||||
def append(self, val):
|
||||
self._ldap_attr.add(json.dumps(val.to_dict()).encode())
|
||||
|
||||
|
||||
class User(EntryBase):
|
||||
|
||||
dn = "uid={uid},{base_dn}"
|
||||
base_dn = "ou=users,{_base_dn}"
|
||||
object_classes = ["top", "inetOrgPerson", "LenticularUser"]
|
||||
|
||||
def __init__(self, ldap_object=None, **kwargs):
|
||||
super().__init__(ldap_object, **kwargs)
|
||||
self._totp_list = TotpList(ldap_object.totpSecret)
|
||||
|
||||
@property
|
||||
def is_authenticated(self):
|
||||
return True # TODO
|
||||
|
@ -173,6 +244,10 @@ class User(EntryBase):
|
|||
def get(self, key):
|
||||
print(f'getitem: {key}')
|
||||
|
||||
def make_writeable(self):
|
||||
self._ldap_object = self._ldap_object.entry_writable()
|
||||
self._totp_list = TotpList(self._ldap_object.totpSecret)
|
||||
|
||||
@property
|
||||
def entry_dn(self):
|
||||
return self._ldap_object.entry_dn
|
||||
|
@ -219,7 +294,7 @@ class User(EntryBase):
|
|||
|
||||
@property
|
||||
def totps(self):
|
||||
return ['JBSWY3DPEHPK3PXP']
|
||||
return self._totp_list
|
||||
|
||||
class _query(EntryBase._query):
|
||||
def by_username(self, username) -> 'User':
|
||||
|
|
|
@ -19,11 +19,12 @@ from flask_login import login_required, login_user, logout_user, current_user
|
|||
from werkzeug.utils import redirect
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
import pyotp
|
||||
|
||||
|
||||
from ..model import User, SecurityUser
|
||||
from ..model import User, SecurityUser, Totp
|
||||
from ..form.login import LoginForm
|
||||
from ..form.frontend import ClientCertForm
|
||||
from ..form.frontend import ClientCertForm, TOTPForm, TOTPDeleteForm
|
||||
from ..auth_providers import AUTH_PROVIDER_LIST
|
||||
|
||||
|
||||
|
@ -68,7 +69,7 @@ def client_cert_new(service_name):
|
|||
service,
|
||||
form.data['publickey'],
|
||||
valid_time=valid_time)
|
||||
return jsonify( {
|
||||
return jsonify({
|
||||
'status': 'ok',
|
||||
'data': {
|
||||
'cert': cert.pem(),
|
||||
|
@ -88,6 +89,38 @@ def client_cert_new(service_name):
|
|||
@frontend_views.route('/totp')
|
||||
@login_required
|
||||
def totp():
|
||||
return render_template('frontend/totp.html.j2')
|
||||
delete_form = TOTPDeleteForm()
|
||||
return render_template('frontend/totp.html.j2', delete_form=delete_form)
|
||||
|
||||
|
||||
@frontend_views.route('/totp/new', methods=['GET','POST'])
|
||||
@login_required
|
||||
def totp_new():
|
||||
form = TOTPForm()
|
||||
|
||||
if form.validate_on_submit():
|
||||
totp = Totp(name=form.data['name'], secret=form.data['secret'])
|
||||
if totp.verify(form.data['token']):
|
||||
current_user.make_writeable()
|
||||
current_user.totps.append(totp)
|
||||
current_user._ldap_object.entry_commit_changes()
|
||||
return jsonify({
|
||||
'status': 'ok'})
|
||||
else:
|
||||
return jsonify({
|
||||
'status': 'error',
|
||||
'errors': [
|
||||
'TOTP Token invalid'
|
||||
]})
|
||||
return render_template('frontend/totp_new.html.j2', form=form)
|
||||
|
||||
|
||||
@frontend_views.route('/totp/<totp_name>/delete', methods=['GET','POST'])
|
||||
@login_required
|
||||
def totp_delete(totp_name):
|
||||
current_user.make_writeable()
|
||||
current_user.totps.delete(totp_name)
|
||||
current_user._ldap_object.entry_commit_changes()
|
||||
|
||||
return jsonify({
|
||||
'status': 'ok'})
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue