272 lines
9.5 KiB
Python
272 lines
9.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
SQLAlchemy models for lecture recorders, recorder models and recorder commands.
|
|
"""
|
|
import importlib
|
|
import ipaddress
|
|
import json
|
|
import pkgutil
|
|
import os
|
|
import re
|
|
|
|
from sqlalchemy import MetaData
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.hybrid import hybrid_property
|
|
from sqlalchemy.orm import validates
|
|
|
|
from backend import db, app, login_manager, LrcException
|
|
from sqlalchemy import or_
|
|
from datetime import datetime, timedelta
|
|
|
|
from backend.models.virtual_command_model import (
|
|
virtual_command_recorder_command_table,
|
|
virtual_command_recorder_table,
|
|
)
|
|
|
|
# Module-level SQLAlchemy MetaData instance. NOTE(review): none of the model
# classes visible in this module reference it -- confirm whether other modules
# import it before removing.
metadata = MetaData()
|
|
|
|
|
|
class RecorderModel(db.Model):
    """Database model describing one model (type) of lecture recorder.

    A row ties a record adapter id and a model name to the recorder commands
    defined for that model and to the recorders that are of this model.
    """

    __table_args__ = {"extend_existing": True}
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    # Pass the callable itself: ``datetime.utcnow()`` would be evaluated once
    # at import time and every row would share that single timestamp.
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    last_time_modified = db.Column(db.DateTime, nullable=True, default=None)
    record_adapter_id = db.Column(db.Unicode(63), unique=True, nullable=False)
    model_name = db.Column(db.Unicode(63), unique=True, nullable=False)
    notes = db.Column(db.Unicode(255), unique=False, nullable=True, default=None)
    recorder_commands = db.relationship(
        "RecorderCommand", back_populates="recorder_model"
    )
    recorders = db.relationship("Recorder", back_populates="recorder_model")
    checksum = db.Column(
        db.String(63), unique=True, nullable=False
    )  # checksum of the recorder commands! (see: model_updater.py)
    last_checksum_change = db.Column(db.DateTime, nullable=True, default=None)
    # Boolean flags persisted as integers; use the hybrid properties below.
    _requires_user = db.Column(db.Integer, default=False, name="requires_user")
    _requires_password = db.Column(db.Integer, default=True, name="requires_password")

    @staticmethod
    def get_all():
        """Return all recorder models."""
        return RecorderModel.query.all()

    @staticmethod
    def get_by_name(name):
        """Return the first recorder model whose ``model_name`` equals *name*, or None."""
        return RecorderModel.query.filter(RecorderModel.model_name == name).first()

    @staticmethod
    def get_by_adapter_id(name):
        """Return the first recorder model whose ``record_adapter_id`` equals *name*, or None."""
        return RecorderModel.query.filter(
            RecorderModel.record_adapter_id == name
        ).first()

    @staticmethod
    def get_where_adapter_id_contains(adapter_id):
        """Return the first recorder model whose ``record_adapter_id`` contains *adapter_id*, or None."""
        return RecorderModel.query.filter(
            RecorderModel.record_adapter_id.contains(adapter_id)
        ).first()

    @staticmethod
    def get_by_checksum(md5_sum):
        """Return the first recorder model whose ``checksum`` equals *md5_sum*, or None."""
        return RecorderModel.query.filter(RecorderModel.checksum == md5_sum).first()

    @hybrid_property
    def requires_username(self):
        """Whether the ``requires_user`` integer column is greater than zero."""
        return self._requires_user > 0

    @requires_username.setter
    def requires_username(self, val: bool):
        """Store *val* as 1 (truthy) or 0 (falsy)."""
        self._requires_user = 1 if val else 0

    @hybrid_property
    def requires_password(self):
        """Whether the ``requires_password`` integer column is greater than zero."""
        return self._requires_password > 0

    @requires_password.setter
    def requires_password(self, val: bool):
        """Store *val* as 1 (truthy) or 0 (falsy)."""
        self._requires_password = 1 if val else 0

    def __str__(self):
        """Return the model name followed by the record adapter id."""
        # Formatting both values keeps __str__ from raising on an unset name.
        return "{} (record adapter: {})".format(
            self.model_name, self.record_adapter_id
        )
|
|
|
|
|
|
class Recorder(db.Model):
    """Database model of a single lecture recorder.

    Holds identification (name, serial number), network settings (MAC,
    IPv4/IPv6 address, telnet/ssh ports), credentials, JSON-encoded option
    and note lists, and the relations to the room, the recorder model and
    the virtual commands of the recorder.
    """

    __table_args__ = {"extend_existing": True}
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    # Pass the callable itself: ``datetime.utcnow()`` would be evaluated once
    # at import time and every row would share that single timestamp.
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    last_time_modified = db.Column(
        db.DateTime, nullable=False, default=datetime.utcnow
    )
    name = db.Column(db.Unicode(63), unique=True, nullable=False)
    model_name = db.Column(db.Unicode(63), unique=False, nullable=False)
    serial_number = db.Column(db.Unicode(63), unique=True, nullable=True)
    locked = db.Column(db.Boolean, default=False)  # no modifications allowed
    lock_message = db.Column(db.String, nullable=True, default=None)
    offline = db.Column(db.Boolean, default=False)  # maintenance, etc.
    description = db.Column(db.Unicode(255), unique=False, nullable=True, default="")
    _mac = db.Column(db.String(17), unique=True, nullable=True)
    _ip = db.Column(db.String(15), unique=True, nullable=True, default=None)
    _ip6 = db.Column(db.String(46), unique=True, nullable=True, default=None)
    network_name = db.Column(db.String(127), unique=True, nullable=True, default=None)
    telnet_port = db.Column(db.Integer, unique=False, nullable=False, default=23)
    ssh_port = db.Column(db.Integer, unique=False, nullable=False, default=22)
    username = db.Column(db.String, nullable=True, default=None)
    password = db.Column(db.String, nullable=True, default=None)
    _configured_options_json_string = db.Column(db.UnicodeText, default="")
    _additional_notes_json_string = db.Column(db.UnicodeText, default="")
    additional_camera_connected = db.Column(db.Boolean, default=False)
    firmware_version = db.Column(db.String, nullable=True, default=None)
    room_id = db.Column(db.Integer, db.ForeignKey("room.id"))
    room = db.relationship(
        "Room", uselist=False, back_populates="recorder"
    )  # one-to-one relation (uselist=False)
    recorder_model_id = db.Column(db.Integer, db.ForeignKey("recorder_model.id"))
    recorder_model = db.relationship("RecorderModel", back_populates="recorders")
    virtual_commands = db.relationship(
        "VirtualCommand",
        secondary=virtual_command_recorder_table,
        back_populates="recorders",
    )

    def __init__(self, **kwargs):
        """Create a recorder from column/relationship keyword arguments."""
        super(Recorder, self).__init__(**kwargs)

    @staticmethod
    def _decode_json_list(raw):
        """Decode a JSON text column; unset or empty text yields ``[]``."""
        # The columns default to "" (and are None before the first flush),
        # neither of which json.loads accepts.
        if raw is None or raw == "":
            return []
        return json.loads(raw)

    @staticmethod
    def get_by_name(name):
        """Return the first recorder whose ``name`` equals *name*, or None."""
        return Recorder.query.filter(Recorder.name == name).first()

    @staticmethod
    def get_by_identifier(identifier):
        """Return the first recorder whose ``id`` equals *identifier*, or None."""
        return Recorder.query.filter(Recorder.id == identifier).first()

    @staticmethod
    def get_by_mac(mac: str):
        """Return the recorder with the given MAC address, or None.

        *mac* is normalised the same way the ``mac`` setter stores it
        (dashes replaced by colons, lower case). None or "" yields None.
        """
        if mac is None or mac == "":
            return None
        mac = mac.replace("-", ":").lower()
        return Recorder.query.filter(Recorder._mac == mac).first()

    @staticmethod
    def get_all():
        """Return all recorders."""
        return Recorder.query.all()

    @validates("name")
    def validate_name(self, key, value):
        """Reject names shorter than three characters.

        Raises:
            AssertionError: if ``len(value)`` is not greater than 2.
        """
        # Raised explicitly (instead of via ``assert``) so the check also runs
        # under ``python -O`` while callers still see an AssertionError.
        if not len(value) > 2:
            raise AssertionError("recorder name must be longer than 2 characters")
        return value

    @hybrid_property
    def configured_options(self) -> list:
        """Configured options decoded from their JSON text column."""
        return self._decode_json_list(self._configured_options_json_string)

    @configured_options.setter
    def configured_options(self, value: list):
        """Store *value* JSON-encoded."""
        self._configured_options_json_string = json.dumps(value)

    def add_configured_option(self, value: str):
        """Append *value* to the configured options and persist the list."""
        options = self.configured_options
        # Earlier data may hold a non-list (e.g. JSON ``null``); start afresh.
        if not isinstance(options, list):
            options = []
        # list.append returns None, so append first and serialise afterwards.
        options.append(value)
        self.configured_options = options

    @hybrid_property
    def additional_notes(self) -> list:
        """Additional notes decoded from their JSON text column."""
        return self._decode_json_list(self._additional_notes_json_string)

    @additional_notes.setter
    def additional_notes(self, value: list):
        """Store *value* JSON-encoded."""
        self._additional_notes_json_string = json.dumps(value)

    def add_additional_notes(self, value: str):
        """Append *value* to the additional notes and persist the list."""
        notes = self.additional_notes
        # Earlier data may hold a non-list (e.g. JSON ``null``); start afresh.
        if not isinstance(notes, list):
            notes = []
        # Work on the decoded list; the raw column is a JSON string.
        notes.append(value)
        self.additional_notes = notes

    @hybrid_property
    def mac(self) -> str:
        """The stored MAC address in upper case, or None when unset."""
        if self._mac is None:
            return None
        return self._mac.upper()

    @mac.setter
    def mac(self, value: str):
        """Validate and store a MAC address (lower case, dashes -> colons).

        None and "" are ignored and leave the stored value unchanged.

        Raises:
            LrcException: if *value* is not a valid MAC address.
        """
        if value is None or value == "":
            return
        # Six hex pairs using one consistent separator ("-", ":" or none).
        if re.match(r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\1[0-9a-f]{2}){4}$", value.lower()):
            self._mac = value.replace("-", ":").lower()
        else:
            raise LrcException("'{}' is not a valid MAC Address!".format(value))

    @hybrid_property
    def ip(self) -> str:
        """The stored IPv4 address string."""
        return self._ip

    @ip.setter
    def ip(self, value: str):
        """Validate *value* with ``ipaddress.IPv4Interface`` and store it as given.

        Raises:
            LrcException: if *value* is not a valid IPv4 address.
        """
        try:
            ipaddress.IPv4Interface(value)
            self._ip = value
        except (ipaddress.AddressValueError, ipaddress.NetmaskValueError):
            raise LrcException("'{}' is not a valid IPv4 Address!".format(value))

    @hybrid_property
    def ip6(self) -> str:
        """The stored IPv6 address string."""
        return self._ip6

    @ip6.setter
    def ip6(self, value: str):
        """Validate *value* with ``ipaddress.IPv6Interface`` and store it as given.

        Raises:
            LrcException: if *value* is not a valid IPv6 address.
        """
        try:
            ipaddress.IPv6Interface(value)
            self._ip6 = value
        except (ipaddress.AddressValueError, ipaddress.NetmaskValueError):
            raise LrcException("'{}' is not a valid IPv6 Address!".format(value))

    # handle bad ip

    def __str__(self):
        """Return the recorder name."""
        return self.name

    def to_dict(self):
        """Return a dict with the recorder's ``id`` and ``name``."""
        return dict(id=self.id, name=self.name)

    def toJSON(self):
        """Return ``to_dict()`` as an indented JSON string with sorted keys."""
        return json.dumps(
            self.to_dict(), default=lambda o: o.__dict__, sort_keys=True, indent=4
        )
