# -*- coding: utf-8 -*-
"""
Backend base module.

Configures logging, creates the Flask application and wires up the database,
login manager, JWT / basic / OIDC authentication, blueprints and CORS.
"""
import logging
import os
from functools import wraps
from io import StringIO
from logging.config import dictConfig
from logging.handlers import MemoryHandler
from typing import Optional, Union

import coloredlogs
import jwt
import requests
from flask import Flask, jsonify, abort
from flask_httpauth import HTTPTokenAuth, HTTPBasicAuth, MultiAuth
from flask_jwt_extended import JWTManager, decode_token, get_jwt_identity
from flask_login import LoginManager
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS

from backend.config import Config

__author__ = "Tobias Kurze"
__copyright__ = "Copyright 2019, Tobias Kurze, KIT"
__credits__ = ["Tobias Kurze"]
__license__ = ""
__version__ = "0.9.0"
__maintainer__ = "Tobias Kurze"
__email__ = "it@t-kurze.de"
# __status__ = "Production"
__status__ = "Development"

from .tools.send_mail import get_smtp_default_handler

# Logging layout:
#   * logger "lrc"  -> WSGI error stream, LOG_FILE and ERROR_LOG_FILE
#   * root logger   -> ROOT_LOG_FILE and ERROR_LOG_FILE (ERROR and above only)
# All file handlers rotate daily.
dictConfig({
    'version': 1,
    'formatters': {
        'default': {
            'format': '[%(asctime)s] {%(threadName)s} %(levelname)s in %(module)s, line %(lineno)d: %(message)s',
        }
    },
    'handlers': {
        'wsgi': {
            'class': 'logging.StreamHandler',
            'stream': 'ext://flask.logging.wsgi_errors_stream',
            'formatter': 'default'
        },
        'root_file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': Config.ROOT_LOG_FILE,
            'when': 'd',
            'interval': 1,
            'backupCount': 3,
            'formatter': 'default',
        },
        'file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': Config.LOG_FILE,
            'when': 'd',
            'interval': 1,
            'backupCount': 5,
            'formatter': 'default',
        },
        'errors_file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': Config.ERROR_LOG_FILE,
            'when': 'd',
            'interval': 1,
            'backupCount': 5,
            'level': 'ERROR',
            'formatter': 'default',
        },
    },
    'loggers': {
        'lrc': {
            'level': Config.LOG_LEVEL,
            'handlers': ['wsgi', 'file', 'errors_file']
        }
    },
    'root': {
        'level': 'ERROR',
        'handlers': ['root_file', 'errors_file']
    }
})

main_logger = logging.getLogger("lrc")

# following might be dangerous, as buffer might be filled without a mechanism to empty it
# NOTE: mem_handler is only created here; this module does not attach it to any logger.
smtp_error_handler = get_smtp_default_handler(subject="Warnings, errors and worse...!")
mem_handler = MemoryHandler(capacity=10, flushLevel=logging.ERROR, target=smtp_error_handler)
mem_handler.setLevel(logging.WARNING)

# error_log_stream = StringIO()
# error_log_stream_handler = logging.StreamHandler(stream=error_log_stream)
# error_log_stream_handler.setLevel(logging.ERROR)
# main_logger.addHandler(error_log_stream_handler)

coloredlogs.install(level='DEBUG', logger=main_logger)


class LrcException(Exception):
    """Application exception that can wrap a plain message or another exception.

    Attributes:
        type: Class of the wrapped exception, or ``None`` when the instance
            was built from a plain string.
        html_code: Optional numeric code stored alongside the message and
            included in the string representation when set.
    """

    def __init__(self, message_or_exception: Union[str, Exception], html_code: Optional[int] = None):
        """Store the message (or ``str()`` of the wrapped exception) and the optional code."""
        if isinstance(message_or_exception, str):
            super().__init__(message_or_exception)
            self.type = None
        else:
            super().__init__(str(message_or_exception))
            self.type = type(message_or_exception)
        self.html_code = html_code

    def __repr__(self):
        """Return a readable description including original type and code, if any."""
        joined_args = ', '.join(self.args)
        if self.type is None:
            msg = "LRC Exception: \"{}\"".format(joined_args)
        else:
            msg = "LRC Exception: (original Exception: {}) \"{}\"".format(self.type, joined_args)
        if self.html_code is not None:
            msg += " (HTML Code: {})".format(self.html_code)
        return msg

    def __str__(self):
        """Return the same text as ``__repr__``."""
        return self.__repr__()


