Files
lrc-backend/backend/tools/model_updater.py

193 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2019. Tobias Kurze
import hashlib
import json
import logging
import os
import re
from datetime import datetime
from json import dumps
from pprint import pprint
from sqlalchemy import and_
from sqlalchemy.exc import IntegrityError
from backend import db
from backend.models import Room
from backend.models.recorder_model import RecorderModel, RecorderCommand, Recorder
from backend.recorder_adapters import get_defined_recorder_adapters
from backend.tools.helpers import calculate_md5_checksum
logger = logging.getLogger("lrc." + __name__)
# Maps a compiled pattern (searched against a recorder's "type" string) to a
# fallback search term that is used to look up the recorder model when the
# type string carries no model number.
# The first pattern exposes the named groups ``name``, ``number`` and
# ``options``; the second one only exposes ``name``.
KNOWN_RECORDERS = {
    re.compile(
        r'(?P<name>SMP)[\s]*(?P<number>[\d]+)[\s]*.?[\s]*(?P<options>[\S]*)'
    ): 'SMP',
    re.compile(
        r'(?P<name>LectureRecorder X2|LectureRecorder|VGADVI Recorder|DVI Broadcaster DL|DVIRecorderDL)'
    ): 'Epiphan',
}
def create_recorder_commands_for_recorder_adapter(command_definitions: dict, recorder_model: RecorderModel):
    """Bring the stored commands of a recorder model in line with its definitions.

    Commands are stored under the name ``"<record_adapter_id>:<definition key>"``.
    Already stored commands whose parameters differ from the definition are
    updated; definitions without a stored command get a new ``RecorderCommand``.

    :param command_definitions: maps a command key to its parameter definition.
        The parameters are compared via ``json.dumps`` and therefore have to be
        JSON serialisable.
    :param recorder_model: the model the commands belong to.
    """
    name_prefix = recorder_model.record_adapter_id + ":"
    logger.debug("Received {} command definitions to create "
                 "for recoder model {}".format(len(command_definitions),
                                               recorder_model.record_adapter_id))
    existing_recorder_commands = RecorderCommand.query.filter(
        and_(RecorderCommand.name.in_([name_prefix + key for key in command_definitions]),
             RecorderCommand.recorder_model == recorder_model))

    existing_keys = set()
    for existing_command in existing_recorder_commands:
        # Stored names carry the adapter prefix while the definitions are keyed
        # without it; strip the prefix before looking the definition up.
        # (Looking up the prefixed name never matched, which reset the stored
        # parameters to None and re-created every existing command.)
        key = existing_command.name[len(name_prefix):]
        existing_keys.add(key)
        args = command_definitions.get(key)
        if dumps(existing_command.parameters) != dumps(args):
            logger.warning(
                "The function definition {} collides with an existing definition of the same name "
                "but different parameters!".format(
                    existing_command.name))
            existing_command.last_time_modified = datetime.utcnow()
            existing_command.parameters = args
            db.session.commit()

    # Only definitions that have no stored counterpart yet are created.
    for key in set(command_definitions) - existing_keys:
        r_c = RecorderCommand(name=name_prefix + key, parameters=command_definitions[key],
                              recorder_model=recorder_model)
        db.session.add(r_c)
    db.session.flush()
    db.session.commit()
def update_recorder_models_database(drop: bool = False):
    """Create or update a ``RecorderModel`` for every defined recorder adapter.

    For each adapter returned by ``get_defined_recorder_adapters()`` a checksum
    over the JSON dump of its command definitions is calculated. Unknown
    adapters get a new model including its commands; for known adapters the
    name is refreshed and, if the checksum changed, the commands are
    synchronised again.

    :param drop: if true, all existing recorder models are deleted first.
    """
    if drop:
        for r in RecorderModel.get_all():
            db.session.delete(r)
        db.session.commit()

    for r_a in get_defined_recorder_adapters():
        try:
            r_m = RecorderModel.get_by_adapter_id(r_a["id"])
            model_checksum = calculate_md5_checksum(dumps(r_a["commands"]))
            if r_m is None:
                # An adapter definition without a 'class' entry simply contributes
                # no extra constructor parameters (a dict default would fail here,
                # as it has no get_recorder_params()).
                adapter_class = r_a.get('class')
                recorder_params = adapter_class.get_recorder_params() if adapter_class is not None else {}
                r_m = RecorderModel(record_adapter_id=r_a["id"], model_name=r_a["name"], checksum=model_checksum,
                                    **recorder_params)
                db.session.add(r_m)
                db.session.flush()
                db.session.refresh(r_m)
                logger.debug("Creating command definitions for rec mod adapter {}".format(r_m.record_adapter_id))
                create_recorder_commands_for_recorder_adapter(r_a["commands"], r_m)
            else:
                # One timestamp per adapter keeps the modification fields consistent.
                now = datetime.utcnow()
                if r_m.model_name != r_a["name"]:
                    r_m.model_name = r_a["name"]
                    r_m.last_time_modified = now
                if model_checksum != r_m.checksum:
                    r_m.last_time_modified = now
                    r_m.checksum = model_checksum
                    r_m.last_checksum_change = now
                    logger.debug("Updating command definitions for rec mod adapter {}".format(r_m.record_adapter_id))
                    create_recorder_commands_for_recorder_adapter(r_a["commands"], r_m)
        except IntegrityError as e:
            # Keep going with the remaining adapters; only this one is discarded.
            logger.error(e)
            db.session.rollback()
    db.session.commit()
