code to call recorder functions from frointend

This commit is contained in:
2019-12-12 18:44:07 +01:00
parent da200f95b8
commit bb4db25dcd
6 changed files with 160 additions and 79 deletions

View File

@@ -12,13 +12,14 @@ from flask_jwt_extended import jwt_required, get_current_user, get_jwt_claims
from flask_restplus import fields, Resource from flask_restplus import fields, Resource
from backend import db from backend import db
from backend.api import api_control, get_jwt_identity from backend.api import api_control, get_jwt_identity, Recorder, RecorderCommand, pprint
from backend.recorder_adapters.helpers import execute_recorder_command
control_command_response_model = api_control.model('Control Command Response', { control_command_response_model = api_control.model('Control Command Response', {
'time': fields.DateTime(required=False, description='Creation date of the recorder'), 'time': fields.DateTime(required=False, description='Creation date of the recorder'),
'state': fields.String(min_length=3, required=True, description='The recorder\'s name'), 'ok': fields.Boolean(required=True, description='Field indicating whether command execution was successful.'),
'output': fields.String(required=False, description='The recorder\'s description'), 'output': fields.String(required=False, description='Command output in case of success'),
'error': fields.String(required=False, description='The recorder\'s description'), 'error': fields.String(required=False, description='Error description in case of a problem.'),
}) })
@@ -27,16 +28,28 @@ class ControlCommand(Resource):
control_command_parser = api_control.parser() control_command_parser = api_control.parser()
control_command_parser.add_argument('recorder_id', type=int, default=1, required=True) control_command_parser.add_argument('recorder_id', type=int, default=1, required=True)
control_command_parser.add_argument('command_id', type=int, default=1, required=True) control_command_parser.add_argument('command_id', type=int, default=1, required=True)
control_command_parser.add_argument('parameters', default=json.dumps({'p1': 'v1'}), type=dict, required=False, control_command_parser.add_argument('parameters', default={}, type=dict, required=False,
location='json') location='json')
@jwt_required #@jwt_required
@api_control.doc('run_command') @api_control.doc('run_command')
@api_control.expect(control_command_parser) @api_control.expect(control_command_parser)
@api_control.marshal_with(control_command_response_model, skip_none=False, code=201) @api_control.marshal_with(control_command_response_model, skip_none=False, code=201)
def post(self): def post(self):
print(get_current_user()) #print(get_current_user())
print(get_jwt_identity()) #print(get_jwt_identity())
current_user = {'user': get_current_user(), 'claims': get_jwt_claims()} #current_user = {'user': get_current_user(), 'claims': get_jwt_claims()}
#TODO: right check! (acl, etc.)
args = self.control_command_parser.parse_args() args = self.control_command_parser.parse_args()
return {'time': datetime.utcnow(), 'output': args, 'state': current_user} recorder = Recorder.get_by_identifier(args.get('recorder_id'))
if recorder is None:
api_control.abort(404, "Recorder not found!")
command = RecorderCommand.get_by_identifier(args.get('command_id'))
if command is None:
api_control.abort(404, "Command not found!")
success, output = execute_recorder_command(recorder, command, args.get('parameters', None))
if success:
return {'time': datetime.utcnow(), 'output': output, 'ok': success}
return {'time': datetime.utcnow(), 'error': output, 'ok': success}

View File

@@ -28,7 +28,7 @@ scheduler = None
def get_default_scheduler(): def get_default_scheduler():
cron_logger.debug("creating scheduler!") cron_logger.debug("creating scheduler!")
global scheduler global scheduler
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler(timezone='utc')
scheduler.daemonic = False scheduler.daemonic = False
return scheduler return scheduler

View File

@@ -8,6 +8,7 @@ import json
import pkgutil import pkgutil
import os import os
import re import re
from typing import Union
from sqlalchemy import MetaData from sqlalchemy import MetaData
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@@ -234,8 +235,12 @@ class RecorderCommand(db.Model, ModelBase):
def get_all(): def get_all():
return RecorderCommand.query.all() return RecorderCommand.query.all()
@staticmethod
def get_by_identifier(identifier):
return RecorderCommand.query.filter(RecorderCommand.id == identifier).first()
@property @property
def parameters(self): def parameters(self) -> Union[dict, None]:
if self.parameters_string is None: if self.parameters_string is None:
return None return None
return json.loads(self.parameters_string) return json.loads(self.parameters_string)

