Files
lrc-backend/backend/api/auth_api.py

204 lines
6.9 KiB
Python

# Copyright (c) 2019. Tobias Kurze
"""
This module provides functions related to authentication through the API.
For example: listing of available auth providers or registration of users.
Login through the API does not start a new session, but instead returns a JWT.
"""
import base64
import json
import logging
from pprint import pprint
import flask
from datetime import datetime, timedelta
import jwt
from flask import request, jsonify, current_app, url_for, Response, session, redirect, make_response
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_refresh_token_required, get_jwt_identity, \
get_raw_jwt, jwt_required
from functools import wraps
from random import randint
from flask_login import logout_user, login_user
from typing import Iterable
from flask_restx import Resource, fields
from werkzeug.routing import BuildError
from backend import db, app, jwt_extended
from backend.api import auth_api_bp, auth_api_providers_ns, auth_api_register_ns
from backend.auth import AUTH_PROVIDERS, oidc_auth
from backend.auth.oidc_config import PROVIDER_NAME
from backend.models.user_model import User, Group, BlacklistToken
# Module-level logger for this module, registered under the name "lrc.api.auth".
logger = logging.getLogger("lrc.api.auth")
@auth_api_bp.route('/providers', methods=('GET',))
def get_auth_providers():
    """Return all configured auth providers as a JSON object.

    Each entry of ``AUTH_PROVIDERS`` is copied, and the copy's ``url`` value is
    replaced by the result of ``url_for`` on the configured value. If
    ``url_for`` raises ``BuildError``, the configured value is used unchanged.
    """
    result = {}
    for name in AUTH_PROVIDERS:
        entry = dict(AUTH_PROVIDERS[name])
        configured_url = AUTH_PROVIDERS[name]["url"]
        try:
            resolved_url = url_for(configured_url)
        except BuildError:
            # Not a buildable endpoint -> hand the configured value through as is.
            resolved_url = configured_url
        entry["url"] = resolved_url
        result[name] = entry
    return jsonify(result)
@auth_api_providers_ns.route('/')
class AuthProviders(Resource):
    """Resource on the providers namespace that lists the auth providers."""

    def get(self):
        """Return whatever ``get_auth_providers`` produces."""
        providers_response = get_auth_providers()
        return providers_response
@auth_api_bp.route('/register', methods=('POST',))
def register():
    """Create a new user from the JSON request body.

    The members of the JSON object are passed as keyword arguments to
    ``User``; the new user is added to the database session and committed.

    Returns:
        The created user's ``to_dict()`` as JSON with status 201, or a JSON
        error message with status 400 if the body is not a JSON object.

    Raises:
        Any exception raised while constructing or committing the user is
        re-raised after the database session has been rolled back.
    """
    data = request.get_json()
    # A missing body or a non-object payload (list, string, ...) cannot be
    # unpacked into keyword arguments; reject it instead of failing with a
    # TypeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid request data'}), 400
    # NOTE(review): every client-supplied key is forwarded to the User
    # constructor (mass assignment) -- consider whitelisting accepted fields.
    try:
        user = User(**data)
        db.session.add(user)
        db.session.commit()
    except Exception:
        # Keep the session usable for later requests; the error still propagates.
        db.session.rollback()
        raise
    return jsonify(user.to_dict()), 201
@auth_api_register_ns.route('/')
@auth_api_register_ns.expect(auth_api_register_ns.model('RegisterModel', {
    'nickname': fields.String(required=False, description='The user\'s nickname'),
    'first_name': fields.String(required=False, description='The user\'s first name'),
    'last_name': fields.String(required=False, description='The user\'s last name'),
    'lang': fields.String(required=False, description='The user\'s preferred language'),
    'timezone': fields.String(required=False, description='The user\'s preferred timezone'),
    'email': fields.String(required=True, description='The user\'s e-mail address'),
    # Description fixed: it previously read "The group's name" (copy-paste error).
    'password': fields.String(required=False, description='The user\'s password')
}))
class AuthProviders(Resource):
    """Resource on the register namespace that delegates to ``register``.

    NOTE(review): this class reuses the name ``AuthProviders`` of the providers
    resource defined earlier in this module and therefore shadows it at module
    level. The name is kept so existing references keep working; consider
    renaming it (e.g. ``AuthRegister``) in a coordinated change.
    """

    def get(self):
        """Kept for backward compatibility; delegates to ``register``."""
        return register()

    def post(self):
        """Register a user from the JSON request body via ``register``.

        ``register`` reads a JSON body and its blueprint route only accepts
        POST, so POST is the matching verb for this resource as well.
        """
        return register()
