working db and tests Nr2

This commit is contained in:
2019-03-14 17:14:05 +01:00
parent 1c8cb55b46
commit bd9b6c61d3
16 changed files with 624 additions and 0 deletions

1
.env Normal file
View File

@@ -0,0 +1 @@
PYTHONPATH=${PYTHONPATH}:${PWD}/..

BIN
app.db Normal file

Binary file not shown.

76
config.py Normal file
View File

@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# ...
# available languages
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config():
TEMPLATE_AUTO_RELOAD = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, 'db_repository')
SQLALCHEMY_TRACK_MODIFICATIONS = True
WTF_CSRF_ENABLED = True
SECRET_KEY = 'you-will-never-guess'
OPENID_PROVIDERS = [
{'name': 'Google', 'url': 'https://www.google.com/accounts/o8/id'},
{'name': 'Yahoo', 'url': 'https://me.yahoo.com'},
{'name': 'AOL', 'url': 'http://openid.aol.com/<username>'},
{'name': 'Flickr', 'url': 'http://www.flickr.com/<username>'},
{'name': 'MyOpenID', 'url': 'https://www.myopenid.com'}]
OAUTH_CREDENTIALS = {
'facebook': {
'id': '1198624176930248',
'secret': '4fbc01d776834c1ffc89a5bed1cd97d0'
},
'twitter': {
'id': '3RzWQclolxWZIMq5LJqzRZPTl',
'secret': 'm9TEd58DSEtRrZHpz2EjrV9AhsBRxKMo8m3kuIZj3zLwzwIimt'
},
'google': {
'id': '1084993305658-d9n88548ssrtmt5v6s2dne57i4qpviur.apps.googleusercontent.com',
'secret': 'oNpvoAKMPMjRyiu5EDrmmX4X'
},
}
# mail server settings
MAIL_SERVER = 'localhost'
MAIL_PORT = 25
MAIL_USERNAME = None
MAIL_PASSWORD = None
# administrator list
ADMINS = ['you@example.com']
# pagination
POSTS_PER_PAGE = 5
LOCKS_PER_PAGE = 8
LANGUAGES = {
'en': 'English',
'es': 'Español'
}
#ASSETS_DEBUG = True
JWT_SECRET = "abcxyz"
JWT_ALGORITHM = "HS256"
JWT_EXP_DELTA_SECONDS = 5 * 60
INDEX_TEMPLATE = "index.html"
class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo'
class DevelopmentConfig(Config):
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db_debug')
DEBUG = True
class TestingConfig(Config):
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db_test')
TESTING = True

0
database/__init__.py Normal file
View File

20
database/database.py Normal file
View File

@@ -0,0 +1,20 @@
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('sqlite:////tmp/test.db', convert_unicode=True)
db_session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
metadata = MetaData()
def init_db():
# import all modules here that might define models so that
# they will be registered properly on the metadata. Otherwise
# you will have to import them first before calling init_db()
import app.models.user
import app.models.lock
metadata.create_all(bind=engine)

13
database/db_create.py Normal file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python
from migrate.versioning import api
from backend.config import SQLALCHEMY_DATABASE_URI
from backend.config import SQLALCHEMY_MIGRATE_REPO
from backend import db
import os.path
db.create_all()
if not os.path.exists(SQLALCHEMY_MIGRATE_REPO):
api.create(SQLALCHEMY_MIGRATE_REPO, 'database repository')
api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
else:
api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, api.version(SQLALCHEMY_MIGRATE_REPO))

9
database/db_downgrade.py Normal file
View File

@@ -0,0 +1,9 @@
#!/usr/bin/env python
from migrate.versioning import api
from backend.config import SQLALCHEMY_DATABASE_URI
from backend.config import SQLALCHEMY_MIGRATE_REPO
v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
api.downgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, v - 1)
v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
print('Current database version: ' + str(v))