View File

@@ -58,7 +58,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
# print(self.tn.assert_string_in_output("Login Administrator")) # print(self.tn.assert_string_in_output("Login Administrator"))
self.tn = None self.tn = None
logger.error("Could definitely not login (as admin) with given password! {}".format(self.address)) logger.error("Could definitely not login (as admin) with given password! {}".format(self.address))
raise Exception("Could not login as administrator with given pw!") raise LrcException("Could not login as administrator with given pw!")
# print("OK, we have admin rights!") # print("OK, we have admin rights!")
def _get_name(self): def _get_name(self):
@@ -158,7 +158,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if mode not in range(4): if mode not in range(4):
raise Exception("Only values from 0 to 3 are allowed!") raise LrcException("Only values from 0 to 3 are allowed!")
self.tn.write(self.esc_char + str(mode) + "CV\n") self.tn.write(self.esc_char + str(mode) + "CV\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -175,7 +175,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if mode not in [0, 2]: if mode not in [0, 2]:
raise Exception("Only values 0 and 2 are allowed!") raise LrcException("Only values 0 and 2 are allowed!")
self.tn.write(self.esc_char + "1*{}XF\n".format(mode)) self.tn.write(self.esc_char + "1*{}XF\n".format(mode))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -188,7 +188,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if mode not in [0, 2]: if mode not in [0, 2]:
raise Exception("Only values 0 and 2 are allowed!") raise LrcException("Only values 0 and 2 are allowed!")
self.tn.write(self.esc_char + "0*{}XF\n".format(mode)) self.tn.write(self.esc_char + "0*{}XF\n".format(mode))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -242,7 +242,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if mode not in range(4): if mode not in range(4):
raise Exception("Only values from 0 to 3 are allowed!") raise LrcException("Only values from 0 to 3 are allowed!")
self.tn.write(str(mode) + "X\n") self.tn.write(str(mode) + "X\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -306,15 +306,15 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if input_num not in range(1, 6): if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!") raise LrcException("input_num must be a value between 1 and 5!")
if channel_num not in range(1, 3): if channel_num not in range(1, 3):
raise Exception("input_num must be a value between 1 and 2!") raise LrcException("input_num must be a value between 1 and 2!")
self.tn.write("{}*{}!\n".format(input_num, channel_num)) self.tn.write("{}*{}!\n".format(input_num, channel_num))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input(self, channel_num: int): def get_input(self, channel_num: int):
if channel_num not in range(1, 2): if channel_num not in range(1, 2):
raise Exception("input_num must be a value between 1 and 2!") raise LrcException("input_num must be a value between 1 and 2!")
self.tn.write("{}!\n".format(channel_num)) self.tn.write("{}!\n".format(channel_num))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -329,33 +329,33 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if input_num not in range(1, 6): if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!") raise LrcException("input_num must be a value between 1 and 5!")
if input_format not in range(1, 4): if input_format not in range(1, 4):
raise Exception("input_num must be a value between 1 and 3!") raise LrcException("input_num must be a value between 1 and 3!")
self.tn.write("{}*{}\\\n".format(input_num, input_format)) self.tn.write("{}*{}\\\n".format(input_num, input_format))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_format(self, input_num: int): def get_input_format(self, input_num: int):
if input_num not in range(1, 6): if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!") raise LrcException("input_num must be a value between 1 and 5!")
self.tn.write("{}\\\n".format(input_num)) self.tn.write("{}\\\n".format(input_num))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_name(self, input_num: int, input_name: str): def set_input_name(self, input_num: int, input_name: str):
if input_num not in range(1, 6): if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!") raise LrcException("input_num must be a value between 1 and 5!")
if len(input_name) > 16: if len(input_name) > 16:
raise Exception("input_name must be no longer than 16 chars") raise LrcException("input_name must be no longer than 16 chars")
try: try:
input_name.encode('ascii') input_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("input_name must only contain ascii characters") raise LrcException("input_name must only contain ascii characters")
self.tn.write("{}{},{}NI\n".format(self.esc_char, input_num, input_name)) self.tn.write("{}{},{}NI\n".format(self.esc_char, input_num, input_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_name(self, input_num: int): def get_input_name(self, input_num: int):
if input_num not in range(1, 6): if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!") raise LrcException("input_num must be a value between 1 and 5!")
self.tn.write("{}{}NI\n".format(self.esc_char, input_num)) self.tn.write("{}{}NI\n".format(self.esc_char, input_num))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -401,7 +401,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if extension_time not in range(0, 100): if extension_time not in range(0, 100):
raise Exception("extension_time must be a value between 0 and 99!") raise LrcException("extension_time must be a value between 0 and 99!")
self.tn.write("{}E{}RCDR\n".format(self.esc_char, extension_time)) self.tn.write("{}E{}RCDR\n".format(self.esc_char, extension_time))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -435,82 +435,82 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
def recall_user_preset(self, channel_number: int, preset_number: int): def recall_user_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3): if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!") raise LrcException("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("1*{}*{}.\n".format(channel_number, preset_number)) self.tn.write("1*{}*{}.\n".format(channel_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_user_preset(self, channel_number: int, preset_number: int): def save_user_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3): if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!") raise LrcException("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("1*{}*{},\n".format(channel_number, preset_number)) self.tn.write("1*{}*{},\n".format(channel_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_user_preset_name(self, preset_number: int, preset_name: str): def set_user_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16: if len(preset_name) > 16:
raise Exception("preset_name must be no longer than 16 chars") raise LrcException("preset_name must be no longer than 16 chars")
try: try:
preset_name.encode('ascii') preset_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters") raise LrcException("preset_name must only contain ascii characters")
self.tn.write("{}1*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) self.tn.write("{}1*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_user_preset_name(self, preset_number: int): def get_user_preset_name(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}1*{}PNAM\n".format(self.esc_char, preset_number)) self.tn.write("{}1*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_user_presets(self, input_number: int): def get_user_presets(self, input_number: int):
if input_number not in range(1, 6): if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 5!") raise LrcException("input_number must be a value between 1 and 5!")
self.tn.write("52*{}#\n".format(input_number)) self.tn.write("52*{}#\n".format(input_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Input Presets # Input Presets
def recall_input_preset(self, channel_number: int, preset_number: int): def recall_input_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3): if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!") raise LrcException("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 129): if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!") raise LrcException("preset_number must be a value between 1 and 128!")
self.tn.write("2*{}*{}.\n".format(channel_number, preset_number)) self.tn.write("2*{}*{}.\n".format(channel_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_input_preset(self, channel_number: int, preset_number: int): def save_input_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3): if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!") raise LrcException("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 129): if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!") raise LrcException("preset_number must be a value between 1 and 128!")
self.tn.write("1*{}*{},\n".format(channel_number, preset_number)) self.tn.write("1*{}*{},\n".format(channel_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_preset_name(self, preset_number: int, preset_name: str): def set_input_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 129): if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!") raise LrcException("preset_number must be a value between 1 and 128!")
if len(preset_name) > 16: if len(preset_name) > 16:
raise Exception("preset_name must be no longer than 16 chars") raise LrcException("preset_name must be no longer than 16 chars")
try: try:
preset_name.encode('ascii') preset_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters") raise LrcException("preset_name must only contain ascii characters")
self.tn.write("{}2*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) self.tn.write("{}2*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_preset_name(self, preset_number: int): def get_input_preset_name(self, preset_number: int):
if preset_number not in range(1, 129): if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!") raise LrcException("preset_number must be a value between 1 and 128!")
self.tn.write("{}2*{}PNAM\n".format(self.esc_char, preset_number)) self.tn.write("{}2*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def delete_input_preset(self, preset_number: int): def delete_input_preset(self, preset_number: int):
if preset_number not in range(1, 129): if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!") raise LrcException("preset_number must be a value between 1 and 128!")
self.tn.write("{}X2*{}PRST\n".format(self.esc_char, preset_number)) self.tn.write("{}X2*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -530,9 +530,9 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 4): if output_number not in range(1, 4):
raise Exception("output_number must be a value between 1 and 3!") raise LrcException("output_number must be a value between 1 and 3!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("3*{}*{}.\n".format(output_number, preset_number)) self.tn.write("3*{}*{}.\n".format(output_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -547,33 +547,33 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 4): if output_number not in range(1, 4):
raise Exception("output_number must be a value between 1 and 3!") raise LrcException("output_number must be a value between 1 and 3!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("3*{}*{},\n".format(output_number, preset_number)) self.tn.write("3*{}*{},\n".format(output_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_streaming_preset_name(self, preset_number: int, preset_name: str): def set_streaming_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16: if len(preset_name) > 16:
raise Exception("preset_name must be no longer than 16 chars") raise LrcException("preset_name must be no longer than 16 chars")
try: try:
preset_name.encode('ascii') preset_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters") raise LrcException("preset_name must only contain ascii characters")
self.tn.write("{}3*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) self.tn.write("{}3*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_streaming_preset_name(self, preset_number: int): def get_streaming_preset_name(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}3*{}PNAM\n".format(self.esc_char, preset_number)) self.tn.write("{}3*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_streaming_preset_to_default(self, preset_number: int): def reset_streaming_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}X3*{}PRST\n".format(self.esc_char, preset_number)) self.tn.write("{}X3*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -589,9 +589,9 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 4): if output_number not in range(1, 4):
raise Exception("output_number must be a value between 1 and 3!") raise LrcException("output_number must be a value between 1 and 3!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("4*{}*{}.\n".format(output_number, preset_number)) self.tn.write("4*{}*{}.\n".format(output_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -606,46 +606,46 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 4): if output_number not in range(1, 4):
raise Exception("output_number must be a value between 1 and 3!") raise LrcException("output_number must be a value between 1 and 3!")
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("4*{}*{},\n".format(output_number, preset_number)) self.tn.write("4*{}*{},\n".format(output_number, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_encoder_preset_name(self, preset_number: int, preset_name: str): def set_encoder_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16: if len(preset_name) > 16:
raise Exception("preset_name must be no longer than 16 chars") raise LrcException("preset_name must be no longer than 16 chars")
try: try:
preset_name.encode('ascii') preset_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters") raise LrcException("preset_name must only contain ascii characters")
self.tn.write("{}4*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) self.tn.write("{}4*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_encoder_preset_name(self, preset_number: int): def get_encoder_preset_name(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}4*{}PNAM\n".format(self.esc_char, preset_number)) self.tn.write("{}4*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_encoder_preset_to_default(self, preset_number: int): def reset_encoder_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}X4*{}PRST\n".format(self.esc_char, preset_number)) self.tn.write("{}X4*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Layout Presets # Layout Presets
def save_layout_preset(self, preset_number: int): def save_layout_preset(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("7*{},\n".format(preset_number)) self.tn.write("7*{},\n".format(preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def recall_layout_preset(self, preset_number: int, include_input_selections: bool = True): def recall_layout_preset(self, preset_number: int, include_input_selections: bool = True):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
if include_input_selections: if include_input_selections:
self.tn.write("7*{}.\n".format(preset_number)) self.tn.write("7*{}.\n".format(preset_number))
else: else:
@@ -654,25 +654,25 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
def set_layout_preset_name(self, preset_number: int, preset_name: str): def set_layout_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16: if len(preset_name) > 16:
raise Exception("preset_name must be no longer than 16 chars") raise LrcException("preset_name must be no longer than 16 chars")
try: try:
preset_name.encode('ascii') preset_name.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters") raise LrcException("preset_name must only contain ascii characters")
self.tn.write("{}7*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) self.tn.write("{}7*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_layout_preset_name(self, preset_number: int): def get_layout_preset_name(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}7*{}PNAM\n".format(self.esc_char, preset_number)) self.tn.write("{}7*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_layout_preset_to_default(self, preset_number: int): def reset_layout_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17): if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!") raise LrcException("preset_number must be a value between 1 and 16!")
self.tn.write("{}X7*{}PRST\n".format(self.esc_char, preset_number)) self.tn.write("{}X7*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -690,7 +690,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 3): if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!") raise LrcException("output_number must be a value between 1 and 2!")
self.tn.write("{}*1B\n".format(output_number)) self.tn.write("{}*1B\n".format(output_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -703,7 +703,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 3): if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!") raise LrcException("output_number must be a value between 1 and 2!")
self.tn.write("{}*0B\n".format(output_number)) self.tn.write("{}*0B\n".format(output_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -716,7 +716,7 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if output_number not in range(1, 3): if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!") raise LrcException("output_number must be a value between 1 and 2!")
self.tn.write("{}B\n".format(output_number)) self.tn.write("{}B\n".format(output_number))
return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) > 0 return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) > 0
@@ -741,25 +741,25 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
:return: :return:
""" """
if input_number not in range(1, 6): if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!") raise LrcException("input_number must be a value between 1 and 6!")
self.tn.write("{}I{}HDCP\n".format(self.esc_char, input_number)) self.tn.write("{}I{}HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_authorization_hdcp_on(self, input_number: int): def set_input_authorization_hdcp_on(self, input_number: int):
if input_number not in range(1, 6): if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!") raise LrcException("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}*1HDCP\n".format(self.esc_char, input_number)) self.tn.write("{}E{}*1HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_authorization_hdcp_off(self, input_number: int): def set_input_authorization_hdcp_off(self, input_number: int):
if input_number not in range(1, 6): if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!") raise LrcException("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}*0HDCP\n".format(self.esc_char, input_number)) self.tn.write("{}E{}*0HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_authorization_hdcp_status(self, input_number: int): def get_input_authorization_hdcp_status(self, input_number: int):
if input_number not in range(1, 6): if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!") raise LrcException("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}HDCP\n".format(self.esc_char, input_number)) self.tn.write("{}E{}HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())

View File

@@ -0,0 +1,61 @@
import logging
from typing import Union
from backend.models import Recorder, RecorderCommand, LrcException
from backend.recorder_adapters import get_recorder_adapter_by_id
logger = logging.getLogger("lrc.recorder_adapters.helpers")
def validate_recorder_command_params(recorder_command: RecorderCommand, params: dict, fail_on_missing_params=False):
if recorder_command.parameters is None or len(recorder_command.parameters) == 0:
if len(params) == 0:
logger.debug("Number (0) of parameters matching expected number of args of command signature.")
return True
logger.info("More arguments specified ({}) than expected!".format(len(params)))
return False
for p_n in recorder_command.parameters:
p_t = recorder_command.parameters[p_n]
p = params.get(p_n, None)
if p is None:
if fail_on_missing_params:
return False
else:
if p_t == 'int':
params[p_n] = int(p)
return True
def get_function_from_recorder_command(recorder: Recorder, recorder_command: RecorderCommand,
connect_by_network_name=True):
adapter_name, function_name = recorder_command.name.split(':')
if connect_by_network_name and recorder.network_name is not None:
address = recorder.network_name
elif recorder.ip6 is not None:
address = recorder.ip6
else:
address = recorder.ip
logger.debug("Using {} to create recorder adapter to connect to {}.".format(address, recorder.name))
package_name, class_name = adapter_name.split('.')
adapter = get_recorder_adapter_by_id(class_name, address=address, user=recorder.username,
password=recorder.password, firmware_version=recorder.firmware_version)
logger.debug("Built adapter {}".format(adapter))
return getattr(adapter, function_name)
def execute_recorder_command(recorder: Recorder, recorder_command: RecorderCommand, params: Union[dict, None] = None,
connect_by_network_name=True):
if params is None:
params = dict()
if validate_recorder_command_params(recorder_command, params):
try:
func = get_function_from_recorder_command(recorder, recorder_command, connect_by_network_name)
logger.debug(
"Executing func: {} with params: '{}'".format(func, ", ".join(": ".join(str(_)) for _ in params.items())))
out = func(**params)
return True, out
except LrcException as e:
return False, str(e)
else:
logger.info("Could not validate given parameters!")
return False

View File

@@ -67,12 +67,14 @@ def update_recorder_models_database(drop: bool = False):
db.session.commit() db.session.commit()
r_as = get_defined_recorder_adapters() r_as = get_defined_recorder_adapters()
for r_a in r_as: for r_a in r_as:
if r_a.get('class') is None: # skip modules without class (helpers.py, e.g.)
continue
try: try:
r_m = RecorderModel.get_by_adapter_id(r_a["id"]) r_m = RecorderModel.get_by_adapter_id(r_a["id"])
model_checksum = calculate_md5_checksum(dumps(r_a["commands"])) model_checksum = calculate_md5_checksum(dumps(r_a["commands"]))
if r_m is None: if r_m is None:
r_m = RecorderModel(record_adapter_id=r_a["id"], model_name=r_a["name"], checksum=model_checksum, r_m = RecorderModel(record_adapter_id=r_a["id"], model_name=r_a["name"], checksum=model_checksum,
**r_a.get('class', {}).get_recorder_params()) **r_a.get('class').get_recorder_params())
db.session.add(r_m) db.session.add(r_m)
db.session.flush() db.session.flush()
db.session.refresh(r_m) db.session.refresh(r_m)