added virtual commands model

This commit is contained in:
Tobias Kurze
2019-08-05 13:41:12 +02:00
parent 51536766bf
commit 186614bc4a
7 changed files with 306 additions and 178 deletions

View File

@@ -5,6 +5,7 @@ For example: listing of available auth providers or registration of users.
Login through API does not start a new session, but instead returns JWT.
"""
import inspect
import pkgutil
from flask_jwt_extended import jwt_required
@@ -125,9 +126,6 @@ class RecorderList(Resource):
return recorder
# ==
recorder_model_model = api_recorder.model('Recorder Model', {
'id': fields.String(required=False, description='The recorder model\'s identifier'),
'name': fields.String(required=True, description='The recorder model\'s name'),
@@ -159,17 +157,6 @@ class RecorderModelResource(Resource):
return recorder_model
api_recorder.abort(404)
@jwt_required
@api_recorder.doc('delete_recorder_model')
@api_recorder.response(204, 'Recorder model deleted')
def delete(self, id):
"""Delete a recorder model given its identifier"""
recorder_model = RecorderModel.query.get(id)
if recorder_model is not None:
db.session.delete(recorder_model)
db.session.commit()
return '', 204
api_recorder.abort(404)
@jwt_required
@api_recorder.doc('update_recorder_model')
@@ -186,21 +173,11 @@ class RecorderModelResource(Resource):
@api_recorder.route('/model')
class RecorderModelList(Resource):
@jwt_required
#@jwt_required
@api_recorder.doc('recorders')
@api_recorder.marshal_list_with(recorder_model_model)
def get(self):
models = []
found_packages = list(pkgutil.iter_modules(r_a.__path__))
importer = found_packages[0][0]
for f_p in found_packages:
importer = f_p[0]
rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1])
rec_model = {'id': f_p[1], 'name': f_p[1]}
if 'RECORDER_MODEL_NAME' in rec_model_module:
rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME
models.append(rec_model)
print(models)
models = r_a.get_defined_recorder_adapters()
return models
"""
List all recorder models
@@ -208,25 +185,6 @@ class RecorderModelList(Resource):
"""
return Recorder.get_all()
@jwt_required
@api_recorder.doc('create_recorder')
@api_recorder.expect(recorder_model_model)
@api_recorder.marshal_with(recorder_model_model, code=201)
def post(self):
if "recorders" in api_recorder.payload:
if api_recorder.payload["recorders"] is None or len(api_recorder.payload["recorders"]) < 1 :
api_recorder.payload["recorders"] = []
else:
rec_model = RecorderModel.query.get(api_recorder.payload["recorder_model_id"])
if rec_model is not None:
api_recorder.payload["recorder_model"] = rec_model
else:
return "specified recorder model (id: {}) does not exist!".format(api_recorder.payload["recorder_model_id"]), 404
recorder = Recorder(**api_recorder.payload)
db.session.add(recorder)
db.session.commit()
return recorder
# ==

View File

@@ -74,6 +74,13 @@ class Config():
INDEX_TEMPLATE = "index.html"
PERMISSIONS = ["RECODER_NEW", "RECORDER_EDIT", "RECODER_SHOW", "RECORDER_DELETE",
"RECORDER_COMMAND_EXECUTE", "RECORDER_COMMAND_EDIT_ACL",
"VIRTUAL_COMMAND_CREATE", "VIRTUAL_COMMAND_EDIT", "VIRTUAL_COMMAND_SHOW", "VIRTUAL_COMMAND_DELETE",
"CRON_JOB_CREATE", "CRON_JOB_EDIT", "CRON_JOB_SHOW", "CRON_JOB_DELETE"]
GROUPS = ["Admins", "ZML", "read_only"]
class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo'

View File

@@ -11,6 +11,7 @@ import re
from sqlalchemy import or_
from datetime import datetime, timedelta
from backend.models.virtual_command_model import virtual_command_recorder_command_table
metadata = MetaData()
@@ -32,6 +33,7 @@ recorder_model_recorder_command_table = db.Table('recorder_model_recorder_comman
class RecorderModel(db.Model):
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
model_name = db.Column(db.Unicode(63), unique=True, nullable=False)
model_checksum = db.Column(db.String(63), unique=True, nullable=True)
notes = db.Column(db.Unicode(255), unique=False, nullable=True, default=None)
recorder_commands = db.relationship('RecorderCommand', secondary=recorder_model_recorder_command_table,
back_populates='recorder_models')
@@ -49,9 +51,9 @@ class Recorder(db.Model):
network_name = db.Column(db.String(127), unique=True, nullable=True, default=None)
telnet_port = db.Column(db.Integer, unique=False, nullable=False, default=23)
ssh_port = db.Column(db.Integer, unique=False, nullable=False, default=22)
use_telnet_instead_ssh = db.Column(db.Boolean, nullable=False, default=False)
recorder_model_id = db.Column(db.Integer, db.ForeignKey('recorder_model.id'))
recorder_model = db.relationship('RecorderModel', back_populates='recorders')
virtual_commands = db.relationship('VirtualCommand', back_populates='recorders')
def __init__(self, **kwargs):
super(Recorder, self).__init__(**kwargs)
@@ -90,6 +92,9 @@ class RecorderCommand(db.Model):
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
name = db.Column(db.Unicode(63), unique=True, nullable=False)
description = db.Column(db.Unicode(511), nullable=True, default=None)
command = db.Column(db.String(2047), nullable=False)
parameters = db.Column(db.String(2047), nullable=True)
recorder_models = db.relationship('RecorderModel', secondary=recorder_model_recorder_command_table,
back_populates='recorder_commands')
virtual_commands = db.relationship('VirtualCommand', secondary=virtual_command_recorder_command_table,
back_populates='recorder_commands')

View File

@@ -13,7 +13,7 @@ from backend.models.example_model import ExampleDataItem
import re
import jwt
from flask_login import UserMixin
from sqlalchemy import or_
from sqlalchemy import or_, event
from datetime import datetime, timedelta
from passlib.hash import sha256_crypt
from hashlib import md5
@@ -455,3 +455,9 @@ class Permission(db.Model):
description = db.Column(db.Unicode(511))
groups = db.relationship(Group, secondary=group_permission_table,
back_populates='permissions')
@event.listens_for(Permission.__table__, 'after_create')
def insert_initial_permissions(*args, **kwargs):
for p in app.config.get("PERMISSIONS", []):
db.session.add(p)

View File

@@ -0,0 +1,94 @@
import json
from datetime import datetime
from sqlalchemy.ext.hybrid import hybrid_property
from backend import db
# This is the association table for the many-to-many relationship between
# virtual commands and recorder commands.
virtual_command_recorder_command_table = db.Table('virtual_command_recorder_command',
db.Column('virtual_command_id', db.Integer,
db.ForeignKey('virtual_command.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True),
db.Column('recorder_command_id', db.Integer,
db.ForeignKey('recorder_command.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True))
# This is the association table for the many-to-many relationship between
# virtual commands and recorder commands.
virtual_command_recorder_table = db.Table('virtual_command_recorder',
db.Column('virtual_command_id', db.Integer,
db.ForeignKey('virtual_command.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True),
db.Column('recorder_id', db.Integer,
db.ForeignKey('recorder.id',
onupdate="CASCADE",
ondelete="CASCADE"),
primary_key=True))
class VirtualCommand(db.Model):
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow())
name = db.Column(db.Unicode(63), unique=True, nullable=False)
description = db.Column(db.Unicode(255), unique=False, nullable=True, default="")
recorders = db.relationship('Recorder', secondary=virtual_command_recorder_table,
back_populates='virtual_commands')
recorder_commands = db.relationship('RecorderCommand', secondary=virtual_command_recorder_command_table,
back_populates='virtual_commands')
parent_virtual_command = db.relationship('VirtualCommand', back_populates='child_virtual_commands',
nullable=True, default=None)
child_virtual_commands = db.relationship('VirtualCommand', back_populates='parent_virtual_command')
command_order_string = db.Column(db.String)
def __init__(self, **kwargs):
super(VirtualCommand, self).__init__(**kwargs)
@staticmethod
def get_by_name(name):
"""
Find group by name
:param name:
:return:
"""
return VirtualCommand.query.filter(VirtualCommand.name == name).first()
@staticmethod
def get_all():
"""
Return all groups
:return:
"""
return VirtualCommand.query.all()
@hybrid_property
def command_order(self):
if self.command_order_string is None:
return []
return self.command_order_string.split()
@command_order.setter
def command_order(self, value: list):
pass
def __str__(self):
return self.name
def to_dict(self):
return dict(id=self.id, name=self.name)
def toJSON(self):
return json.dumps(self.to_dict(), default=lambda o: o.__dict__,
sort_keys=True, indent=4)

View File

@@ -1,7 +1,12 @@
import inspect
import pkgutil
import sys
import telnetlib
from abc import ABC, abstractmethod
# monkey patching of telnet lib
from pprint import pprint
original_read_until = telnetlib.Telnet.read_until
original_write = telnetlib.Telnet.write
@@ -63,14 +68,14 @@ class TelnetAdapter(ABC):
self.esc_char = esc_char
@abstractmethod
def login(self):
def _login(self):
pass
def run_cmd(self, cmd, timeout=1, auto_connect=True):
def _run_cmd(self, cmd, timeout=1, auto_connect=True):
if self.tn is None and not auto_connect:
raise Exception("Not connected!")
elif self.tn is None:
self.login()
self._login()
self.tn.write(cmd)
out = self.tn.read_until_non_empty_line()
res = out
@@ -81,8 +86,53 @@ class TelnetAdapter(ABC):
return res
@staticmethod
def get_response_str(tn_response):
def _get_response_str(tn_response):
if isinstance(tn_response, bytes):
return str(tn_response.decode("ascii").rstrip())
else:
return str(tn_response).rstrip()
class RecorderAdapter:
@abstractmethod
def _get_name(self):
pass
@abstractmethod
def _get_version(self):
pass
def get_defined_recorder_adapters():
models = []
found_packages = list(pkgutil.iter_modules(sys.modules[__name__].__path__))
for f_p in found_packages:
importer = f_p[0]
rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1])
rec_model = {'id': f_p[1], 'name': f_p[1], 'commands': {}}
if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'):
rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME
for name, obj in inspect.getmembers(rec_model_module, inspect.isclass):
if issubclass(obj, RecorderAdapter):
commands = {}
for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction):
if len(method_name) > 0 and "_" == method_name[0]:
continue
signature = inspect.signature(method)
parameters = {}
for params in signature.parameters:
if params == "self":
continue
param_type = signature.parameters[params].annotation.__name__
param_type = "_unknown_type" if param_type == "_empty" else param_type
parameters[signature.parameters[params].name] = param_type
if len(parameters) <= 0:
parameters = None
commands[method_name] = parameters
rec_model["commands"] = commands
models.append(rec_model)
pprint(models)
return models
get_defined_recorder_adapters()

View File

@@ -1,9 +1,7 @@
import getpass
import sys
from abc import ABC, abstractmethod
from backend.recorder_adapters import telnetlib, TelnetAdapter
from backend.recorder_adapters import telnetlib, TelnetAdapter, RecorderAdapter
RECORDER_MODEL_NAME = "SMP 351 / 352"
VERSION = "0.9.0"
# HOST = "localhost"
# HOST = "129.13.51.102" # Audimax SMP 351
@@ -14,12 +12,12 @@ USER = "admin"
PW = "123mzsmp"
class SMP(TelnetAdapter):
class SMP(TelnetAdapter, RecorderAdapter):
def __init__(self, address, admin_password):
super().__init__(address)
self.admin_pw = admin_password
def login(self):
def _login(self):
self.tn = telnetlib.Telnet(HOST)
self.tn.read_until("\r\nPassword:")
# password = getpass.getpass()
@@ -39,6 +37,12 @@ class SMP(TelnetAdapter):
raise Exception("Could not login as administrator with given pw!")
print("OK, we have admin rights!")
def _get_name(self):
return RECORDER_MODEL_NAME
def _get_version(self):
return VERSION
def get_version(self, include_build=False, verbose_info=False):
if verbose_info:
self.tn.write("0Q")
@@ -48,76 +52,76 @@ class SMP(TelnetAdapter):
else:
self.tn.write("1Q\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_bootstrap_version(self):
self.tn.write("2Q")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_factory_firmware_version(self):
self.tn.write("3Q")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_updated_firmware_version(self):
self.tn.write("4Q")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_part_number(self):
self.tn.write("N")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_model_name(self):
self.tn.write("1I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_model_description(self):
self.tn.write("2I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_system_memory_usage(self):
self.tn.write("3I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_number_of_connected_users(self):
self.tn.write("10I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_system_processer_usage(self):
self.tn.write("11I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_system_processor_idle(self):
self.tn.write("12I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_eth0_link_status(self):
self.tn.write("13I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_file_transfer_config(self):
self.tn.write("38I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_active_alarms(self):
self.tn.write("39I")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_unit_name(self, name):
def set_unit_name(self, name: str):
# TODO: check name (must comply with internet host name standards)
self.tn.write(self.esc_char + name + "CN\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_unit_name(self):
self.tn.write(self.esc_char + " CN\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_unit_name(self):
self.tn.write(self.esc_char + "CN\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_telnet_connections(self):
self.tn.write(self.esc_char + "CC\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_verbose_mode(self, mode: int):
"""
@@ -132,25 +136,27 @@ class SMP(TelnetAdapter):
if mode not in range(4):
raise Exception("Only values from 0 to 3 are allowed!")
self.tn.write(self.esc_char + str(mode) + "CV\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_verbose_mode(self):
self.tn.write(self.esc_char + "CV\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
def save_configuration(self):
pass
def restore_configuration(self):
pass
"""
def reboot(self):
self.tn.write(self.esc_char + "1BOOT\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def restart_network(self):
self.tn.write(self.esc_char + "2BOOT\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_flash(self):
"""
@@ -158,7 +164,7 @@ class SMP(TelnetAdapter):
:return:
"""
self.tn.write(self.esc_char + "ZFFF\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def system_reset(self):
"""
@@ -166,7 +172,7 @@ class SMP(TelnetAdapter):
:return:
"""
self.tn.write(self.esc_char + "ZXXX\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_settings_and_delete_all_files(self):
"""
@@ -174,7 +180,7 @@ class SMP(TelnetAdapter):
:return:
"""
self.tn.write(self.esc_char + "ZY\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def absolute_reset(self):
"""
@@ -182,9 +188,9 @@ class SMP(TelnetAdapter):
:return:
"""
self.tn.write(self.esc_char + "ZQQQ\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_front_panel_lock(self, mode):
def set_front_panel_lock(self, mode: int):
"""
0=Off
1=complete lockout (no front panel control)
@@ -196,7 +202,7 @@ class SMP(TelnetAdapter):
if mode not in range(4):
raise Exception("Only values from 0 to 3 are allowed!")
self.tn.write(str(mode) + "X\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_front_panel_lock(self):
"""
@@ -208,13 +214,14 @@ class SMP(TelnetAdapter):
:return:
"""
self.tn.write("X\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.)
Only some stuff will be implemented here!
"""
"""
def get_date_time(self):
pass
@@ -241,6 +248,7 @@ class SMP(TelnetAdapter):
def get_dns_server_ip(self):
pass
"""
"""
RS-232 / serial port related stuff not implemented.
@@ -260,13 +268,13 @@ class SMP(TelnetAdapter):
if channel_num not in range(1, 3):
raise Exception("input_num must be a value between 1 and 2!")
self.tn.write("{}*{}!\n".format(input_num, channel_num))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input(self, channel_num: int):
if channel_num not in range(1, 2):
raise Exception("input_num must be a value between 1 and 2!")
self.tn.write("{}!\n".format(channel_num))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_format(self, input_num: int, input_format: int):
"""
@@ -283,13 +291,13 @@ class SMP(TelnetAdapter):
if input_format not in range(1, 4):
raise Exception("input_num must be a value between 1 and 3!")
self.tn.write("{}*{}\\\n".format(input_num, input_format))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_format(self, input_num: int):
if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!")
self.tn.write("{}\\\n".format(input_num))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_name(self, input_num: int, input_name: str):
if input_num not in range(1, 6):
@@ -301,17 +309,17 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("input_name must only contain ascii characters")
self.tn.write("{}{},{}NI\n".format(self.esc_char, input_num, input_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_name(self, input_num: int):
if input_num not in range(1, 6):
raise Exception("input_num must be a value between 1 and 5!")
self.tn.write("{}{}NI\n".format(self.esc_char, input_num))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_selction_per_channel(self):
self.tn.write("32I\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
Input configuration part skipped
@@ -319,15 +327,15 @@ class SMP(TelnetAdapter):
def stop_recording(self):
self.tn.write("{}Y0RCDR\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def start_recording(self):
self.tn.write("{}Y1RCDR\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def pause_recording(self):
self.tn.write("{}Y2RCDR\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_recording_status(self):
"""
@@ -338,7 +346,7 @@ class SMP(TelnetAdapter):
:return: status
"""
self.tn.write("{}YRCDR\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def extent_recording_time(self, extension_time: int):
"""
@@ -349,53 +357,53 @@ class SMP(TelnetAdapter):
if extension_time not in range(0, 100):
raise Exception("extension_time must be a value between 0 and 99!")
self.tn.write("{}E{}RCDR\n".format(self.esc_char, extension_time))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def add_chapter_marker(self):
self.tn.write("{}BRCDR\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def swap_channel_positions(self):
self.tn.write("%\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_recording_status_text(self):
self.tn.write("I\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_elapsed_recording_time(self):
self.tn.write("35I\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_remaining_recording_time(self):
self.tn.write("36I\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_recording_destination(self):
self.tn.write("37I\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
Metadata part skipped
"""
def recall_user_preset(self, channel_number, preset_number):
def recall_user_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("1*{}*{}.\n".format(channel_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_user_preset(self, channel_number, preset_number):
def save_user_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("1*{}*{},\n".format(channel_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_user_preset_name(self, preset_number, preset_name):
def set_user_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16:
@@ -405,38 +413,38 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters")
self.tn.write("{}1*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_user_preset_name(self, preset_number):
def get_user_preset_name(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}1*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_user_presets(self, input_number):
def get_user_presets(self, input_number: int):
if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 5!")
self.tn.write("52*{}#\n".format(input_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Input Presets
def recall_input_preset(self, channel_number, preset_number):
def recall_input_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!")
self.tn.write("2*{}*{}.\n".format(channel_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_input_preset(self, channel_number, preset_number):
def save_input_preset(self, channel_number: int, preset_number: int):
if channel_number not in range(1, 3):
raise Exception("channel_number must be a value between 1 and 2!")
if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!")
self.tn.write("1*{}*{},\n".format(channel_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_preset_name(self, preset_number, preset_name):
def set_input_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!")
if len(preset_name) > 16:
@@ -446,26 +454,26 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters")
self.tn.write("{}2*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_preset_name(self, preset_number):
def get_input_preset_name(self, preset_number: int):
if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!")
self.tn.write("{}2*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def delete_input_preset(self, preset_number):
def delete_input_preset(self, preset_number: int):
if preset_number not in range(1, 129):
raise Exception("preset_number must be a value between 1 and 128!")
self.tn.write("{}X2*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_presets(self):
self.tn.write("51#\n")
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Streaming Presets
def recall_streaming_preset(self, output_number, preset_number):
def recall_streaming_preset(self, output_number: int, preset_number: int):
"""
Output_number:
1 = Channel A
@@ -480,9 +488,9 @@ class SMP(TelnetAdapter):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("3*{}*{}.\n".format(output_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_streaming_preset(self, output_number, preset_number):
def save_streaming_preset(self, output_number: int, preset_number: int):
"""
Output_number:
1 = Channel A
@@ -497,9 +505,9 @@ class SMP(TelnetAdapter):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("3*{}*{},\n".format(output_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_streaming_preset_name(self, preset_number, preset_name):
def set_streaming_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16:
@@ -509,22 +517,22 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters")
self.tn.write("{}3*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_streaming_preset_name(self, preset_number):
def get_streaming_preset_name(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}3*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_streaming_preset_to_default(self, preset_number):
def reset_streaming_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}X3*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Encoder Presets
def recall_encoder_preset(self, output_number, preset_number):
def recall_encoder_preset(self, output_number: int, preset_number: int):
"""
Output_number:
1 = Channel A
@@ -539,9 +547,9 @@ class SMP(TelnetAdapter):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("4*{}*{}.\n".format(output_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_encoder_preset(self, output_number, preset_number):
def save_encoder_preset(self, output_number: int, preset_number: int):
"""
Output_number:
1 = Channel A
@@ -556,9 +564,9 @@ class SMP(TelnetAdapter):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("4*{}*{},\n".format(output_number, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_encoder_preset_name(self, preset_number, preset_name):
def set_encoder_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16:
@@ -568,37 +576,37 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters")
self.tn.write("{}4*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_encoder_preset_name(self, preset_number):
def get_encoder_preset_name(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}4*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_encoder_preset_to_default(self, preset_number):
def reset_encoder_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}X4*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Layout Presets
def save_layout_preset(self, preset_number):
def save_layout_preset(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("7*{},\n".format(preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def recall_layout_preset(self, preset_number, include_input_selections=True):
def recall_layout_preset(self, preset_number: int, include_input_selections: bool = True):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
if include_input_selections:
self.tn.write("7*{}.\n".format(preset_number))
else:
self.tn.write("8*{}.\n".format(preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_layout_preset_name(self, preset_number, preset_name):
def set_layout_preset_name(self, preset_number: int, preset_name: str):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
if len(preset_name) > 16:
@@ -608,26 +616,26 @@ class SMP(TelnetAdapter):
except UnicodeEncodeError:
raise Exception("preset_name must only contain ascii characters")
self.tn.write("{}7*{},{}PNAM\n".format(self.esc_char, preset_number, preset_name))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_layout_preset_name(self, preset_number):
def get_layout_preset_name(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}7*{}PNAM\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_layout_preset_to_default(self, preset_number):
def reset_layout_preset_to_default(self, preset_number: int):
if preset_number not in range(1, 17):
raise Exception("preset_number must be a value between 1 and 16!")
self.tn.write("{}X7*{}PRST\n".format(self.esc_char, preset_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
Input adjustments skipped
Picture adjustments skipped
"""
def mute_output(self, output_number):
def mute_output(self, output_number: int):
"""
Output_number:
1 = Channel A
@@ -638,9 +646,9 @@ class SMP(TelnetAdapter):
if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!")
self.tn.write("{}*1B\n".format(output_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def unmute_output(self, output_number):
def unmute_output(self, output_number: int):
"""
Output_number:
1 = Channel A
@@ -651,9 +659,9 @@ class SMP(TelnetAdapter):
if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!")
self.tn.write("{}*0B\n".format(output_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def is_muted(self, output_number):
def is_muted(self, output_number: int):
"""
Output_number:
1 = Channel A
@@ -664,7 +672,7 @@ class SMP(TelnetAdapter):
if output_number not in range(1, 3):
raise Exception("output_number must be a value between 1 and 2!")
self.tn.write("{}B\n".format(output_number))
return int(TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())) > 0
return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) > 0
"""
EDID skipped
@@ -672,7 +680,7 @@ class SMP(TelnetAdapter):
some advanced options skipped
"""
def get_input_hdcp_status(self, input_number):
def get_input_hdcp_status(self, input_number: int):
"""
returns:
0 = no sink / source detected
@@ -684,57 +692,57 @@ class SMP(TelnetAdapter):
if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!")
self.tn.write("{}I{}HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_authorization_hdcp_on(self, input_number):
def set_input_authorization_hdcp_on(self, input_number: int):
if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}*1HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_input_authorization_hdcp_off(self, input_number):
def set_input_authorization_hdcp_off(self, input_number: int):
if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}*0HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_authorization_hdcp_status(self, input_number):
def get_input_authorization_hdcp_status(self, input_number: int):
if input_number not in range(1, 6):
raise Exception("input_number must be a value between 1 and 6!")
self.tn.write("{}E{}HDCP\n".format(self.esc_char, input_number))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def enable_hdcp_notification(self):
self.tn.write("{}N1HDCP\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def disable_hdcp_notification(self):
self.tn.write("{}N0HDCP\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_hdcp_notification_status(self):
self.tn.write("{}NHDCP\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# background image settings
def set_background_image(self, filename: str):
self.tn.write("{}{}RF\n".format(self.esc_char, filename))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_background_image_filename(self):
self.tn.write("{}RF\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def mute_background_image(self):
self.tn.write("{}0RF\n".format(self.esc_char))
return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line())
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def main():
smp = SMP(HOST, PW)
print(smp)
smp.login()
smp._login()
print(smp.get_version(verbose_info=False))
print(smp.get_bootstrap_version())