20
database/db_migrate.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python
import imp
from migrate.versioning import api
from backend import db
from backend.config import SQLALCHEMY_DATABASE_URI
from backend.config import SQLALCHEMY_MIGRATE_REPO
v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
migration = SQLALCHEMY_MIGRATE_REPO + ('/versions/%03d_migration.py' % (v+1))
tmp_module = imp.new_module('old_model')
old_model = api.create_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
exec(old_model, tmp_module.__dict__)
extra_imports = 'import datetime\n' # your imports
script = extra_imports + api.make_update_script_for_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, tmp_module.meta, db.metadata)
open(migration, "wt").write(script)
api.upgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
print('New migration saved as ' + migration)
print('Current database version: ' + str(v))

13
database/db_populate.py Normal file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python
from backend import db
from backend.models import example_model, user_model, post_model
user = user_model.User(nickname="tobi", email="privat@t-kurze.de")
user.set_password("abcxyz")
db.session.add(user)
example_data = example_model.ExampleDataItem(name="tolles data item", mac="12:34:56:78:90:AB")
db.session.add(example_data)
db.session.commit()

8
database/db_upgrade.py Normal file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python
from migrate.versioning import api
from backend.config import SQLALCHEMY_DATABASE_URI
from backend.config import SQLALCHEMY_MIGRATE_REPO
api.upgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
print('Current database version: ' + str(v))

72
manage.py Normal file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python
import os
import unittest
import coverage
from backend import app, db
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
COV = coverage.coverage(
branch=True,
include='app/*',
omit=[
'app/tests/*',
'app/server/config.py',
'app/server/*/__init__.py'
]
)
COV.start()
migrate = Migrate(app, db)
manager = Manager(app)
# migrations
manager.add_command('db', MigrateCommand)
@manager.command
def test():
"""Runs the unit tests without test coverage."""
tests = unittest.TestLoader().discover('tests', pattern='test*.py')
result = unittest.TextTestRunner(verbosity=2).run(tests)
if result.wasSuccessful():
return 0
return 1
@manager.command
def cov():
"""Runs the unit tests with coverage."""
tests = unittest.TestLoader().discover('app/tests')
result = unittest.TextTestRunner(verbosity=2).run(tests)
if result.wasSuccessful():
COV.stop()
COV.save()
print('Coverage Summary:')
COV.report()
basedir = os.path.abspath(os.path.dirname(__file__))
covdir = os.path.join(basedir, 'tmp/coverage')
COV.html_report(directory=covdir)
print('HTML version: file://%s/index.html' % covdir)
COV.erase()
return 0
return 1
@manager.command
def create_db():
"""Creates the db tables."""
db.create_all()
@manager.command
def drop_db():
"""Drops the db tables."""
db.drop_all()
if __name__ == '__main__':
manager.run()

0
tests/__init__.py Normal file
View File

20
tests/base.py Normal file
View File

@@ -0,0 +1,20 @@
from flask_testing import TestCase
from backend import app, db
class BaseTestCase(TestCase):
""" Base Tests """
def create_app(self):
app.config.from_object('backend.config.Config')
return app
def setUp(self):
db.create_all()
db.session.commit()
def tearDown(self):
db.session.remove()
db.drop_all()

49
tests/test__config.py Normal file
View File

@@ -0,0 +1,49 @@
import os
import unittest
from flask import current_app
from flask_testing import TestCase
from backend import app
basedir = os.path.abspath(os.path.join(os.path.abspath(app.root_path), os.pardir))
class TestDevelopmentConfig(TestCase):
def create_app(self):
app.config.from_object('backend.config.DevelopmentConfig')
return app
def test_app_is_development(self):
self.assertFalse(app.config['SECRET_KEY'] is 'you-will-never-guess')
self.assertTrue(app.config['DEBUG'] is True)
self.assertFalse(current_app is None)
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'app.db_debug')
)
class TestTestingConfig(TestCase):
def create_app(self):
app.config.from_object('backend.config.TestingConfig')
return app
def test_app_is_testing(self):
self.assertFalse(app.config['SECRET_KEY'] is 'you-will-never-guess')
self.assertTrue(app.config['DEBUG'])
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'app.db_test')
)
class TestProductionConfig(TestCase):
def create_app(self):
app.config.from_object('backend.config.Config')
return app
def test_app_is_production(self):
self.assertTrue(app.config['DEBUG'] is False)
if __name__ == '__main__':
unittest.main()

287
tests/test_auth.py Normal file
View File

