212 lines
7.5 KiB
Python
212 lines
7.5 KiB
Python
# Copyright (c) 2019. Tobias Kurze
|
|
"""
|
|
This module provides functions related to authentication through the API.
|
|
For example: listing of available auth providers or registration of users.
|
|
|
|
Login through the API does not start a new session, but instead returns a JWT.
|
|
"""
|
|
import base64
|
|
import json
|
|
import logging
|
|
from pprint import pprint
|
|
|
|
import flask
|
|
from datetime import datetime, timedelta
|
|
import jwt
|
|
from flask import request, jsonify, current_app, url_for, Response, session, redirect, make_response
|
|
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_refresh_token_required, get_jwt_identity, \
|
|
get_raw_jwt, jwt_required
|
|
from functools import wraps
|
|
from random import randint
|
|
|
|
from flask_login import logout_user, login_user
|
|
from typing import Iterable
|
|
|
|
from flask_restx import Resource, fields, abort, inputs
|
|
from sqlalchemy.exc import IntegrityError
|
|
from werkzeug.routing import BuildError
|
|
|
|
from backend import db, app, jwt_extended
|
|
from backend.api import auth_api_bp, auth_api_providers_ns, auth_api_register_ns
|
|
from backend.api.models import user_model
|
|
from backend.auth import AUTH_PROVIDERS, oidc_auth
|
|
from backend.auth.oidc_config import PROVIDER_NAME
|
|
from backend.models.user_model import User, Group, BlacklistToken
|
|
|
|
# Module-level logger for the authentication API endpoints in this file.
logger = logging.getLogger("lrc.api.auth")
|
|
|
|
|
|
@auth_api_bp.route('/providers', methods=('GET',))
def get_auth_providers():
    """Return all configured auth providers as a JSON response.

    Every entry is a shallow copy of the corresponding ``AUTH_PROVIDERS``
    configuration. The configured ``url`` is first treated as an endpoint
    name and resolved through ``url_for``; when no such endpoint can be
    built, the configured value is passed through unchanged.
    """
    def _resolve(target):
        # Endpoint names become real URLs; anything else is kept verbatim.
        try:
            return url_for(target)
        except BuildError:
            return target

    result = {}
    for name in AUTH_PROVIDERS:
        config = AUTH_PROVIDERS[name]
        entry = dict(config)
        entry["url"] = _resolve(config["url"])
        result[name] = entry
    return jsonify(result)
|
|
|
|
|
|
@auth_api_providers_ns.route('/')
class AuthProviders(Resource):
    """REST resource exposing the configured authentication providers."""

    def get(self):
        """Delegate to ``get_auth_providers`` and return its JSON response."""
        providers_response = get_auth_providers()
        return providers_response
|
|
|
|
|
|
@auth_api_register_ns.route('/')
@auth_api_register_ns.expect(auth_api_register_ns.model('RegisterModel', {
    'nickname': fields.String(required=False, description='The user\'s nickname'),
    'first_name': fields.String(required=False, description='The user\'s first name'),
    'last_name': fields.String(required=False, description='The user\'s last name'),
    'lang': fields.String(required=False, description='The user\'s preferred language'),
    'timezone': fields.String(required=False, description='The user\'s preferred timezone'),
    'email': fields.String(required=True, type=inputs.email(), description='The user\'s e-mail address'),
    'password': fields.String(required=False, description='The user\'s password')
}))
class Registration(Resource):
    """Resource for registering a new user from a JSON payload."""

    @auth_api_register_ns.marshal_list_with(user_model)
    def post(self):
        """Create a user from the JSON request body.

        Returns:
            The created ``User`` (marshalled with ``user_model``) and HTTP 201.

        Aborts with HTTP 400 when the body is not a JSON object, when the
        database rejects the record (``IntegrityError``) or when constructing
        the user raises an ``AssertionError``.
        """
        data = request.get_json()
        if not isinstance(data, dict):
            # ``User(**data)`` needs a mapping; a missing/array/scalar body is a client error.
            abort(400, message="Invalid request data")

        try:
            user = User(**data)
            db.session.add(user)
            db.session.commit()
        except IntegrityError as e:
            # A failed flush leaves the session unusable until rolled back.
            db.session.rollback()
            # First line looks like "(driver.Error) <detail>"; keep only the detail
            # and fall back to the whole line if the format is different.
            first_line = str(e).split('\n')[0]
            _, _, detail = first_line.partition(')')
            abort(400, message=detail.strip() or first_line)
        except AssertionError as e:
            db.session.rollback()
            abort(400, message=str(e))

        # The payload contains the password, so it is deliberately not logged.
        logger.debug("registered new user: %s", data.get("email"))
        return user, 201