app = Flask(__name__)

# Select the configuration object based on the FLASK_ENV environment variable.
if os.environ.get('FLASK_ENV', None) == "development":
    app.config.from_object('backend.config.DevelopmentConfig')
else:
    app.config.from_object('backend.config.Config')

db = SQLAlchemy(app)

login_manager = LoginManager()
login_manager.init_app(app)

# flask_jwt_extended: to be used usually by API
jwt_extended = JWTManager(app)

# this is another library.... this is (probably -> verify) used to check JWTs
# provided by external sources (KIT, etc.)
jwt_auth = HTTPTokenAuth('Bearer')


@jwt_auth.verify_token
def verify_token(token):
    """Check a bearer token for the ``HTTPTokenAuth('Bearer')`` scheme.

    This function (and HTTPTokenAuth('Bearer')) has been defined to be used
    together with MultiAuth. For API calls solely using JWT authentication,
    jwt_required of flask_jwt_extended should be used directly.

    Args:
        token: The token string taken from the request.

    Returns:
        ``True`` if ``decode_token`` accepted the token, ``False`` if it
        raised a decode or expired-signature error.
    """
    # The raw token is a credential, so it is deliberately not written to the logs.
    app.logger.debug("Verifying bearer token")
    try:
        decoded = decode_token(token)
    except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError) as e:
        app.logger.warning("Could not verify token: {}".format(str(e)))
        return False
    app.logger.info(decoded)
    return True


basic_auth = HTTPBasicAuth()
multi_auth = MultiAuth(basic_auth, jwt_auth)

# Imported late: these modules are loaded only after the objects above exist.
from backend.auth import oidc_auth, auth_bp

try:
    oidc_auth.init_app(app)
except requests.exceptions.ConnectionError as err:
    # The error is passed as a lazy %-argument so it actually ends up in the message.
    app.logger.error("Could not connect to OIDC!! %s", err)

# oidc_multi_auth = MultiAuth(oidc_auth, jwt_auth) <- can't work as OIDCAuthentication not implementing HTTPAuth

from .serve_frontend import fe_bp
from .api import auth_api_bp, api_v1, api_bp

app.register_blueprint(auth_bp)
app.register_blueprint(auth_api_bp)
app.register_blueprint(api_bp)
app.register_blueprint(fe_bp)

CORS(app)
CORS(api_bp)
CORS(auth_api_bp)

# setLevel() (rather than assigning .level) keeps the logger's internal state consistent.
logging.getLogger('flask_cors').setLevel(logging.DEBUG)


# Fix jwt_extended by 'duck typing' error handlers
# jwt_extended._set_error_handler_callbacks(api_v1)  # removed for the moment, might raise new (old) problems

@jwt_extended.invalid_token_loader
def invalid_jwt(token):
    """Log the invalid-token event and abort the request with HTTP 401.

    NOTE(review): Flask-JWT-Extended may pass the rejection reason rather than
    the token itself as the argument - confirm against the installed version.
    """
    main_logger.info("Unauthorized access; invalid token provided: {}".format(token))
    abort(401)


@jwt_extended.expired_token_loader
def expired_jwt(token):
    """Log the expired-token event and abort the request with HTTP 401."""
    main_logger.info("Unauthorized access; expired token provided: {}".format(token))
    abort(401)


# Backward-compatible alias: both loaders used to be named ``unauthorized_jwt``,
# which left the module-level name bound to the expired-token handler.
unauthorized_jwt = expired_jwt