# -*- coding: utf-8 -*- """ Example user model and related models """ import json import sqlalchemy from sqlalchemy.orm import relation, validates from sqlalchemy import MetaData, any_ from backend import db, app, login_manager from backend.config import Config from backend.models.post_model import Post from backend.models.example_model import ExampleDataItem import re import jwt from flask_login import UserMixin from sqlalchemy import or_, event from datetime import datetime, timedelta from passlib.hash import sha256_crypt from hashlib import md5 metadata = MetaData() followers = db.Table('followers', db.Column('follower_id', db.Integer, db.ForeignKey('user.id')), db.Column('followed_id', db.Integer, db.ForeignKey('user.id')) ) acquaintances = db.Table('acquaintances', db.Column('me_id', db.Integer, db.ForeignKey('user.id')), db.Column('acquaintance_id', db.Integer, db.ForeignKey('user.id')) ) user_favorite_recorders_table = db.Table('user_favorite_recorders', db.Column('user_id', db.Integer, db.ForeignKey('user.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True), db.Column('recorder_id', db.Integer, db.ForeignKey('recorder.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True)) # This is the association table for the many-to-many relationship between # groups and members - this is, the memberships. user_group_table = db.Table('user_group', db.Column('user_id', db.Integer, db.ForeignKey('user.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True), db.Column('group_id', db.Integer, db.ForeignKey('group.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True)) # This is the association table for the many-to-many relationship between # groups and permissions. group_permission_table = db.Table('group_permission', db.Column('group_id', db.Integer, db.ForeignKey('group.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True), db.Column('permission_id', db.Integer, db.ForeignKey('permission.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True)) # This is the association table for the many-to-many relationship between # users and permissions. user_permission_table = db.Table('user_permission', db.Column('user_id', db.Integer, db.ForeignKey('user.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True), db.Column('permission_id', db.Integer, db.ForeignKey('permission.id', onupdate="CASCADE", ondelete="CASCADE"), primary_key=True)) class User(UserMixin, db.Model): """ Example user model representation. """ id = db.Column(db.Integer, primary_key=True) social_id = db.Column(db.Unicode(63), nullable=True, unique=True) nickname = db.Column(db.Unicode(63), index=True, unique=True) first_name = db.Column(db.Unicode(63), index=True, nullable=True) last_name = db.Column(db.Unicode(63), index=True, nullable=True) email = db.Column(db.String(120), nullable=False, index=True, unique=True) lang = db.Column(db.Unicode(32), index=False, unique=False) timezone = db.Column(db.Unicode(63), index=False, unique=False) posts = db.relationship('Post', backref='author', lazy='dynamic') example_data_item = db.relationship('ExampleDataItem', backref='owner') example_data_item_id = db.Column(db.ForeignKey(ExampleDataItem.id)) about_me = db.Column(db.Unicode(255)) role = db.Column(db.Unicode(63)) groups = db.relationship('Group', secondary=user_group_table, back_populates='users') permissions = db.relationship('Permission', secondary=user_permission_table, back_populates='users') password = db.Column(db.String(255), nullable=True) registered_on = db.Column(db.DateTime, nullable=False, default=datetime.utcnow()) external_user = db.Column(db.Boolean, default=False) external_user_id = db.Column(db.Unicode(63), unique=True, nullable=True, default=None) last_seen = db.Column(db.DateTime, default=datetime.utcnow()) last_time_modified = db.Column(db.DateTime, default=datetime.utcnow()) jwt_exp_delta_seconds = db.Column(db.Integer, nullable=True) acquainted = db.relationship('User', secondary=acquaintances, primaryjoin=(acquaintances.c.me_id == id), secondaryjoin=(acquaintances.c.acquaintance_id == id), backref=db.backref('acquaintances', lazy='dynamic'), lazy='dynamic') followed = db.relationship('User', secondary=followers, primaryjoin=(followers.c.follower_id == id), secondaryjoin=(followers.c.followed_id == id), backref=db.backref('followers', lazy='dynamic'), lazy='dynamic') favorite_recorders = db.relationship('Recorder', secondary=user_favorite_recorders_table) def __init__(self, **kwargs): super(User, self).__init__(**kwargs) password = kwargs.get("password", None) external_user = kwargs.get("external_user", None) if password is not None: self.password = sha256_crypt.encrypt(password) if external_user is not None: self.external_user = external_user @validates('email') def validate_address(self, key, email): assert re.match(r"[^@]+@[^@]+\.[^@]+", email), "email is invalid" return email @staticmethod @login_manager.user_loader def get_by_identifier(identifier): """ Find user by identifier, which might be the nickname or the e-mail address. :param identifier: :return: """ return User.query.filter(or_(User.nickname == identifier, User.email == identifier, User.id == identifier)).first() @staticmethod @login_manager.user_loader def get_by_id(identifier): """ Find user by ID. :param identifier: :return: """ return User.query.filter(User.id == identifier).first() @staticmethod def get_all(): """ Return all users :return: """ return User.query.all() @staticmethod def make_unique_nickname(nickname): """ Add suffix (counter) to nickname in order to get a unique nickname. :param nickname: :return: """ if User.query.filter_by(nickname=nickname).first() is None: return nickname version = 2 while True: new_nickname = nickname + str(version) if User.query.filter_by(nickname=new_nickname).first() is None: break version += 1 return new_nickname @staticmethod def make_valid_nickname(nickname): """ Replaces certain characters (except a-zA-Z0-9_.) in nickname with blancs. :param nickname: :return: """ return re.sub('[^a-zA-Z0-9_.]', '', nickname) @classmethod def authenticate(cls, **kwargs): email = kwargs.get('email') password = kwargs.get('password') if not email or not password: return None user = cls.query.filter_by(email=email).first() if not user: user = cls.query.filter_by(nickname=email).first() # be nice and allow nickname as well... if not user or not user.verify_password(password): return None return user @property def is_authenticated(self): """ Returns true if user is authenticated. :return: """ # TODO: implement correctly return True @property def is_active(self): """ Returns true if user is active. :return: """ # TODO: implement correctly return True @property def is_anonymous(self): """ Returns true if user is anonymous. :return: """ # TODO: implement correctly return False @property def is_read_only(self): """ Returns true if user is active. :return: """ # TODO: implement correctly return True @property def effective_permissions(self): permissions = Config.ROLE_PERMISSION_MAPPINGS.get(self.role, set()) for g in self.groups: for p in g.permissions: permissions.add(p) return permissions @staticmethod def decode_auth_token(auth_token): """ Decodes the auth token :param auth_token: :return: integer|string """ try: payload = jwt.decode(auth_token, app.config.get('SECRET_KEY')) is_blacklisted_token = BlacklistToken.check_blacklist(auth_token) if is_blacklisted_token: return 'Token blacklisted. Please log in again.' else: return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired. Please log in again.' except jwt.InvalidTokenError: return 'Invalid token. Please log in again.' def encode_auth_token(self): """ Generates the Auth Token :return: string """ try: payload = { 'exp': datetime.utcnow() + timedelta(days=0, hours=3, seconds=5), 'iat': datetime.utcnow(), 'sub': self.id } return jwt.encode( payload, app.config.get('SECRET_KEY'), algorithm='HS256' ) except Exception as e: return e def set_password(self, password): """ SHA256 encrypts the given password and sets it on the user. :param password: :return: """ self.password = sha256_crypt.encrypt(password) def verify_password(self, password): """ Verifies that the given password matches the SHA256 encrypted password stored on the user. :param password: :return: """ if self.password is None: return False return sha256_crypt.verify(password, self.password) def get_id(self): """ Returns the ID of the user. :return: """ try: # noinspection PyUnresolvedReferences return unicode(self.id) # python 2 except NameError: return str(self.id) # python 3 def avatar(self, size): """ Returns an avatar URL. :param size: :return: """ return 'https://s.gravatar.com/avatar/%s?d=mm&s=%d' % (md5(self.email.encode('utf-8')).hexdigest(), size) def acquaint(self, user): """ Adds an acquaintance to the user object. :param user: :return: """ if not self.is_acquainted(user): self.acquainted.append(user) return self def unacquaint(self, user): """ Removes the user from the list of acquaintances. :param user: :return: """ if self.is_acquainted(user): self.acquainted.remove(user) return self def is_acquainted(self, user): """ Check if the provided user is an acquaintance. :param user: :return: """ return self.acquainted.filter(acquaintances.c.acquaintance_id == user.id).count() > 0 def get_acquaintances(self): """ Returns the list of acquaintances. :return: """ return User.query.join(acquaintances, (acquaintances.c.acquaintance_id == User.id)).filter( acquaintances.c.me_id == self.id).order_by(User.nickname.desc()) def shared_example_data_items(self): """ Returns a list of the shared data items. :return: """ return ExampleDataItem.query.join(acquaintances, (acquaintances.c.acquaintance_id == ExampleDataItem.user_id)).filter( acquaintances.c.me_id == self.id).order_by(ExampleDataItem.timestamp.desc()) def follow(self, user): """ Add user to list of followers. :param user: :return: """ if not self.is_following(user): self.followed.append(user) return self def unfollow(self, user): """ Remove user from the list of followers. :param user: :return: """ if self.is_following(user): self.followed.remove(user) return self def is_following(self, user): """ Checks if specified user is a follower. :param user: :return: """ return self.followed.filter(followers.c.followed_id == user.id).count() > 0 def followed_posts(self): """ Returns list of followed posts. :return: """ return Post.query.join(followers, (followers.c.followed_id == Post.user_id)).filter( followers.c.follower_id == self.id).order_by(Post.timestamp.desc()) def to_dict(self): # return self.__dict__ return dict(id=self.id, email=self.email, groups=[g.to_dict() for g in self.groups]) def toJSON(self): return json.dumps(self.to_dict(), default=lambda o: o.__dict__, sort_keys=True, indent=4) def __repr__(self): return '' % self.email class BlacklistToken(db.Model): """ Token Model for storing JWT tokens """ __tablename__ = 'blacklist_tokens' id = db.Column(db.Integer, primary_key=True, autoincrement=True) token = db.Column(db.String(500), unique=True, nullable=False) blacklisted_on = db.Column(db.DateTime, nullable=False) def __init__(self, token): self.token = token self.blacklisted_on = datetime.now() def __repr__(self): return '