# -*- coding: utf-8 -*- """ Example user model and related models """ from backend import db, app from backend.models.post_model import Post from backend.models.example_model import ExampleDataItem import re import jwt from flask_login import UserMixin from sqlalchemy import or_ from datetime import datetime, timedelta from passlib.hash import sha256_crypt from hashlib import md5 followers = db.Table('followers', db.Column('follower_id', db.Integer, db.ForeignKey('user.id')), db.Column('followed_id', db.Integer, db.ForeignKey('user.id')) ) acquaintances = db.Table('acquaintances', db.Column('me_id', db.Integer, db.ForeignKey('user.id')), db.Column('acquaintance_id', db.Integer, db.ForeignKey('user.id')) ) class User(UserMixin, db.Model): """ Example user model representation. """ id = db.Column(db.Integer, primary_key=True) social_id = db.Column(db.String(64), nullable=True, unique=True) nickname = db.Column(db.String(64), index=True, unique=True) first_name = db.Column(db.String(64), index=True, nullable=True) last_name = db.Column(db.String(64), index=True, nullable=True) email = db.Column(db.String(120), nullable=False, index=True, unique=True) lang = db.Column(db.String(16), index=False, unique=False) timezone = db.Column(db.String(32), index=False, unique=False) posts = db.relationship('Post', backref='author', lazy='dynamic') example_data_item = db.relationship('ExampleDataItem', backref='owner') example_data_item_id = db.Column(db.ForeignKey(ExampleDataItem.id)) about_me = db.Column(db.String(140)) role = db.Column(db.String(64)) password = db.Column(db.String(255), nullable=True) registered_on = db.Column(db.DateTime, nullable=False, default=datetime.utcnow()) last_seen = db.Column(db.DateTime, default=datetime.utcnow()) jwt_exp_delta_seconds = db.Column(db.Integer, nullable=True) acquainted = db.relationship('User', secondary=acquaintances, primaryjoin=(acquaintances.c.me_id == id), secondaryjoin=(acquaintances.c.acquaintance_id == id), backref=db.backref('acquaintances', lazy='dynamic'), lazy='dynamic') followed = db.relationship('User', secondary=followers, primaryjoin=(followers.c.follower_id == id), secondaryjoin=(followers.c.followed_id == id), backref=db.backref('followers', lazy='dynamic'), lazy='dynamic') def __init__(self, **kwargs): super(User, self).__init__(**kwargs) password = kwargs.get("password", None) if password is not None: self.password = sha256_crypt.encrypt(password) # do custom initialization here @staticmethod def get_by_identifier(identifier): """ Find user by identifier, which might be the nickname or the e-mail address. :param identifier: :return: """ return User.query.filter(or_(User.nickname == identifier, User.email == identifier, User.id == identifier)).first() @staticmethod def get_all(): """ Return all users :return: """ return User.query.all() @staticmethod def make_unique_nickname(nickname): """ Add suffix (counter) to nickname in order to get a unique nickname. :param nickname: :return: """ if User.query.filter_by(nickname=nickname).first() is None: return nickname version = 2 while True: new_nickname = nickname + str(version) if User.query.filter_by(nickname=new_nickname).first() is None: break version += 1 return new_nickname @staticmethod def make_valid_nickname(nickname): """ Replaces certain characters (except a-zA-Z0-9_.) in nickname with blancs. :param nickname: :return: """ return re.sub('[^a-zA-Z0-9_.]', '', nickname) @classmethod def authenticate(cls, **kwargs): email = kwargs.get('email') password = kwargs.get('password') if not email or not password: return None user = cls.query.filter_by(email=email).first() if not user or not user.verify_password(password): return None return user @property def is_authenticated(self): """ Returns true if user is authenticated. :return: """ # TODO: implement correctly return True @property def is_active(self): """ Returns true if user is active. :return: """ # TODO: implement correctly return True @property def is_anonymous(self): """ Returns true if user is anonymous. :return: """ # TODO: implement correctly return False @staticmethod def decode_auth_token(auth_token): """ Decodes the auth token :param auth_token: :return: integer|string """ try: payload = jwt.decode(auth_token, app.config.get('SECRET_KEY')) is_blacklisted_token = BlacklistToken.check_blacklist(auth_token) if is_blacklisted_token: return 'Token blacklisted. Please log in again.' else: return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired. Please log in again.' except jwt.InvalidTokenError: return 'Invalid token. Please log in again.' def encode_auth_token(self): """ Generates the Auth Token :return: string """ try: payload = { 'exp': datetime.utcnow() + timedelta(days=0, hours=3, seconds=5), 'iat': datetime.utcnow(), 'sub': self.id } return jwt.encode( payload, app.config.get('SECRET_KEY'), algorithm='HS256' ) except Exception as e: return e def set_password(self, password): """ SHA256 encrypts the given password and sets it on the user. :param password: :return: """ self.password = sha256_crypt.encrypt(password) def verify_password(self, password): """ Verifies that the given password matches the SHA256 encrypted password stored on the user. :param password: :return: """ return sha256_crypt.verify(password, self.password) def get_id(self): """ Returns the ID of the user. :return: """ try: # noinspection PyUnresolvedReferences return unicode(self.id) # python 2 except NameError: return str(self.id) # python 3 def avatar(self, size): """ Returns an avatar URL. :param size: :return: """ return 'https://s.gravatar.com/avatar/%s?d=mm&s=%d' % (md5(self.email.encode('utf-8')).hexdigest(), size) def acquaint(self, user): """ Adds an acquaintance to the user object. :param user: :return: """ if not self.is_acquainted(user): self.acquainted.append(user) return self def unacquaint(self, user): """ Removes the user from the list of acquaintances. :param user: :return: """ if self.is_acquainted(user): self.acquainted.remove(user) return self def is_acquainted(self, user): """ Check if the provided user is an acquaintance. :param user: :return: """ return self.acquainted.filter(acquaintances.c.acquaintance_id == user.id).count() > 0 def get_acquaintances(self): """ Returns the list of acquaintances. :return: """ return User.query.join(acquaintances, (acquaintances.c.acquaintance_id == User.id)).filter( acquaintances.c.me_id == self.id).order_by(User.nickname.desc()) def shared_example_data_items(self): """ Returns a list of the shared data items. :return: """ return ExampleDataItem.query.join(acquaintances, (acquaintances.c.acquaintance_id == ExampleDataItem.user_id)).filter( acquaintances.c.me_id == self.id).order_by(ExampleDataItem.timestamp.desc()) def follow(self, user): """ Add user to list of followers. :param user: :return: """ if not self.is_following(user): self.followed.append(user) return self def unfollow(self, user): """ Remove user from the list of followers. :param user: :return: """ if self.is_following(user): self.followed.remove(user) return self def is_following(self, user): """ Checks if specified user is a follower. :param user: :return: """ return self.followed.filter(followers.c.followed_id == user.id).count() > 0 def followed_posts(self): """ Returns list of followed posts. :return: """ return Post.query.join(followers, (followers.c.followed_id == Post.user_id)).filter( followers.c.follower_id == self.id).order_by(Post.timestamp.desc()) def to_dict(self): return dict(id=self.id, email=self.email) def __repr__(self): return '' % self.nickname class BlacklistToken(db.Model): """ Token Model for storing JWT tokens """ __tablename__ = 'blacklist_tokens' id = db.Column(db.Integer, primary_key=True, autoincrement=True) token = db.Column(db.String(500), unique=True, nullable=False) blacklisted_on = db.Column(db.DateTime, nullable=False) def __init__(self, token): self.token = token self.blacklisted_on = datetime.now() def __repr__(self): return '