diff --git a/.env b/.env new file mode 100644 index 0000000..ea2ef28 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +PYTHONPATH=${PYTHONPATH}:${PWD}/.. diff --git a/app.db b/app.db new file mode 100644 index 0000000..1938231 Binary files /dev/null and b/app.db differ diff --git a/config.py b/config.py new file mode 100644 index 0000000..a705f23 --- /dev/null +++ b/config.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# ... +# available languages +import os +basedir = os.path.abspath(os.path.dirname(__file__)) + + +class Config(): + TEMPLATE_AUTO_RELOAD = True + + SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db') + SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, 'db_repository') + SQLALCHEMY_TRACK_MODIFICATIONS = True + + WTF_CSRF_ENABLED = True + SECRET_KEY = 'you-will-never-guess' + OPENID_PROVIDERS = [ + {'name': 'Google', 'url': 'https://www.google.com/accounts/o8/id'}, + {'name': 'Yahoo', 'url': 'https://me.yahoo.com'}, + {'name': 'AOL', 'url': 'http://openid.aol.com/'}, + {'name': 'Flickr', 'url': 'http://www.flickr.com/'}, + {'name': 'MyOpenID', 'url': 'https://www.myopenid.com'}] + + OAUTH_CREDENTIALS = { + 'facebook': { + 'id': '1198624176930248', + 'secret': '4fbc01d776834c1ffc89a5bed1cd97d0' + }, + 'twitter': { + 'id': '3RzWQclolxWZIMq5LJqzRZPTl', + 'secret': 'm9TEd58DSEtRrZHpz2EjrV9AhsBRxKMo8m3kuIZj3zLwzwIimt' + }, + 'google': { + 'id': '1084993305658-d9n88548ssrtmt5v6s2dne57i4qpviur.apps.googleusercontent.com', + 'secret': 'oNpvoAKMPMjRyiu5EDrmmX4X' + }, + } + + # mail server settings + MAIL_SERVER = 'localhost' + MAIL_PORT = 25 + MAIL_USERNAME = None + MAIL_PASSWORD = None + + # administrator list + ADMINS = ['you@example.com'] + + # pagination + POSTS_PER_PAGE = 5 + LOCKS_PER_PAGE = 8 + + LANGUAGES = { + 'en': 'English', + 'es': 'EspaƱol' + } + + #ASSETS_DEBUG = True + + JWT_SECRET = "abcxyz" + JWT_ALGORITHM = "HS256" + JWT_EXP_DELTA_SECONDS = 5 * 60 + + INDEX_TEMPLATE = "index.html" + + +class ProductionConfig(Config): + DATABASE_URI = 'mysql://user@localhost/foo' + + +class DevelopmentConfig(Config): + SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db_debug') + DEBUG = True + +class TestingConfig(Config): + SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db_test') + TESTING = True diff --git a/database/__init__.py b/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/database/database.py b/database/database.py new file mode 100644 index 0000000..77453a6 --- /dev/null +++ b/database/database.py @@ -0,0 +1,20 @@ +from sqlalchemy import create_engine, MetaData +from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.ext.declarative import declarative_base + +engine = create_engine('sqlite:////tmp/test.db', convert_unicode=True) +db_session = scoped_session(sessionmaker(autocommit=False, + autoflush=False, + bind=engine)) +Base = declarative_base() +Base.query = db_session.query_property() +metadata = MetaData() + + +def init_db(): + # import all modules here that might define models so that + # they will be registered properly on the metadata. Otherwise + # you will have to import them first before calling init_db() + import app.models.user + import app.models.lock + metadata.create_all(bind=engine) diff --git a/database/db_create.py b/database/db_create.py new file mode 100644 index 0000000..512d1bf --- /dev/null +++ b/database/db_create.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +from migrate.versioning import api +from backend.config import SQLALCHEMY_DATABASE_URI +from backend.config import SQLALCHEMY_MIGRATE_REPO +from backend import db +import os.path +db.create_all() +if not os.path.exists(SQLALCHEMY_MIGRATE_REPO): + api.create(SQLALCHEMY_MIGRATE_REPO, 'database repository') + api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +else: + api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, api.version(SQLALCHEMY_MIGRATE_REPO)) diff --git a/database/db_downgrade.py b/database/db_downgrade.py new file mode 100644 index 0000000..d958877 --- /dev/null +++ b/database/db_downgrade.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +from migrate.versioning import api +from backend.config import SQLALCHEMY_DATABASE_URI +from backend.config import SQLALCHEMY_MIGRATE_REPO +v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +api.downgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, v - 1) +v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +print('Current database version: ' + str(v)) \ No newline at end of file diff --git a/database/db_migrate.py b/database/db_migrate.py new file mode 100644 index 0000000..2608251 --- /dev/null +++ b/database/db_migrate.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +import imp +from migrate.versioning import api +from backend import db +from backend.config import SQLALCHEMY_DATABASE_URI +from backend.config import SQLALCHEMY_MIGRATE_REPO + +v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +migration = SQLALCHEMY_MIGRATE_REPO + ('/versions/%03d_migration.py' % (v+1)) +tmp_module = imp.new_module('old_model') +old_model = api.create_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +exec(old_model, tmp_module.__dict__) +extra_imports = 'import datetime\n' # your imports +script = extra_imports + api.make_update_script_for_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, tmp_module.meta, db.metadata) +open(migration, "wt").write(script) +api.upgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +print('New migration saved as ' + migration) +print('Current database version: ' + str(v)) diff --git a/database/db_populate.py b/database/db_populate.py new file mode 100644 index 0000000..9bc96a7 --- /dev/null +++ b/database/db_populate.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +from backend import db +from backend.models import example_model, user_model, post_model + +user = user_model.User(nickname="tobi", email="privat@t-kurze.de") +user.set_password("abcxyz") +db.session.add(user) + +example_data = example_model.ExampleDataItem(name="tolles data item", mac="12:34:56:78:90:AB") +db.session.add(example_data) + +db.session.commit() diff --git a/database/db_upgrade.py b/database/db_upgrade.py new file mode 100644 index 0000000..4775489 --- /dev/null +++ b/database/db_upgrade.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +from migrate.versioning import api +from backend.config import SQLALCHEMY_DATABASE_URI +from backend.config import SQLALCHEMY_MIGRATE_REPO +api.upgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) +print('Current database version: ' + str(v)) \ No newline at end of file diff --git a/manage.py b/manage.py new file mode 100644 index 0000000..1b36e0e --- /dev/null +++ b/manage.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +import os +import unittest +import coverage + +from backend import app, db + +from flask_script import Manager +from flask_migrate import Migrate, MigrateCommand + +COV = coverage.coverage( + branch=True, + include='app/*', + omit=[ + 'app/tests/*', + 'app/server/config.py', + 'app/server/*/__init__.py' + ] +) +COV.start() + +migrate = Migrate(app, db) +manager = Manager(app) + +# migrations +manager.add_command('db', MigrateCommand) + + +@manager.command +def test(): + """Runs the unit tests without test coverage.""" + tests = unittest.TestLoader().discover('tests', pattern='test*.py') + result = unittest.TextTestRunner(verbosity=2).run(tests) + if result.wasSuccessful(): + return 0 + return 1 + + +@manager.command +def cov(): + """Runs the unit tests with coverage.""" + tests = unittest.TestLoader().discover('app/tests') + result = unittest.TextTestRunner(verbosity=2).run(tests) + if result.wasSuccessful(): + COV.stop() + COV.save() + print('Coverage Summary:') + COV.report() + basedir = os.path.abspath(os.path.dirname(__file__)) + covdir = os.path.join(basedir, 'tmp/coverage') + COV.html_report(directory=covdir) + print('HTML version: file://%s/index.html' % covdir) + COV.erase() + return 0 + return 1 + + +@manager.command +def create_db(): + """Creates the db tables.""" + db.create_all() + + +@manager.command +def drop_db(): + """Drops the db tables.""" + db.drop_all() + + +if __name__ == '__main__': + manager.run() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..5f96d99 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,20 @@ + + +from flask_testing import TestCase +from backend import app, db + + +class BaseTestCase(TestCase): + """ Base Tests """ + + def create_app(self): + app.config.from_object('backend.config.Config') + return app + + def setUp(self): + db.create_all() + db.session.commit() + + def tearDown(self): + db.session.remove() + db.drop_all() diff --git a/tests/test__config.py b/tests/test__config.py new file mode 100644 index 0000000..9c49fee --- /dev/null +++ b/tests/test__config.py @@ -0,0 +1,49 @@ +import os +import unittest + +from flask import current_app +from flask_testing import TestCase + +from backend import app + +basedir = os.path.abspath(os.path.join(os.path.abspath(app.root_path), os.pardir)) + + +class TestDevelopmentConfig(TestCase): + def create_app(self): + app.config.from_object('backend.config.DevelopmentConfig') + return app + + def test_app_is_development(self): + self.assertFalse(app.config['SECRET_KEY'] is 'you-will-never-guess') + self.assertTrue(app.config['DEBUG'] is True) + self.assertFalse(current_app is None) + self.assertTrue( + app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'app.db_debug') + ) + + +class TestTestingConfig(TestCase): + def create_app(self): + app.config.from_object('backend.config.TestingConfig') + return app + + def test_app_is_testing(self): + self.assertFalse(app.config['SECRET_KEY'] is 'you-will-never-guess') + self.assertTrue(app.config['DEBUG']) + self.assertTrue( + app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'app.db_test') + ) + + +class TestProductionConfig(TestCase): + def create_app(self): + app.config.from_object('backend.config.Config') + return app + + def test_app_is_production(self): + self.assertTrue(app.config['DEBUG'] is False) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..ff24965 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,287 @@ +import unittest +import json + +import time + +from backend import db +from backend.models.user_model import User, BlacklistToken +from backend.tests.base import BaseTestCase + + +def register_user(self, email, password): + return self.client.post( + '/auth/register', + data=json.dumps(dict( + email=email, + password=password + )), + content_type='application/json', + ) + + +class TestAuthBlueprint(BaseTestCase): + def test_registration(self): + """ Test for user registration """ + with self.client: + response = register_user(self, 'joe@gmail.com', '123456') + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'success') + self.assertTrue(data['message'] == 'Successfully registered.') + self.assertTrue(data['auth_token']) + self.assertTrue(response.content_type == 'application/json') + self.assertEqual(response.status_code, 201) + + def test_registered_with_already_registered_user(self): + """ Test registration with already registered email""" + user = User(email='joe@gmail.com', password='test') + db.session.add(user) + db.session.commit() + with self.client: + response = register_user(self, 'joe@gmail.com', '123456') + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue( + data['message'] == 'User already exists. Please Log in.') + self.assertTrue(response.content_type == 'application/json') + self.assertEqual(response.status_code, 202) + + def test_registered_user_login(self): + """ Test for login of registered-user login """ + with self.client: + # user registration + resp_register = register_user(self, 'joe@gmail.com', '123456') + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue( + data_register['message'] == 'Successfully registered.' + ) + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + self.assertEqual(resp_register.status_code, 201) + # registered user login + response = self.client.post( + '/auth/login', + data=json.dumps(dict( + nickname='test_nick', + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'success') + self.assertTrue(data['message'] == 'Successfully logged in.') + self.assertTrue(data['auth_token']) + self.assertTrue(response.content_type == 'application/json') + self.assertEqual(response.status_code, 200) + + def test_non_registered_user_login(self): + """ Test for login of non-registered user """ + with self.client: + response = self.client.post( + '/auth/login', + data=json.dumps(dict( + nickname='test_nick', + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue(data['message'] == 'User does not exist.') + self.assertTrue(response.content_type == 'application/json') + self.assertEqual(response.status_code, 404) + + def test_user_status(self): + """ Test for user status """ + with self.client: + resp_register = register_user(self, 'joe@gmail.com', '123456') + response = self.client.get( + '/auth/status', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_register.data.decode() + )['auth_token'] + ) + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'success') + self.assertTrue(data['data'] is not None) + self.assertTrue(data['data']['email'] == 'joe@gmail.com') + # self.assertTrue(data['data']['admin'] is 'true' or 'false') + self.assertEqual(response.status_code, 200) + + def test_valid_logout(self): + """ Test for logout before token expires """ + with self.client: + # user registration + resp_register = register_user(self, 'joe@gmail.com', '123456') + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue( + data_register['message'] == 'Successfully registered.') + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + self.assertEqual(resp_register.status_code, 201) + # user login + resp_login = self.client.post( + '/auth/login', + data=json.dumps(dict( + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + data_login = json.loads(resp_login.data.decode()) + self.assertTrue(data_login['status'] == 'success') + self.assertTrue(data_login['message'] == 'Successfully logged in.') + self.assertTrue(data_login['auth_token']) + self.assertTrue(resp_login.content_type == 'application/json') + self.assertEqual(resp_login.status_code, 200) + # valid token logout + response = self.client.post( + '/auth/logout', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_login.data.decode() + )['auth_token'] + ) + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'success') + self.assertTrue(data['message'] == 'Successfully logged out.') + self.assertEqual(response.status_code, 200) + + def test_invalid_logout(self): + """ Testing logout after the token expires """ + with self.client: + # user registration + resp_register = register_user(self, 'joe@gmail.com', '123456') + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue( + data_register['message'] == 'Successfully registered.') + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + self.assertEqual(resp_register.status_code, 201) + # user login + resp_login = self.client.post( + '/auth/login', + data=json.dumps(dict( + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + data_login = json.loads(resp_login.data.decode()) + self.assertTrue(data_login['status'] == 'success') + self.assertTrue(data_login['message'] == 'Successfully logged in.') + self.assertTrue(data_login['auth_token']) + self.assertTrue(resp_login.content_type == 'application/json') + self.assertEqual(resp_login.status_code, 200) + # invalid token logout + time.sleep(6) + response = self.client.post( + '/auth/logout', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_login.data.decode() + )['auth_token'] + ) + ) + print(response.data) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue( + data['message'] == 'Signature expired. Please log in again.') + self.assertEqual(response.status_code, 401) + + def test_valid_blacklisted_token_logout(self): + """ Test for logout after a valid token gets blacklisted """ + with self.client: + # user registration + resp_register = register_user(self, 'joe@gmail.com', '123456') + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue( + data_register['message'] == 'Successfully registered.') + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + self.assertEqual(resp_register.status_code, 201) + # user login + resp_login = self.client.post( + '/auth/login', + data=json.dumps(dict( + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + data_login = json.loads(resp_login.data.decode()) + self.assertTrue(data_login['status'] == 'success') + self.assertTrue(data_login['message'] == 'Successfully logged in.') + self.assertTrue(data_login['auth_token']) + self.assertTrue(resp_login.content_type == 'application/json') + self.assertEqual(resp_login.status_code, 200) + # blacklist a valid token + blacklist_token = BlacklistToken( + token=json.loads(resp_login.data.decode())['auth_token']) + db.session.add(blacklist_token) + db.session.commit() + # blacklisted valid token logout + response = self.client.post( + '/auth/logout', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_login.data.decode() + )['auth_token'] + ) + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.') + self.assertEqual(response.status_code, 401) + + def test_valid_blacklisted_token_user(self): + """ Test for user status with a blacklisted valid token """ + with self.client: + resp_register = register_user(self, 'joe@gmail.com', '123456') + # blacklist a valid token + blacklist_token = BlacklistToken( + token=json.loads(resp_register.data.decode())['auth_token']) + db.session.add(blacklist_token) + db.session.commit() + response = self.client.get( + '/auth/status', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_register.data.decode() + )['auth_token'] + ) + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.') + self.assertEqual(response.status_code, 401) + + def test_user_status_malformed_bearer_token(self): + """ Test for user status with malformed bearer token""" + with self.client: + resp_register = register_user(self, 'joe@gmail.com', '123456') + response = self.client.get( + '/auth/status', + headers=dict( + Authorization='Bearer' + json.loads( + resp_register.data.decode() + )['auth_token'] + ) + ) + data = json.loads(response.data.decode()) + self.assertTrue(data['status'] == 'fail') + self.assertTrue(data['message'] == 'Bearer token malformed.') + self.assertEqual(response.status_code, 401) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_user_model.py b/tests/test_user_model.py new file mode 100644 index 0000000..32963af --- /dev/null +++ b/tests/test_user_model.py @@ -0,0 +1,36 @@ +import unittest + +from backend import db +from backend.models.user_model import User +from backend.tests.base import BaseTestCase + + +class TestUserModel(BaseTestCase): + + def test_encode_auth_token(self): + user = User( + nickname='testheini', + email='test@test.com', + password='test' + ) + db.session.add(user) + db.session.commit() + auth_token = user.encode_auth_token() + self.assertTrue(isinstance(auth_token, bytes)) + + def test_decode_auth_token(self): + user = User( + nickname='testheini', + email='test@test.com', + password='test' + ) + db.session.add(user) + db.session.commit() + auth_token = user.encode_auth_token() + self.assertTrue(isinstance(auth_token, bytes)) + self.assertTrue(User.decode_auth_token( + auth_token.decode("utf-8")) == 1) + + +if __name__ == '__main__': + unittest.main()