# Copyright (c) 2019. Tobias Kurze """ This module provides functions related to authentication through the API. For example: listing of available auth providers or registration of users. Login through API does not start a new session, but instead returns JWT. """ import base64 import json from pprint import pprint import flask import jwt from flask import request, jsonify, current_app, url_for, redirect, make_response from flask_jwt_extended import create_access_token, create_refresh_token, get_jwt, get_jwt_identity, jwt_required from typing import Iterable from flask_restx import Resource, fields from werkzeug.routing import BuildError from backend import db, app from backend.api import auth_api_bp, auth_api_providers_ns, auth_api_register_ns from backend.auth import AUTH_PROVIDERS, oidc_auth from backend.models.user_model import User, Group, BlacklistToken @auth_api_bp.route('/providers', methods=('GET',)) def get_auth_providers(): providers = dict() for p in AUTH_PROVIDERS: provider = dict(AUTH_PROVIDERS[p]) try: provider["url"] = url_for(AUTH_PROVIDERS[p]["url"]) except BuildError: provider["url"] = AUTH_PROVIDERS[p]["url"] providers[p] = provider return jsonify(providers) @auth_api_providers_ns.route('/') class AuthProviders(Resource): def get(self): return get_auth_providers() @auth_api_bp.route('/register', methods=('POST',)) def register(): data = request.get_json() user = User(**data) db.session.add(user) db.session.commit() return jsonify(user.to_dict()), 201 # @auth_api_register_ns.route('/') # @auth_api_register_ns.expect(auth_api_register_ns.model('RegisterModel', { # 'nickname': fields.String(required=False, description='The user\'s nickname'), # 'first_name': fields.String(required=False, description='The user\'s first name'), # 'last_name': fields.String(required=False, description='The user\'s last name'), # 'lang': fields.String(required=False, description='The user\'s preferred language'), # 'timezone': fields.String(required=False, description='The user\'s preferred timezone'), # 'email': fields.String(required=True, description='The user\'s e-mail address'), # 'password': fields.String(required=False, description='The group\'s name') # })) # class AuthProviders(Resource): # def get(self): # return register() @auth_api_bp.route('/login', methods=('GET', 'POST',)) def login(): print("login") print(request) data = request.get_json() if not data: return jsonify({'message': 'Invalid request data', 'authenticated': False}), 401 print(data) user = User.authenticate(**data) if not user: return jsonify({'message': 'Invalid credentials', 'authenticated': False}), 401 token = { 'access_token': create_access_token(identity=user, fresh=True), 'refresh_token': create_refresh_token(identity=user) } return jsonify(token), 200 # Endpoint for revoking the current users access token @auth_api_bp.route('/logout', methods=['GET', 'DELETE']) @jwt_required def logout(): jti = get_jwt()['jti'] db.session.add(BlacklistToken(token=jti)) db.session.commit() return jsonify({"msg": "Successfully logged out"}), 200 # Endpoint for revoking the current users refresh token @auth_api_bp.route('/logout2', methods=['GET', 'DELETE']) @jwt_required(refresh=True) def logout2(): jti = get_jwt()['jti'] db.session.add(BlacklistToken(token=jti)) db.session.commit() return jsonify({"msg": "Successfully logged out"}), 200 def check_and_create_groups(groups: Iterable[str]): user_groups = [] for g in groups: group = Group.get_by_name(g) if group is None: group = Group(name=g) db.session.add(group) user_groups.append(group) db.session.commit() return user_groups def create_or_retrieve_user_from_userinfo(userinfo): try: email = userinfo["email"] except KeyError: return None user_groups = check_and_create_groups(groups=userinfo.get("memberOf", [])) user = User.get_by_identifier(email) if user is not None: app.logger.info("user found -> update user") pprint(user.to_dict()) user.first_name = userinfo.get("given_name", "") user.last_name = userinfo.get("family_name", "") for g in user_groups: user.groups.append(g) db.session.commit() return user user = User(email=email, first_name=userinfo.get("given_name", ""), last_name=userinfo.get("family_name", ""), external_user=True, groups=user_groups) app.logger.info("creating new user") db.session.add(user) db.session.commit() return user @auth_api_bp.route('/oidc', methods=['GET']) @auth_api_bp.route('/oidc/', methods=['GET']) @oidc_auth.oidc_auth() def oidc(redirect_url=None): user = create_or_retrieve_user_from_userinfo(flask.session['userinfo']) if user is None: return "Could not authenticate: could not find or create user.", 401 if current_app.config.get("AUTH_RETURN_EXTERNAL_JWT", False): token = jwt.encode(flask.session['id_token'], current_app.config['SECRET_KEY']) else: token = json.dumps({ 'access_token': create_access_token(identity=user, fresh=True), 'refresh_token': create_refresh_token(identity=user) }) if redirect_url is None: redirect_url = request.headers.get("Referer") if redirect_url is None: redirect_url = request.args.get('redirect_url') if redirect_url is None: redirect_url = "/" app.logger.info("Token: {}".format(token)) response = make_response(redirect(redirect_url)) response.set_cookie('tokens', base64.b64encode(token.encode('utf-8'))) return response @auth_api_bp.route('/refresh', methods=['GET']) @jwt_required(refresh=True) def refresh(): """Refresh token endpoint. This will generate a new access token from the refresh token, but will mark that access token as non-fresh, as we do not actually verify a password in this endpoint.""" jwt_identity = get_jwt_identity() user = User.get_by_identifier(jwt_identity) app.logger.info("Refreshing token for " + str(user)) new_token = create_access_token(identity=user, fresh=False) ret = {'access_token': new_token} return jsonify(ret), 200