|
|
|
|
|
|
class RecorderCommand(db.Model):
    """Database model of a single command belonging to a recorder model.

    The command parameters are persisted as JSON text in
    ``parameters_string`` and exposed as Python data through ``parameters``.
    """

    __table_args__ = {"extend_existing": True}
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    # Pass the callable itself: ``datetime.utcnow()`` would be evaluated once
    # at import time and every row would share that single timestamp.
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    last_time_modified = db.Column(
        db.DateTime, nullable=True, default=datetime.utcnow
    )
    name = db.Column(db.Unicode(63), unique=True, nullable=False)
    alternative_name = db.Column(
        db.Unicode(63), unique=True, nullable=True, default=None
    )
    disabled = db.Column(db.Boolean, default=False)
    description = db.Column(db.Unicode(511), nullable=True, default=None)
    parameters_string = db.Column(db.String(2047), nullable=True)
    recorder_model = db.relationship(
        "RecorderModel", back_populates="recorder_commands"
    )
    recorder_model_id = db.Column(db.Integer, db.ForeignKey("recorder_model.id"))
    virtual_commands = db.relationship(
        "VirtualCommand",
        secondary=virtual_command_recorder_command_table,
        back_populates="recorder_commands",
    )

    @staticmethod
    def get_all():
        """Return all recorder commands."""
        return RecorderCommand.query.all()

    @property
    def parameters(self):
        """Parameters decoded from ``parameters_string``; None when unset."""
        if self.parameters_string is None:
            return None
        return json.loads(self.parameters_string)

    @parameters.setter
    def parameters(self, parameters_dict: dict):
        """Store *parameters_dict* JSON-encoded in ``parameters_string``."""
        self.parameters_string = json.dumps(parameters_dict)
|