def get_recorder_room(rec: dict) -> "Room | None":
    """Find the room a recorder definition belongs to.

    The rooms of the recorder's building are looked up first. If the building
    has exactly one room, that room is returned. Otherwise the first room whose
    name contains every whitespace separated part of the recorder's ``room``
    value is returned.

    :param rec: recorder definition; the keys ``building``, ``room`` and
        ``name`` are read.
    :return: the matching room, or ``None`` (after logging a warning) if the
        building is unknown, no room name is given or no room matches.
    """
    building = rec.get('building', None)
    recorder_name = rec.get('name')
    rooms = Room.get_by_building_number(building)
    # presumably a query object, so count() is evaluated only once -- TODO confirm
    room_count = rooms.count()
    if room_count <= 0:
        logger.warning("Building {} unknown! Can not find room for recorder {}.".format(building, recorder_name))
        return None
    if room_count == 1:
        return rooms.first()

    room_name = rec.get('room') or ''
    # NOTE(review): the literal replaced here used to be an invisible/ambiguous
    # Unicode character; presumably a non-breaking or zero-width space -- confirm.
    # Spelled out as escapes so it cannot silently degrade to '' again, which
    # would put a space between every single character.
    for invisible_space in ('\u00a0', '\u200b'):
        room_name = room_name.replace(invisible_space, ' ')
    name_parts = room_name.split()
    # Without any name part every room would "match"; treat that as no match.
    if name_parts:
        for room in rooms:
            if all(part in room.name for part in name_parts):
                return room
    logger.warning("No room found for recorder {}".format(recorder_name))
    return None
def create_default_recorders():
    """Create or update recorders from ``models/initial_recorders.json``.

    Every entry of the file's ``recorders`` list is matched against
    ``KNOWN_RECORDERS`` by its ``type`` value. For a match, the recorder is
    looked up by MAC address, then by name (``"<name> Recorder"``), and created
    if neither exists; its attributes are then set from the entry. Entries
    without a type or with an unknown type are skipped with a warning.
    """
    f = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, 'models', 'initial_recorders.json'))
    # JSON is UTF-8 encoded; do not depend on the platform's default encoding.
    with open(f, 'r', encoding='utf-8') as json_file:
        recorders = json.load(json_file)['recorders']
    for r in recorders:
        recorder_type = r.get('type')
        firmware_version = r.get('firmware_version', None)
        room_rec_name = r.get('name')
        username = r.get('username', None)
        password = r.get('password', None)
        mac = r.get('mac', None)
        ip4 = r.get('ip', None)
        additional_camera = r.get('additional_camera', False)
        description = r.get('description', '')
        if not recorder_type:
            logger.warning("Recorder entry {} has no type; skipping it.".format(room_rec_name))
            continue
        matched = False
        for pattern, fallback_search_name in KNOWN_RECORDERS.items():
            if match := pattern.search(recorder_type):
                matched = True
                groups = match.groupdict()
                name = groups.get('name')
                model_number = groups.get('number', None)
                options = groups.get('options', None)
                if options is not None and options != "":
                    options = options.split(',')
                if model_number is not None:
                    model_name = name + " " + model_number
                    model_search_name = name + model_number[:-1]  # just get prefix (remove last digit); SMP 35x -> 35
                else:
                    model_name = name
                    model_search_name = fallback_search_name
                rec_model = RecorderModel.get_where_adapter_id_contains(model_search_name)
                # Prefer the MAC address for identifying an existing recorder,
                # fall back to its name and only then create a new one.
                rec = Recorder.get_by_mac(mac)
                if rec is None:
                    rec = Recorder.get_by_name(room_rec_name + " Recorder")
                if rec is None:
                    rec = Recorder()
                    rec.mac = mac
                    db.session.add(rec)
                rec.name = room_rec_name + " Recorder"
                rec.model_name = model_name
                rec.recorder_model = rec_model
                rec.username = username
                rec.password = password
                rec.firmware_version = firmware_version
                rec.ip = ip4
                rec.additional_camera_connected = additional_camera
                rec.additional_note = description
                rec.configured_options = options
                rec.room = get_recorder_room(r)
                db.session.flush()
                db.session.refresh(rec)
        if not matched:
            logger.warning("Recorder type {} of entry {} is unknown; skipping it.".format(recorder_type,
                                                                                         room_rec_name))
    db.session.commit()
if __name__ == '__main__':
    # Ad-hoc debugging entry point: print the stored commands and models.
    # for r in Room.get_all():
    #     print(r)
    # exit()
    for command in RecorderCommand.get_all():
        print(command)
        print(command.recorder_model)
    for model in RecorderModel.get_all():
        print(model.recorder_commands)
    exit()

    # NOTE(review): assuming the exit() above sits at this level, everything
    # below is currently unreachable -- confirm against the original file.
    for recorder in Recorder.get_all():
        if recorder.room is None:
            print("{}: {}".format(recorder, recorder.room))
    # db.drop_all()
    # db.create_all()
    update_recorder_models_database()
    create_default_recorders()
    db.session.commit()
    # print(get_recorder_room({"room": "Grosser Hörsaal Bauingenieure", "building": "10.50"}))
    # print(get_recorder_room({"room": "Grosser Hörsaal Bauingenieure", "building": "30.95"}))