From 186614bc4a52326e492a14dad537753f9e0b41e0 Mon Sep 17 00:00:00 2001 From: Tobias Kurze Date: Mon, 5 Aug 2019 13:41:12 +0200 Subject: [PATCH] added virtual commands model --- api/recorder_api.py | 48 +----- config.py | 7 + models/recorder_model.py | 9 +- models/user_model.py | 8 +- models/virtual_command_model.py | 94 ++++++++++++ recorder_adapters/__init__.py | 58 ++++++- recorder_adapters/extron_smp.py | 260 ++++++++++++++++---------------- 7 files changed, 306 insertions(+), 178 deletions(-) create mode 100644 models/virtual_command_model.py diff --git a/api/recorder_api.py b/api/recorder_api.py index ce1f447..e5da45f 100644 --- a/api/recorder_api.py +++ b/api/recorder_api.py @@ -5,6 +5,7 @@ For example: listing of available auth providers or registration of users. Login through API does not start a new session, but instead returns JWT. """ +import inspect import pkgutil from flask_jwt_extended import jwt_required @@ -125,9 +126,6 @@ class RecorderList(Resource): return recorder -# == - - recorder_model_model = api_recorder.model('Recorder Model', { 'id': fields.String(required=False, description='The recorder model\'s identifier'), 'name': fields.String(required=True, description='The recorder model\'s name'), @@ -159,17 +157,6 @@ class RecorderModelResource(Resource): return recorder_model api_recorder.abort(404) - @jwt_required - @api_recorder.doc('delete_recorder_model') - @api_recorder.response(204, 'Recorder model deleted') - def delete(self, id): - """Delete a recorder model given its identifier""" - recorder_model = RecorderModel.query.get(id) - if recorder_model is not None: - db.session.delete(recorder_model) - db.session.commit() - return '', 204 - api_recorder.abort(404) @jwt_required @api_recorder.doc('update_recorder_model') @@ -186,21 +173,11 @@ class RecorderModelResource(Resource): @api_recorder.route('/model') class RecorderModelList(Resource): - @jwt_required + #@jwt_required @api_recorder.doc('recorders') @api_recorder.marshal_list_with(recorder_model_model) def get(self): - models = [] - found_packages = list(pkgutil.iter_modules(r_a.__path__)) - importer = found_packages[0][0] - for f_p in found_packages: - importer = f_p[0] - rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1]) - rec_model = {'id': f_p[1], 'name': f_p[1]} - if 'RECORDER_MODEL_NAME' in rec_model_module: - rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME - models.append(rec_model) - print(models) + models = r_a.get_defined_recorder_adapters() return models """ List all recorder models @@ -208,25 +185,6 @@ class RecorderModelList(Resource): """ return Recorder.get_all() - @jwt_required - @api_recorder.doc('create_recorder') - @api_recorder.expect(recorder_model_model) - @api_recorder.marshal_with(recorder_model_model, code=201) - def post(self): - if "recorders" in api_recorder.payload: - if api_recorder.payload["recorders"] is None or len(api_recorder.payload["recorders"]) < 1 : - api_recorder.payload["recorders"] = [] - else: - rec_model = RecorderModel.query.get(api_recorder.payload["recorder_model_id"]) - if rec_model is not None: - api_recorder.payload["recorder_model"] = rec_model - else: - return "specified recorder model (id: {}) does not exist!".format(api_recorder.payload["recorder_model_id"]), 404 - recorder = Recorder(**api_recorder.payload) - db.session.add(recorder) - db.session.commit() - return recorder - # == diff --git a/config.py b/config.py index f8ecac8..bd6b146 100644 --- a/config.py +++ b/config.py @@ -74,6 +74,13 @@ class Config(): INDEX_TEMPLATE = "index.html" + PERMISSIONS = ["RECODER_NEW", "RECORDER_EDIT", "RECODER_SHOW", "RECORDER_DELETE", + "RECORDER_COMMAND_EXECUTE", "RECORDER_COMMAND_EDIT_ACL", + "VIRTUAL_COMMAND_CREATE", "VIRTUAL_COMMAND_EDIT", "VIRTUAL_COMMAND_SHOW", "VIRTUAL_COMMAND_DELETE", + "CRON_JOB_CREATE", "CRON_JOB_EDIT", "CRON_JOB_SHOW", "CRON_JOB_DELETE"] + + GROUPS = ["Admins", "ZML", "read_only"] + class ProductionConfig(Config): DATABASE_URI = 'mysql://user@localhost/foo' diff --git a/models/recorder_model.py b/models/recorder_model.py index 033f410..a60b926 100644 --- a/models/recorder_model.py +++ b/models/recorder_model.py @@ -11,6 +11,7 @@ import re from sqlalchemy import or_ from datetime import datetime, timedelta +from backend.models.virtual_command_model import virtual_command_recorder_command_table metadata = MetaData() @@ -32,6 +33,7 @@ recorder_model_recorder_command_table = db.Table('recorder_model_recorder_comman class RecorderModel(db.Model): id = db.Column(db.Integer, autoincrement=True, primary_key=True) model_name = db.Column(db.Unicode(63), unique=True, nullable=False) + model_checksum = db.Column(db.String(63), unique=True, nullable=True) notes = db.Column(db.Unicode(255), unique=False, nullable=True, default=None) recorder_commands = db.relationship('RecorderCommand', secondary=recorder_model_recorder_command_table, back_populates='recorder_models') @@ -49,9 +51,9 @@ class Recorder(db.Model): network_name = db.Column(db.String(127), unique=True, nullable=True, default=None) telnet_port = db.Column(db.Integer, unique=False, nullable=False, default=23) ssh_port = db.Column(db.Integer, unique=False, nullable=False, default=22) - use_telnet_instead_ssh = db.Column(db.Boolean, nullable=False, default=False) recorder_model_id = db.Column(db.Integer, db.ForeignKey('recorder_model.id')) recorder_model = db.relationship('RecorderModel', back_populates='recorders') + virtual_commands = db.relationship('VirtualCommand', back_populates='recorders') def __init__(self, **kwargs): super(Recorder, self).__init__(**kwargs) @@ -90,6 +92,9 @@ class RecorderCommand(db.Model): id = db.Column(db.Integer, autoincrement=True, primary_key=True) name = db.Column(db.Unicode(63), unique=True, nullable=False) description = db.Column(db.Unicode(511), nullable=True, default=None) - command = db.Column(db.String(2047), nullable=False) + parameters = db.Column(db.String(2047), nullable=True) recorder_models = db.relationship('RecorderModel', secondary=recorder_model_recorder_command_table, back_populates='recorder_commands') + + virtual_commands = db.relationship('VirtualCommand', secondary=virtual_command_recorder_command_table, + back_populates='recorder_commands') diff --git a/models/user_model.py b/models/user_model.py index 8b135a4..f52d3fb 100644 --- a/models/user_model.py +++ b/models/user_model.py @@ -13,7 +13,7 @@ from backend.models.example_model import ExampleDataItem import re import jwt from flask_login import UserMixin -from sqlalchemy import or_ +from sqlalchemy import or_, event from datetime import datetime, timedelta from passlib.hash import sha256_crypt from hashlib import md5 @@ -455,3 +455,9 @@ class Permission(db.Model): description = db.Column(db.Unicode(511)) groups = db.relationship(Group, secondary=group_permission_table, back_populates='permissions') + + +@event.listens_for(Permission.__table__, 'after_create') +def insert_initial_permissions(*args, **kwargs): + for p in app.config.get("PERMISSIONS", []): + db.session.add(p) diff --git a/models/virtual_command_model.py b/models/virtual_command_model.py new file mode 100644 index 0000000..167890d --- /dev/null +++ b/models/virtual_command_model.py @@ -0,0 +1,94 @@ +import json +from datetime import datetime + +from sqlalchemy.ext.hybrid import hybrid_property + +from backend import db + +# This is the association table for the many-to-many relationship between +# virtual commands and recorder commands. +virtual_command_recorder_command_table = db.Table('virtual_command_recorder_command', + db.Column('virtual_command_id', db.Integer, + db.ForeignKey('virtual_command.id', + onupdate="CASCADE", + ondelete="CASCADE"), + primary_key=True), + db.Column('recorder_command_id', db.Integer, + db.ForeignKey('recorder_command.id', + onupdate="CASCADE", + ondelete="CASCADE"), + primary_key=True)) + + +# This is the association table for the many-to-many relationship between +# virtual commands and recorder commands. +virtual_command_recorder_table = db.Table('virtual_command_recorder', + db.Column('virtual_command_id', db.Integer, + db.ForeignKey('virtual_command.id', + onupdate="CASCADE", + ondelete="CASCADE"), + primary_key=True), + db.Column('recorder_id', db.Integer, + db.ForeignKey('recorder.id', + onupdate="CASCADE", + ondelete="CASCADE"), + primary_key=True)) + + +class VirtualCommand(db.Model): + id = db.Column(db.Integer, autoincrement=True, primary_key=True) + created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow()) + name = db.Column(db.Unicode(63), unique=True, nullable=False) + description = db.Column(db.Unicode(255), unique=False, nullable=True, default="") + + recorders = db.relationship('Recorder', secondary=virtual_command_recorder_table, + back_populates='virtual_commands') + + recorder_commands = db.relationship('RecorderCommand', secondary=virtual_command_recorder_command_table, + back_populates='virtual_commands') + + parent_virtual_command = db.relationship('VirtualCommand', back_populates='child_virtual_commands', + nullable=True, default=None) + child_virtual_commands = db.relationship('VirtualCommand', back_populates='parent_virtual_command') + + command_order_string = db.Column(db.String) + + def __init__(self, **kwargs): + super(VirtualCommand, self).__init__(**kwargs) + + @staticmethod + def get_by_name(name): + """ + Find group by name + :param name: + :return: + """ + return VirtualCommand.query.filter(VirtualCommand.name == name).first() + + @staticmethod + def get_all(): + """ + Return all groups + :return: + """ + return VirtualCommand.query.all() + + @hybrid_property + def command_order(self): + if self.command_order_string is None: + return [] + return self.command_order_string.split() + + @command_order.setter + def command_order(self, value: list): + pass + + def __str__(self): + return self.name + + def to_dict(self): + return dict(id=self.id, name=self.name) + + def toJSON(self): + return json.dumps(self.to_dict(), default=lambda o: o.__dict__, + sort_keys=True, indent=4) diff --git a/recorder_adapters/__init__.py b/recorder_adapters/__init__.py index af780a8..8df01cf 100644 --- a/recorder_adapters/__init__.py +++ b/recorder_adapters/__init__.py @@ -1,7 +1,12 @@ +import inspect +import pkgutil +import sys import telnetlib from abc import ABC, abstractmethod # monkey patching of telnet lib +from pprint import pprint + original_read_until = telnetlib.Telnet.read_until original_write = telnetlib.Telnet.write @@ -63,14 +68,14 @@ class TelnetAdapter(ABC): self.esc_char = esc_char @abstractmethod - def login(self): + def _login(self): pass - def run_cmd(self, cmd, timeout=1, auto_connect=True): + def _run_cmd(self, cmd, timeout=1, auto_connect=True): if self.tn is None and not auto_connect: raise Exception("Not connected!") elif self.tn is None: - self.login() + self._login() self.tn.write(cmd) out = self.tn.read_until_non_empty_line() res = out @@ -81,8 +86,53 @@ class TelnetAdapter(ABC): return res @staticmethod - def get_response_str(tn_response): + def _get_response_str(tn_response): if isinstance(tn_response, bytes): return str(tn_response.decode("ascii").rstrip()) else: return str(tn_response).rstrip() + + +class RecorderAdapter: + @abstractmethod + def _get_name(self): + pass + + @abstractmethod + def _get_version(self): + pass + + +def get_defined_recorder_adapters(): + models = [] + found_packages = list(pkgutil.iter_modules(sys.modules[__name__].__path__)) + for f_p in found_packages: + importer = f_p[0] + rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1]) + rec_model = {'id': f_p[1], 'name': f_p[1], 'commands': {}} + if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'): + rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME + for name, obj in inspect.getmembers(rec_model_module, inspect.isclass): + if issubclass(obj, RecorderAdapter): + commands = {} + for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction): + if len(method_name) > 0 and "_" == method_name[0]: + continue + signature = inspect.signature(method) + parameters = {} + for params in signature.parameters: + if params == "self": + continue + param_type = signature.parameters[params].annotation.__name__ + param_type = "_unknown_type" if param_type == "_empty" else param_type + parameters[signature.parameters[params].name] = param_type + if len(parameters) <= 0: + parameters = None + commands[method_name] = parameters + rec_model["commands"] = commands + models.append(rec_model) + pprint(models) + return models + + +get_defined_recorder_adapters() diff --git a/recorder_adapters/extron_smp.py b/recorder_adapters/extron_smp.py index 859f4df..dd3bd95 100644 --- a/recorder_adapters/extron_smp.py +++ b/recorder_adapters/extron_smp.py @@ -1,9 +1,7 @@ -import getpass -import sys -from abc import ABC, abstractmethod -from backend.recorder_adapters import telnetlib, TelnetAdapter +from backend.recorder_adapters import telnetlib, TelnetAdapter, RecorderAdapter RECORDER_MODEL_NAME = "SMP 351 / 352" +VERSION = "0.9.0" # HOST = "localhost" # HOST = "129.13.51.102" # Audimax SMP 351 @@ -14,12 +12,12 @@ USER = "admin" PW = "123mzsmp" -class SMP(TelnetAdapter): +class SMP(TelnetAdapter, RecorderAdapter): def __init__(self, address, admin_password): super().__init__(address) self.admin_pw = admin_password - def login(self): + def _login(self): self.tn = telnetlib.Telnet(HOST) self.tn.read_until("\r\nPassword:") # password = getpass.getpass() @@ -39,6 +37,12 @@ class SMP(TelnetAdapter): raise Exception("Could not login as administrator with given pw!") print("OK, we have admin rights!") + def _get_name(self): + return RECORDER_MODEL_NAME + + def _get_version(self): + return VERSION + def get_version(self, include_build=False, verbose_info=False): if verbose_info: self.tn.write("0Q") @@ -48,76 +52,76 @@ class SMP(TelnetAdapter): else: self.tn.write("1Q\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_bootstrap_version(self): self.tn.write("2Q") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_factory_firmware_version(self): self.tn.write("3Q") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_updated_firmware_version(self): self.tn.write("4Q") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_part_number(self): self.tn.write("N") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_model_name(self): self.tn.write("1I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_model_description(self): self.tn.write("2I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_system_memory_usage(self): self.tn.write("3I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_number_of_connected_users(self): self.tn.write("10I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_system_processer_usage(self): self.tn.write("11I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_system_processor_idle(self): self.tn.write("12I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_eth0_link_status(self): self.tn.write("13I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_file_transfer_config(self): self.tn.write("38I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_active_alarms(self): self.tn.write("39I") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_unit_name(self, name): + def set_unit_name(self, name: str): # TODO: check name (must comply with internet host name standards) self.tn.write(self.esc_char + name + "CN\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def reset_unit_name(self): self.tn.write(self.esc_char + " CN\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_unit_name(self): self.tn.write(self.esc_char + "CN\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_telnet_connections(self): self.tn.write(self.esc_char + "CC\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def set_verbose_mode(self, mode: int): """ @@ -132,25 +136,27 @@ class SMP(TelnetAdapter): if mode not in range(4): raise Exception("Only values from 0 to 3 are allowed!") self.tn.write(self.esc_char + str(mode) + "CV\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_verbose_mode(self): self.tn.write(self.esc_char + "CV\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) + """ def save_configuration(self): pass def restore_configuration(self): pass + """ def reboot(self): self.tn.write(self.esc_char + "1BOOT\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def restart_network(self): self.tn.write(self.esc_char + "2BOOT\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def reset_flash(self): """ @@ -158,7 +164,7 @@ class SMP(TelnetAdapter): :return: """ self.tn.write(self.esc_char + "ZFFF\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def system_reset(self): """ @@ -166,7 +172,7 @@ class SMP(TelnetAdapter): :return: """ self.tn.write(self.esc_char + "ZXXX\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def reset_settings_and_delete_all_files(self): """ @@ -174,7 +180,7 @@ class SMP(TelnetAdapter): :return: """ self.tn.write(self.esc_char + "ZY\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def absolute_reset(self): """ @@ -182,9 +188,9 @@ class SMP(TelnetAdapter): :return: """ self.tn.write(self.esc_char + "ZQQQ\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_front_panel_lock(self, mode): + def set_front_panel_lock(self, mode: int): """ 0=Off 1=complete lockout (no front panel control) @@ -196,7 +202,7 @@ class SMP(TelnetAdapter): if mode not in range(4): raise Exception("Only values from 0 to 3 are allowed!") self.tn.write(str(mode) + "X\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_front_panel_lock(self): """ @@ -208,13 +214,14 @@ class SMP(TelnetAdapter): :return: """ self.tn.write("X\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) """ A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.) Only some stuff will be implemented here! """ + """ def get_date_time(self): pass @@ -241,6 +248,7 @@ class SMP(TelnetAdapter): def get_dns_server_ip(self): pass + """ """ RS-232 / serial port related stuff not implemented. @@ -260,13 +268,13 @@ class SMP(TelnetAdapter): if channel_num not in range(1, 3): raise Exception("input_num must be a value between 1 and 2!") self.tn.write("{}*{}!\n".format(input_num, channel_num)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input(self, channel_num: int): if channel_num not in range(1, 2): raise Exception("input_num must be a value between 1 and 2!") self.tn.write("{}!\n".format(channel_num)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def set_input_format(self, input_num: int, input_format: int): """ @@ -283,13 +291,13 @@ class SMP(TelnetAdapter): if input_format not in range(1, 4): raise Exception("input_num must be a value between 1 and 3!") self.tn.write("{}*{}\\\n".format(input_num, input_format)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input_format(self, input_num: int): if input_num not in range(1, 6): raise Exception("input_num must be a value between 1 and 5!") self.tn.write("{}\\\n".format(input_num)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def set_input_name(self, input_num: int, input_name: str): if input_num not in range(1, 6): @@ -301,17 +309,17 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("input_name must only contain ascii characters") self.tn.write("{}{},{}NI\n".format(self.esc_char, input_num, input_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input_name(self, input_num: int): if input_num not in range(1, 6): raise Exception("input_num must be a value between 1 and 5!") self.tn.write("{}{}NI\n".format(self.esc_char, input_num)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input_selction_per_channel(self): self.tn.write("32I\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) """ Input configuration part skipped @@ -319,15 +327,15 @@ class SMP(TelnetAdapter): def stop_recording(self): self.tn.write("{}Y0RCDR\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def start_recording(self): self.tn.write("{}Y1RCDR\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def pause_recording(self): self.tn.write("{}Y2RCDR\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_recording_status(self): """ @@ -338,7 +346,7 @@ class SMP(TelnetAdapter): :return: status """ self.tn.write("{}YRCDR\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def extent_recording_time(self, extension_time: int): """ @@ -349,53 +357,53 @@ class SMP(TelnetAdapter): if extension_time not in range(0, 100): raise Exception("extension_time must be a value between 0 and 99!") self.tn.write("{}E{}RCDR\n".format(self.esc_char, extension_time)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def add_chapter_marker(self): self.tn.write("{}BRCDR\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def swap_channel_positions(self): self.tn.write("%\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_recording_status_text(self): self.tn.write("I\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_elapsed_recording_time(self): self.tn.write("35I\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_remaining_recording_time(self): self.tn.write("36I\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_recording_destination(self): self.tn.write("37I\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) """ Metadata part skipped """ - def recall_user_preset(self, channel_number, preset_number): + def recall_user_preset(self, channel_number: int, preset_number: int): if channel_number not in range(1, 3): raise Exception("channel_number must be a value between 1 and 2!") if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("1*{}*{}.\n".format(channel_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def save_user_preset(self, channel_number, preset_number): + def save_user_preset(self, channel_number: int, preset_number: int): if channel_number not in range(1, 3): raise Exception("channel_number must be a value between 1 and 2!") if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("1*{}*{},\n".format(channel_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_user_preset_name(self, preset_number, preset_name): + def set_user_preset_name(self, preset_number: int, preset_name: str): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") if len(preset_name) > 16: @@ -405,38 +413,38 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("preset_name must only contain ascii characters") self.tn.write("{}1*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_user_preset_name(self, preset_number): + def get_user_preset_name(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}1*{}PNAM\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_user_presets(self, input_number): + def get_user_presets(self, input_number: int): if input_number not in range(1, 6): raise Exception("input_number must be a value between 1 and 5!") self.tn.write("52*{}#\n".format(input_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # Input Presets - def recall_input_preset(self, channel_number, preset_number): + def recall_input_preset(self, channel_number: int, preset_number: int): if channel_number not in range(1, 3): raise Exception("channel_number must be a value between 1 and 2!") if preset_number not in range(1, 129): raise Exception("preset_number must be a value between 1 and 128!") self.tn.write("2*{}*{}.\n".format(channel_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def save_input_preset(self, channel_number, preset_number): + def save_input_preset(self, channel_number: int, preset_number: int): if channel_number not in range(1, 3): raise Exception("channel_number must be a value between 1 and 2!") if preset_number not in range(1, 129): raise Exception("preset_number must be a value between 1 and 128!") self.tn.write("1*{}*{},\n".format(channel_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_input_preset_name(self, preset_number, preset_name): + def set_input_preset_name(self, preset_number: int, preset_name: str): if preset_number not in range(1, 129): raise Exception("preset_number must be a value between 1 and 128!") if len(preset_name) > 16: @@ -446,26 +454,26 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("preset_name must only contain ascii characters") self.tn.write("{}2*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_input_preset_name(self, preset_number): + def get_input_preset_name(self, preset_number: int): if preset_number not in range(1, 129): raise Exception("preset_number must be a value between 1 and 128!") self.tn.write("{}2*{}PNAM\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def delete_input_preset(self, preset_number): + def delete_input_preset(self, preset_number: int): if preset_number not in range(1, 129): raise Exception("preset_number must be a value between 1 and 128!") self.tn.write("{}X2*{}PRST\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input_presets(self): self.tn.write("51#\n") - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # Streaming Presets - def recall_streaming_preset(self, output_number, preset_number): + def recall_streaming_preset(self, output_number: int, preset_number: int): """ Output_number: 1 = Channel A @@ -480,9 +488,9 @@ class SMP(TelnetAdapter): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("3*{}*{}.\n".format(output_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def save_streaming_preset(self, output_number, preset_number): + def save_streaming_preset(self, output_number: int, preset_number: int): """ Output_number: 1 = Channel A @@ -497,9 +505,9 @@ class SMP(TelnetAdapter): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("3*{}*{},\n".format(output_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_streaming_preset_name(self, preset_number, preset_name): + def set_streaming_preset_name(self, preset_number: int, preset_name: str): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") if len(preset_name) > 16: @@ -509,22 +517,22 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("preset_name must only contain ascii characters") self.tn.write("{}3*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_streaming_preset_name(self, preset_number): + def get_streaming_preset_name(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}3*{}PNAM\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def reset_streaming_preset_to_default(self, preset_number): + def reset_streaming_preset_to_default(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}X3*{}PRST\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # Encoder Presets - def recall_encoder_preset(self, output_number, preset_number): + def recall_encoder_preset(self, output_number: int, preset_number: int): """ Output_number: 1 = Channel A @@ -539,9 +547,9 @@ class SMP(TelnetAdapter): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("4*{}*{}.\n".format(output_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def save_encoder_preset(self, output_number, preset_number): + def save_encoder_preset(self, output_number: int, preset_number: int): """ Output_number: 1 = Channel A @@ -556,9 +564,9 @@ class SMP(TelnetAdapter): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("4*{}*{},\n".format(output_number, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_encoder_preset_name(self, preset_number, preset_name): + def set_encoder_preset_name(self, preset_number: int, preset_name: str): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") if len(preset_name) > 16: @@ -568,37 +576,37 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("preset_name must only contain ascii characters") self.tn.write("{}4*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_encoder_preset_name(self, preset_number): + def get_encoder_preset_name(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}4*{}PNAM\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def reset_encoder_preset_to_default(self, preset_number): + def reset_encoder_preset_to_default(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}X4*{}PRST\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # Layout Presets - def save_layout_preset(self, preset_number): + def save_layout_preset(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("7*{},\n".format(preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def recall_layout_preset(self, preset_number, include_input_selections=True): + def recall_layout_preset(self, preset_number: int, include_input_selections: bool = True): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") if include_input_selections: self.tn.write("7*{}.\n".format(preset_number)) else: self.tn.write("8*{}.\n".format(preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_layout_preset_name(self, preset_number, preset_name): + def set_layout_preset_name(self, preset_number: int, preset_name: str): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") if len(preset_name) > 16: @@ -608,26 +616,26 @@ class SMP(TelnetAdapter): except UnicodeEncodeError: raise Exception("preset_name must only contain ascii characters") self.tn.write("{}7*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_layout_preset_name(self, preset_number): + def get_layout_preset_name(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}7*{}PNAM\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def reset_layout_preset_to_default(self, preset_number): + def reset_layout_preset_to_default(self, preset_number: int): if preset_number not in range(1, 17): raise Exception("preset_number must be a value between 1 and 16!") self.tn.write("{}X7*{}PRST\n".format(self.esc_char, preset_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) """ Input adjustments skipped Picture adjustments skipped """ - def mute_output(self, output_number): + def mute_output(self, output_number: int): """ Output_number: 1 = Channel A @@ -638,9 +646,9 @@ class SMP(TelnetAdapter): if output_number not in range(1, 3): raise Exception("output_number must be a value between 1 and 2!") self.tn.write("{}*1B\n".format(output_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def unmute_output(self, output_number): + def unmute_output(self, output_number: int): """ Output_number: 1 = Channel A @@ -651,9 +659,9 @@ class SMP(TelnetAdapter): if output_number not in range(1, 3): raise Exception("output_number must be a value between 1 and 2!") self.tn.write("{}*0B\n".format(output_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def is_muted(self, output_number): + def is_muted(self, output_number: int): """ Output_number: 1 = Channel A @@ -664,7 +672,7 @@ class SMP(TelnetAdapter): if output_number not in range(1, 3): raise Exception("output_number must be a value between 1 and 2!") self.tn.write("{}B\n".format(output_number)) - return int(TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())) > 0 + return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) > 0 """ EDID skipped @@ -672,7 +680,7 @@ class SMP(TelnetAdapter): some advanced options skipped """ - def get_input_hdcp_status(self, input_number): + def get_input_hdcp_status(self, input_number: int): """ returns: 0 = no sink / source detected @@ -684,57 +692,57 @@ class SMP(TelnetAdapter): if input_number not in range(1, 6): raise Exception("input_number must be a value between 1 and 6!") self.tn.write("{}I{}HDCP\n".format(self.esc_char, input_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_input_authorization_hdcp_on(self, input_number): + def set_input_authorization_hdcp_on(self, input_number: int): if input_number not in range(1, 6): raise Exception("input_number must be a value between 1 and 6!") self.tn.write("{}E{}*1HDCP\n".format(self.esc_char, input_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_input_authorization_hdcp_off(self, input_number): + def set_input_authorization_hdcp_off(self, input_number: int): if input_number not in range(1, 6): raise Exception("input_number must be a value between 1 and 6!") self.tn.write("{}E{}*0HDCP\n".format(self.esc_char, input_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_input_authorization_hdcp_status(self, input_number): + def get_input_authorization_hdcp_status(self, input_number: int): if input_number not in range(1, 6): raise Exception("input_number must be a value between 1 and 6!") self.tn.write("{}E{}HDCP\n".format(self.esc_char, input_number)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def enable_hdcp_notification(self): self.tn.write("{}N1HDCP\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def disable_hdcp_notification(self): self.tn.write("{}N0HDCP\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_hdcp_notification_status(self): self.tn.write("{}NHDCP\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # background image settings def set_background_image(self, filename: str): self.tn.write("{}{}RF\n".format(self.esc_char, filename)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_background_image_filename(self): self.tn.write("{}RF\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def mute_background_image(self): self.tn.write("{}0RF\n".format(self.esc_char)) - return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def main(): smp = SMP(HOST, PW) print(smp) - smp.login() + smp._login() print(smp.get_version(verbose_info=False)) print(smp.get_bootstrap_version())