@auth_api_bp.route('/login', methods=('GET', 'POST',))
def login():
    """Authenticate a user from the JSON request body and return JWTs.

    The members of the JSON object are passed as keyword arguments to
    ``User.authenticate``.

    Returns:
        On success, a JSON object with a fresh ``access_token`` and a
        ``refresh_token`` (status 200). On a missing/invalid body or failed
        authentication, a JSON message with ``authenticated: False``
        (status 401).
    """
    logger.debug("login request received")
    data = request.get_json()
    # ``not data`` covers a missing or empty body; the isinstance check rejects
    # JSON arrays/scalars, which cannot be unpacked as keyword arguments.
    if not data or not isinstance(data, dict):
        return jsonify({'message': 'Invalid request data', 'authenticated': False}), 401
    # The payload carries credentials, so it is deliberately neither printed
    # nor logged.
    user = User.authenticate(**data)
    if not user:
        return jsonify({'message': 'Invalid credentials', 'authenticated': False}), 401
    token = {
        'access_token': create_access_token(identity=user, fresh=True),
        'refresh_token': create_refresh_token(identity=user)
    }
    return jsonify(token), 200
# Endpoint for revoking the current user's access token
@auth_api_bp.route('/logout', methods=['GET', 'DELETE'])
@jwt_required
def logout():
    """Store the ``jti`` of the current request's JWT as a ``BlacklistToken``.

    Returns a JSON confirmation message with status 200.
    """
    token_id = get_raw_jwt()['jti']
    blacklisted = BlacklistToken(token=token_id)
    db.session.add(blacklisted)
    db.session.commit()
    body = {"msg": "Successfully logged out"}
    return jsonify(body), 200
# Endpoint for revoking the current user's refresh token
@auth_api_bp.route('/logout2', methods=['GET', 'DELETE'])
@jwt_refresh_token_required
def logout2():
    """Store the ``jti`` of the current request's refresh JWT as a ``BlacklistToken``.

    Returns a JSON confirmation message with status 200.
    """
    token_id = get_raw_jwt()['jti']
    blacklisted = BlacklistToken(token=token_id)
    db.session.add(blacklisted)
    db.session.commit()
    body = {"msg": "Successfully logged out"}
    return jsonify(body), 200
def check_and_create_groups(groups: Iterable[str]):
    """Resolve each name in *groups* to a ``Group``, creating missing ones.

    Names for which ``Group.get_by_name`` returns ``None`` get a new ``Group``
    that is added to the database session. The session is committed once
    after all names have been processed.

    Returns:
        A list with one ``Group`` per entry of *groups*, in input order.
    """
    resolved = []
    for group_name in groups:
        existing = Group.get_by_name(group_name)
        if existing is not None:
            resolved.append(existing)
            continue
        # Unknown group name: create it and stage it for the commit below.
        created = Group(name=group_name)
        db.session.add(created)
        resolved.append(created)
    db.session.commit()
    return resolved
def create_or_retrieve_user_from_userinfo(userinfo):
    """Find or create the local ``User`` that matches an OIDC userinfo mapping.

    The user is looked up by the ``email`` entry. An existing user gets its
    first/last name overwritten from ``given_name``/``family_name`` and is
    added to the groups named in ``memberOf``; otherwise a new user flagged
    ``external_user=True`` is created with those values.

    Args:
        userinfo: Mapping with the userinfo claims. ``email`` is required;
            ``given_name``, ``family_name`` and ``memberOf`` are optional.

    Returns:
        The updated or newly created user, or ``None`` if ``email`` is missing.
    """
    try:
        email = userinfo["email"]
    except KeyError:
        logger.error("email is missing in OIDC userinfo! Can't create user!")
        return None

    # ``or []`` also covers a ``memberOf`` claim that is present but null.
    user_groups = check_and_create_groups(groups=userinfo.get("memberOf") or [])
    first_name = userinfo.get("given_name", "")
    last_name = userinfo.get("family_name", "")

    user = User.get_by_identifier(email)
    if user is None:
        logger.info("creating new user")
        user = User(email=email, first_name=first_name, last_name=last_name,
                    external_user=True, groups=user_groups)
        db.session.add(user)
        db.session.commit()
        return user

    logger.info("user found -> update user")
    user.first_name = first_name
    user.last_name = last_name
    for group in user_groups:
        # This runs on every login; only add memberships the user does not
        # already have, so groups are not appended again and again.
        if group not in user.groups:
            user.groups.append(group)
    db.session.commit()
    return user