|
|
|
|
|
|
@auth_api_bp.route('/login', methods=('GET', 'POST',))
def login():
    """Authenticate a user from JSON credentials and issue JWTs.

    The JSON body is passed as keyword arguments to ``User.authenticate``.

    Returns:
        HTTP 200 with ``access_token`` (fresh) and ``refresh_token`` on
        success; HTTP 401 with ``authenticated: False`` when the body is
        missing, is not a JSON object, or the credentials are rejected.
    """
    data = request.get_json()
    # The payload carries the password: never print or log it.
    if not data or not isinstance(data, dict):
        # A non-object body cannot be expanded into keyword arguments.
        return jsonify({'message': 'Invalid request data', 'authenticated': False}), 401

    user = User.authenticate(**data)

    if not user:
        logger.info("login failed: invalid credentials")
        return jsonify({'message': 'Invalid credentials', 'authenticated': False}), 401

    token = {
        'access_token': create_access_token(identity=user, fresh=True),
        'refresh_token': create_refresh_token(identity=user)
    }
    return jsonify(token), 200
|
|
|
|
|
|
# Endpoint for revoking the current user's access token
|
|
@auth_api_bp.route('/logout', methods=['GET', 'DELETE'])
@jwt_required
def logout():
    """Store the presented access token's ``jti`` as a ``BlacklistToken``.

    Returns a JSON confirmation message with HTTP 200.
    """
    raw_token = get_raw_jwt()
    revoked = BlacklistToken(token=raw_token['jti'])
    db.session.add(revoked)
    db.session.commit()
    body = jsonify({"msg": "Successfully logged out"})
    return body, 200
|
|
|
|
|
|
# Endpoint for revoking the current user's refresh token
|
|
@auth_api_bp.route('/logout2', methods=['GET', 'DELETE'])
@auth_api_bp.route('/revokeRefreshToken', methods=['GET', 'DELETE'])
@jwt_refresh_token_required
def logout2():
    """Store the presented refresh token's ``jti`` as a ``BlacklistToken``.

    Reachable under both ``/logout2`` and ``/revokeRefreshToken``.
    Returns a JSON confirmation message with HTTP 200.
    """
    raw_token = get_raw_jwt()
    revoked = BlacklistToken(token=raw_token['jti'])
    db.session.add(revoked)
    db.session.commit()
    body = jsonify({"msg": "Successfully logged out"})
    return body, 200
|
|
|
|
|
|
def check_and_create_groups(groups: Iterable[str]):
    """Return ``Group`` objects for the given names, creating missing ones.

    Args:
        groups: Group names. ``None`` is treated as "no groups" and a single
            bare string as one group name (iterating it would otherwise
            yield one group per character). Repeated names are only
            processed once.

    Returns:
        A list of ``Group`` instances in first-seen order. Newly created
        groups are added to the session, which is committed before returning.
    """
    if groups is None:
        groups = []
    elif isinstance(groups, str):
        groups = [groups]

    user_groups = []
    seen = set()
    for name in groups:
        if name in seen:
            continue
        seen.add(name)
        group = Group.get_by_name(name)
        if group is None:
            group = Group(name=name)
            db.session.add(group)
        user_groups.append(group)

    db.session.commit()
    return user_groups
