Files
lrc-backend/models/user_model.py

465 lines
15 KiB
Python

# -*- coding: utf-8 -*-
"""
Example user model and related models
"""
import json
from sqlalchemy.orm import relation
from sqlalchemy import MetaData
from backend import db, app, login_manager
from backend.models.post_model import Post
from backend.models.example_model import ExampleDataItem
import re
import jwt
from flask_login import UserMixin
from sqlalchemy import or_, event
from datetime import datetime, timedelta
from passlib.hash import sha256_crypt
from hashlib import md5
metadata = MetaData()
followers = db.Table('followers',
db.Column('follower_id', db.Integer, db.ForeignKey('user.id')),
db.Column('followed_id', db.Integer, db.ForeignKey('user.id'))
)
acquaintances = db.Table('acquaintances',
db.Column('me_id', db.Integer, db.ForeignKey('user.id')),
db.Column('acquaintance_id', db.Integer, db.ForeignKey('user.id'))
)
# This is the association table for the many-to-many relationship between
# groups and members - this is, the memberships.
user_group_table = db.Table('user_group',
db.Column('user_id', db.Integer,
db.ForeignKey('user.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True),
db.Column('group_id', db.Integer,
db.ForeignKey('group.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True))
# This is the association table for the many-to-many relationship between
# groups and permissions.
group_permission_table = db.Table('group_permission',
db.Column('group_id', db.Integer,
db.ForeignKey('group.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True),
db.Column('permission_id', db.Integer,
db.ForeignKey('permission.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True))
class User(UserMixin, db.Model):
"""
Example user model representation.
"""
id = db.Column(db.Integer, primary_key=True)
social_id = db.Column(db.Unicode(63), nullable=True, unique=True)
nickname = db.Column(db.Unicode(63), index=True, unique=True)
first_name = db.Column(db.Unicode(63), index=True, nullable=True)
last_name = db.Column(db.Unicode(63), index=True, nullable=True)
email = db.Column(db.String(120), nullable=False, index=True, unique=True)
lang = db.Column(db.Unicode(32), index=False, unique=False)
timezone = db.Column(db.Unicode(63), index=False, unique=False)
posts = db.relationship('Post', backref='author', lazy='dynamic')
example_data_item = db.relationship('ExampleDataItem', backref='owner')
example_data_item_id = db.Column(db.ForeignKey(ExampleDataItem.id))
about_me = db.Column(db.Unicode(255))
role = db.Column(db.Unicode(63))
groups = db.relationship('Group', secondary=user_group_table, back_populates='users')
password = db.Column(db.String(255), nullable=True)
registered_on = db.Column(db.DateTime, nullable=False, default=datetime.utcnow())
external_user = db.Column(db.Boolean, default=False)
last_seen = db.Column(db.DateTime, default=datetime.utcnow())
jwt_exp_delta_seconds = db.Column(db.Integer, nullable=True)
acquainted = db.relationship('User',
secondary=acquaintances,
primaryjoin=(acquaintances.c.me_id == id),
secondaryjoin=(acquaintances.c.acquaintance_id == id),
backref=db.backref('acquaintances', lazy='dynamic'),
lazy='dynamic')
followed = db.relationship('User',
secondary=followers,
primaryjoin=(followers.c.follower_id == id),
secondaryjoin=(followers.c.followed_id == id),
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic')
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
password = kwargs.get("password", None)
external_user = kwargs.get("external_user", None)
if password is not None:
self.password = sha256_crypt.encrypt(password)
if external_user is not None:
self.external_user = external_user
@staticmethod
@login_manager.user_loader
def get_by_identifier(identifier):
"""
Find user by identifier, which might be the nickname or the e-mail address.
:param identifier:
:return:
"""
return User.query.filter(or_(User.nickname == identifier,
User.email == identifier,
User.id == identifier)).first()
@staticmethod
@login_manager.user_loader
def get_by_id(identifier):
"""
Find user by ID.
:param identifier:
:return:
"""
return User.query.filter(User.id == identifier).first()
@staticmethod
def get_all():
"""
Return all users
:return:
"""
return User.query.all()
@staticmethod
def make_unique_nickname(nickname):
"""
Add suffix (counter) to nickname in order to get a unique nickname.
:param nickname:
:return:
"""
if User.query.filter_by(nickname=nickname).first() is None:
return nickname
version = 2
while True:
new_nickname = nickname + str(version)
if User.query.filter_by(nickname=new_nickname).first() is None:
break
version += 1
return new_nickname
@staticmethod
def make_valid_nickname(nickname):
"""
Replaces certain characters (except a-zA-Z0-9_.) in nickname with blancs.
:param nickname:
:return:
"""
return re.sub('[^a-zA-Z0-9_.]', '', nickname)
@classmethod
def authenticate(cls, **kwargs):
email = kwargs.get('email')
password = kwargs.get('password')
if not email or not password:
return None
user = cls.query.filter_by(email=email).first()
if not user or not user.verify_password(password):
return None
return user
@property
def is_authenticated(self):
"""
Returns true if user is authenticated.
:return:
"""
# TODO: implement correctly
return True
@property
def is_active(self):
"""
Returns true if user is active.
:return:
"""
# TODO: implement correctly
return True
@property
def is_anonymous(self):
"""
Returns true if user is anonymous.
:return:
"""
# TODO: implement correctly
return False
@property
def is_read_only(self):
"""
Returns true if user is active.
:return:
"""
# TODO: implement correctly
return True
@staticmethod
def decode_auth_token(auth_token):
"""
Decodes the auth token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
if is_blacklisted_token:
return 'Token blacklisted. Please log in again.'
else:
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
def encode_auth_token(self):
"""
Generates the Auth Token
:return: string
"""
try:
payload = {
'exp': datetime.utcnow() + timedelta(days=0, hours=3, seconds=5),
'iat': datetime.utcnow(),
'sub': self.id
}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def set_password(self, password):
"""
SHA256 encrypts the given password and sets it on the user.
:param password:
:return:
"""
self.password = sha256_crypt.encrypt(password)
def verify_password(self, password):
"""
Verifies that the given password matches the SHA256 encrypted password stored on the user.
:param password:
:return:
"""
return sha256_crypt.verify(password, self.password)
def get_id(self):
"""
Returns the ID of the user.
:return:
"""
try:
# noinspection PyUnresolvedReferences
return unicode(self.id) # python 2
except NameError:
return str(self.id) # python 3
def avatar(self, size):
"""
Returns an avatar URL.
:param size:
:return:
"""
return 'https://s.gravatar.com/avatar/%s?d=mm&s=%d' % (md5(self.email.encode('utf-8')).hexdigest(), size)
def acquaint(self, user):
"""
Adds an acquaintance to the user object.
:param user:
:return:
"""
if not self.is_acquainted(user):
self.acquainted.append(user)
return self
def unacquaint(self, user):
"""
Removes the user from the list of acquaintances.
:param user:
:return:
"""
if self.is_acquainted(user):
self.acquainted.remove(user)
return self
def is_acquainted(self, user):
"""
Check if the provided user is an acquaintance.
:param user:
:return:
"""
return self.acquainted.filter(acquaintances.c.acquaintance_id == user.id).count() > 0
def get_acquaintances(self):
"""
Returns the list of acquaintances.
:return:
"""
return User.query.join(acquaintances, (acquaintances.c.acquaintance_id == User.id)).filter(
acquaintances.c.me_id == self.id).order_by(User.nickname.desc())
def shared_example_data_items(self):
"""
Returns a list of the shared data items.
:return:
"""
return ExampleDataItem.query.join(acquaintances,
(acquaintances.c.acquaintance_id == ExampleDataItem.user_id)).filter(
acquaintances.c.me_id == self.id).order_by(ExampleDataItem.timestamp.desc())
def follow(self, user):
"""
Add user to list of followers.
:param user:
:return:
"""
if not self.is_following(user):
self.followed.append(user)
return self
def unfollow(self, user):
"""
Remove user from the list of followers.
:param user:
:return:
"""
if self.is_following(user):
self.followed.remove(user)
return self
def is_following(self, user):
"""
Checks if specified user is a follower.
:param user:
:return:
"""
return self.followed.filter(followers.c.followed_id == user.id).count() > 0
def followed_posts(self):
"""
Returns list of followed posts.
:return:
"""
return Post.query.join(followers, (followers.c.followed_id == Post.user_id)).filter(
followers.c.follower_id == self.id).order_by(Post.timestamp.desc())
def to_dict(self):
#return self.__dict__
return dict(id=self.id, email=self.email, groups=[g.to_dict() for g in self.groups])
def toJSON(self):
return json.dumps(self.to_dict(), default=lambda o: o.__dict__,
sort_keys=True, indent=4)
def __repr__(self):
return '<User %r>' % self.nickname
class BlacklistToken(db.Model):
"""
Token Model for storing JWT tokens
"""
__tablename__ = 'blacklist_tokens'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
token = db.Column(db.String(500), unique=True, nullable=False)
blacklisted_on = db.Column(db.DateTime, nullable=False)
def __init__(self, token):
self.token = token
self.blacklisted_on = datetime.now()
def __repr__(self):
return '<id: token: {}'.format(self.token)
@staticmethod
def check_blacklist(auth_token):
"""
check whether auth token has been blacklisted
:param auth_token:
:return:
"""
res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
if res:
return True
else:
return False
class Group(db.Model):
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
name = db.Column(db.Unicode(63), unique=True, nullable=False)
description = db.Column(db.Unicode(255), unique=False, nullable=True, default="")
users = db.relationship('User', secondary=user_group_table, back_populates='groups')
permissions = db.relationship('Permission', secondary=group_permission_table, back_populates='groups')
def __init__(self, **kwargs):
super(Group, self).__init__(**kwargs)
@staticmethod
def get_by_name(name):
"""
Find group by name
:param name:
:return:
"""
return Group.query.filter(Group.name == name).first()
@staticmethod
def get_all():
"""
Return all groups
:return:
"""
return Group.query.all()
def __str__(self):
return self.name
def to_dict(self):
return dict(id=self.id, name=self.name)
def toJSON(self):
return json.dumps(self.to_dict(), default=lambda o: o.__dict__,
sort_keys=True, indent=4)
class Permission(db.Model):
"""Table containing permissions associated with groups."""
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
name = db.Column(db.Unicode(63), unique=True, nullable=False)
description = db.Column(db.Unicode(511))
groups = db.relationship(Group, secondary=group_permission_table,
back_populates='permissions')
@event.listens_for(Permission.__table__, 'after_create')
def insert_initial_permissions(*args, **kwargs):
for p in app.config.get("PERMISSIONS", []):
db.session.add(Permission(name=p))
db.session.commit()