@auth_api_bp.route('/oidc', methods=['GET'])
@auth_api_bp.route('/oidc/<redirect_url>', methods=['GET'])
@oidc_auth.oidc_auth(provider_name=PROVIDER_NAME)
def oidc(redirect_url=None):
    """Complete an OIDC login and hand the resulting token(s) to the client.

    The user is found or created from ``flask.session['userinfo']``. Depending
    on the ``AUTH_RETURN_EXTERNAL_JWT`` config flag, the token is either the
    session's ``id_token`` encoded with ``SECRET_KEY`` or a JSON document with
    a fresh access token and a refresh token. The token is base64-encoded and
    set as the ``tokens`` cookie on a redirect response.

    Args:
        redirect_url: Optional redirect target from the URL path. If absent,
            the ``Referer`` header, then the ``redirect_url`` query argument,
            then ``"/"`` is used.

    Returns:
        A redirect response carrying the ``tokens`` cookie, or a plain-text
        error with status 401 if no user could be found or created.
    """
    logger.debug("oidc auth endpoint:")
    userinfo = flask.session['userinfo']
    user = create_or_retrieve_user_from_userinfo(userinfo)
    if user is None:
        logger.error(f"Could not authenticate: could not find or create user:\n{str(userinfo)}")
        return "Could not authenticate: could not find or create user.", 401

    if current_app.config.get("AUTH_RETURN_EXTERNAL_JWT", False):
        token = jwt.encode(flask.session['id_token'], current_app.config['SECRET_KEY'])
    else:
        token = json.dumps({
            'access_token': create_access_token(identity=user, fresh=True),
            'refresh_token': create_refresh_token(identity=user)
        })
    # ``jwt.encode`` returns bytes in PyJWT 1.x and str in 2.x, while
    # ``json.dumps`` always returns str; normalise to bytes before encoding.
    if isinstance(token, str):
        token = token.encode('utf-8')
    # The token is a credential, so its value is not written to the log.
    logger.info("Token created for OIDC login")

    # NOTE(review): the redirect target comes from the URL path, the Referer
    # header or a query argument, all client-controlled -- this is an open
    # redirect that also carries the token cookie; consider validating the
    # target against an allow-list.
    if redirect_url is None:
        redirect_url = request.headers.get("Referer")
    if redirect_url is None:
        redirect_url = request.args.get('redirect_url')
    if redirect_url is None:
        redirect_url = "/"

    response = make_response(redirect(redirect_url))
    # Pass the cookie value as text; base64 output is pure ASCII.
    response.set_cookie('tokens', base64.b64encode(token).decode('ascii'))
    return response
@auth_api_bp.route('/refresh', methods=['GET'])
@jwt_refresh_token_required
def refresh():
    """Refresh token endpoint.

    Generates a new access token from the refresh token, but marks that
    access token as non-fresh, as no password is verified in this endpoint.

    Returns:
        A JSON object with the new ``access_token`` (status 200), or a JSON
        message with ``authenticated: False`` (status 401) if no user matches
        the identity stored in the refresh token.
    """
    jwt_identity = get_jwt_identity()
    user = User.get_by_identifier(jwt_identity)
    if user is None:
        # The identity in the token no longer resolves to a user; do not try
        # to issue a token for ``None``.
        logger.warning("Refresh requested for unknown identity %s", jwt_identity)
        return jsonify({'message': 'Invalid credentials', 'authenticated': False}), 401
    logger.info("Refreshing token for %s", user)
    new_token = create_access_token(identity=user, fresh=False)
    return jsonify({'access_token': new_token}), 200