@@ -0,0 +1,287 @@
import unittest
import json
import time
from backend import db
from backend.models.user_model import User, BlacklistToken
from backend.tests.base import BaseTestCase
def register_user(self, email, password):
return self.client.post(
'/auth/register',
data=json.dumps(dict(
email=email,
password=password
)),
content_type='application/json',
)
class TestAuthBlueprint(BaseTestCase):
def test_registration(self):
""" Test for user registration """
with self.client:
response = register_user(self, 'joe@gmail.com', '123456')
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully registered.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 201)
def test_registered_with_already_registered_user(self):
""" Test registration with already registered email"""
user = User(email='joe@gmail.com', password='test')
db.session.add(user)
db.session.commit()
with self.client:
response = register_user(self, 'joe@gmail.com', '123456')
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(
data['message'] == 'User already exists. Please Log in.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 202)
def test_registered_user_login(self):
""" Test for login of registered-user login """
with self.client:
# user registration
resp_register = register_user(self, 'joe@gmail.com', '123456')
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.'
)
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# registered user login
response = self.client.post(
'/auth/login',
data=json.dumps(dict(
nickname='test_nick',
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged in.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 200)
def test_non_registered_user_login(self):
""" Test for login of non-registered user """
with self.client:
response = self.client.post(
'/auth/login',
data=json.dumps(dict(
nickname='test_nick',
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'User does not exist.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 404)
def test_user_status(self):
""" Test for user status """
with self.client:
resp_register = register_user(self, 'joe@gmail.com', '123456')
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['data'] is not None)
self.assertTrue(data['data']['email'] == 'joe@gmail.com')
# self.assertTrue(data['data']['admin'] is 'true' or 'false')
self.assertEqual(response.status_code, 200)
def test_valid_logout(self):
""" Test for logout before token expires """
with self.client:
# user registration
resp_register = register_user(self, 'joe@gmail.com', '123456')
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# valid token logout
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged out.')
self.assertEqual(response.status_code, 200)
def test_invalid_logout(self):
""" Testing logout after the token expires """
with self.client:
# user registration
resp_register = register_user(self, 'joe@gmail.com', '123456')
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# invalid token logout
time.sleep(6)
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
print(response.data)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(
data['message'] == 'Signature expired. Please log in again.')
self.assertEqual(response.status_code, 401)
def test_valid_blacklisted_token_logout(self):
""" Test for logout after a valid token gets blacklisted """
with self.client:
# user registration
resp_register = register_user(self, 'joe@gmail.com', '123456')
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# blacklist a valid token
blacklist_token = BlacklistToken(
token=json.loads(resp_login.data.decode())['auth_token'])
db.session.add(blacklist_token)
db.session.commit()
# blacklisted valid token logout
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
self.assertEqual(response.status_code, 401)
def test_valid_blacklisted_token_user(self):
""" Test for user status with a blacklisted valid token """
with self.client:
resp_register = register_user(self, 'joe@gmail.com', '123456')
# blacklist a valid token
blacklist_token = BlacklistToken(
token=json.loads(resp_register.data.decode())['auth_token'])
db.session.add(blacklist_token)
db.session.commit()
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
self.assertEqual(response.status_code, 401)
def test_user_status_malformed_bearer_token(self):
""" Test for user status with malformed bearer token"""
with self.client:
resp_register = register_user(self, 'joe@gmail.com', '123456')
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Bearer token malformed.')
self.assertEqual(response.status_code, 401)
if __name__ == '__main__':
unittest.main()

36
tests/test_user_model.py Normal file
View File

@@ -0,0 +1,36 @@
import unittest
from backend import db
from backend.models.user_model import User
from backend.tests.base import BaseTestCase
class TestUserModel(BaseTestCase):
def test_encode_auth_token(self):
user = User(
nickname='testheini',
email='test@test.com',
password='test'
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token()
self.assertTrue(isinstance(auth_token, bytes))
def test_decode_auth_token(self):
user = User(
nickname='testheini',
email='test@test.com',
password='test'
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token()
self.assertTrue(isinstance(auth_token, bytes))
self.assertTrue(User.decode_auth_token(
auth_token.decode("utf-8")) == 1)
if __name__ == '__main__':
unittest.main()