|
|
|
|
|
|
def create_or_retrieve_user_from_userinfo(userinfo):
    """Find or create the local ``User`` matching OIDC userinfo claims.

    Args:
        userinfo: Mapping of userinfo claims. Reads ``email`` (required),
            ``given_name``, ``family_name``, ``eduperson_principal_name``
            and ``memberOf`` (group names).

    Returns:
        The updated existing user or the newly created user, or ``None``
        when the claims carry no e-mail address.
    """
    email = userinfo.get("email")
    if not email:
        # Without an e-mail there is nothing to look the user up by.
        logger.error("email is missing in OIDC userinfo! Can't create user!")
        return None

    # Claims contain personal data; log only the identifier, and only at debug level.
    logger.debug("processing OIDC userinfo for %s", email)
    user_groups = check_and_create_groups(groups=userinfo.get("memberOf", []))
    user = User.get_by_identifier(email)

    if user is not None:
        logger.info("user found -> update user")
        user.first_name = userinfo.get("given_name", "")
        user.last_name = userinfo.get("family_name", "")
        user.external_user_id = userinfo.get("eduperson_principal_name", None)
        for g in user_groups:
            # This runs on every login; do not append groups the user already has.
            if g not in user.groups:
                user.groups.append(g)
        db.session.commit()
        return user

    user = User(email=email, first_name=userinfo.get("given_name", ""),
                last_name=userinfo.get("family_name", ""), external_user=True,
                groups=user_groups, external_user_id=userinfo.get("eduperson_principal_name", None))

    logger.info("creating new user")

    db.session.add(user)
    db.session.commit()
    return user
|
|
|
|
|
|
@auth_api_bp.route('/oidc', methods=['GET'])
@auth_api_bp.route('/oidc/<redirect_url>', methods=['GET'])
@oidc_auth.oidc_auth(provider_name=PROVIDER_NAME)
def oidc(redirect_url=None):
    """Complete an OIDC login and hand tokens to the client via a cookie.

    The user is looked up or created from ``flask.session['userinfo']``.
    Depending on the ``AUTH_RETURN_EXTERNAL_JWT`` config flag, the cookie
    payload is either the session's ``id_token`` re-encoded with the app's
    ``SECRET_KEY``, or a JSON document holding a fresh access token and a
    refresh token. The payload is base64-encoded into the ``tokens`` cookie.

    Args:
        redirect_url: Optional redirect target from the URL path. Falls back
            to the ``Referer`` header, then the ``redirect_url`` query
            argument, then ``"/"``.

    Returns:
        A redirect response carrying the ``tokens`` cookie, or a plain-text
        message with HTTP 401 when no user could be found or created.
    """
    logger.debug("oidc auth endpoint:")
    user = create_or_retrieve_user_from_userinfo(flask.session['userinfo'])
    if user is None:
        logger.error(f"Could not authenticate: could not find or create user:\n{str(flask.session['userinfo'])}")
        return "Could not authenticate: could not find or create user.", 401

    if current_app.config.get("AUTH_RETURN_EXTERNAL_JWT", False):
        token = jwt.encode(flask.session['id_token'], current_app.config['SECRET_KEY'])
    else:
        token = json.dumps({
            'access_token': create_access_token(identity=user, fresh=True),
            'refresh_token': create_refresh_token(identity=user)
        })
    # jwt.encode returns bytes on PyJWT < 2 and str on PyJWT >= 2; json.dumps
    # always returns str. Normalise to bytes before base64-encoding.
    if isinstance(token, str):
        token = token.encode('utf-8')
    # Tokens are credentials: record that they were issued, never their value.
    logger.info("issued tokens for OIDC login")

    # NOTE(review): the redirect target is taken from client-controlled input
    # (path, Referer, query argument) without validation, which permits open
    # redirects that carry the token cookie response. Consider an allow-list.
    if redirect_url is None:
        redirect_url = request.headers.get("Referer")
    if redirect_url is None:
        redirect_url = request.args.get('redirect_url')
    if redirect_url is None:
        redirect_url = "/"

    response = make_response(redirect(redirect_url))
    # Pass the cookie value as text; base64 output is plain ASCII.
    response.set_cookie('tokens', base64.b64encode(token).decode('ascii'))
    return response
|
|
|
|
|
|
@auth_api_bp.route('/refresh', methods=['GET'])
@jwt_refresh_token_required
def refresh():
    """Refresh token endpoint.

    Generates a new access token from the refresh token. The new access
    token is marked as non-fresh, because no password is verified here.

    Returns:
        HTTP 200 with ``access_token`` on success, or HTTP 401 when the
        identity stored in the refresh token no longer resolves to a user.
    """
    jwt_identity = get_jwt_identity()
    user = User.get_by_identifier(jwt_identity)
    if user is None:
        # The lookup can come back empty; do not mint a token for nobody.
        logger.warning("Refresh requested for unknown identity %s", jwt_identity)
        return jsonify({"msg": "User not found"}), 401

    logger.info("Refreshing token for %s", user)
    new_token = create_access_token(identity=user, fresh=False)
    ret = {'access_token': new_token}
    return jsonify(ret), 200
|