179 lines
5.5 KiB
Python
179 lines
5.5 KiB
Python
import importlib
|
|
import inspect
|
|
import pkgutil
|
|
import sys
|
|
import telnetlib
|
|
from abc import ABC, abstractmethod
|
|
|
|
# Module-level cache of the adapter model descriptions. It stays None until
# get_recorder_adapter_by_id() fills it with the result of
# get_defined_recorder_adapters() on its first call.
defined_recorder_adapters = None
|
|
|
|
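# Monkey-patch telnetlib.Telnet: read_until() and write() are wrapped so that
# they also accept str (encoded as ASCII), and a few helper methods are added.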
|
|
from pprint import pprint
|
|
|
|
# Keep references to the unpatched Telnet methods. They have to be captured
# before the replacements are installed, because the str-accepting wrappers
# delegate to them.
original_read_until = telnetlib.Telnet.read_until
original_write = telnetlib.Telnet.write
|
|
|
|
|
|
def new_read_until(self, match, timeout=None):
    """Variant of ``Telnet.read_until`` that also accepts a ``str`` match.

    A ``str`` *match* is encoded as ASCII before the call; any other value is
    passed through untouched. The read itself is delegated to the unpatched
    ``original_read_until`` and its result is returned unchanged.
    """
    needle = match.encode("ascii") if isinstance(match, str) else match
    return original_read_until(self, needle, timeout)
|
|
|
|
|
|
def new_write(self, buffer):
    """Variant of ``Telnet.write`` that also accepts a ``str`` buffer.

    A ``str`` *buffer* is encoded as ASCII before the call; any other value is
    passed through untouched. The write itself is delegated to the unpatched
    ``original_write`` and its result is returned unchanged.
    """
    payload = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    return original_write(self, payload)
|
|
|
|
|
|
# Install the str-accepting wrappers on the Telnet class itself, so they apply
# to every Telnet instance created in this process.
telnetlib.Telnet.read_until = new_read_until
telnetlib.Telnet.write = new_write
|
|
|
|
|
|
def read_line(self, timeout=2):
    """Read up to the next newline character.

    Calls ``self.read_until`` with a newline as the match and *timeout* as
    the timeout, and returns whatever that call returns.
    """
    newline = "\n"
    return self.read_until(newline, timeout)
|
|
|
|
|
|
# Attach read_line() to the Telnet class so it can be called as a method.
telnetlib.Telnet.read_line = read_line
|
|
|
|
|
|
def read_until_non_empty_line(self):
    """Return the next line that contains more than whitespace.

    Lines are fetched with ``self.read_line()``. Lines that are empty after
    stripping trailing whitespace (for example a bare line break) are skipped.

    Returns:
        The first line with visible content. If ``read_line()`` yields
        ``None`` or an empty value, that value is returned as-is, because it
        means there is nothing more to read right now.
    """
    while True:
        line = self.read_line()
        # An empty read is what telnetlib's read_until() hands back when the
        # timeout expires with no data available. Looping on it would never
        # terminate, so stop and report it to the caller instead.
        if not line:
            return line
        if line.rstrip():
            return line
|
|
|
|
|
|
# Attach read_until_non_empty_line() to the Telnet class so it can be called
# as a method.
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
|
|
|
|
|
|
def assert_string_in_output(self, string, timeout=2):
    """Check whether *string* appears in the output read from the connection.

    The output is obtained with ``self.read_until(string, timeout)``.

    Args:
        string: Text to look for, as ``str`` or ``bytes``. It is passed to
            ``read_until`` unchanged.
        timeout: Timeout handed to ``read_until``.

    Returns:
        A tuple ``(found, response)``. ``response`` is the text that was read,
        decoded as ASCII, or ``None`` if ``read_until`` returned ``None``.
        ``found`` is ``True`` only if *string* is contained in ``response``.
    """
    resp = self.read_until(string, timeout)
    if resp is None:
        return False, resp
    if isinstance(resp, bytes):
        # Bytes outside the ASCII range must not abort the check; they are
        # replaced with U+FFFD instead of raising UnicodeDecodeError.
        resp = resp.decode("ascii", errors="replace")
    # read_until accepts both str and bytes matches, so the containment test
    # has to cope with a bytes needle as well.
    if isinstance(string, bytes):
        needle = string.decode("ascii", errors="replace")
    else:
        needle = string
    return needle in resp, resp
|
|
|
|
|
|
# Attach assert_string_in_output() to the Telnet class so it can be called as
# a method.
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
|
|
|
|
|
|
class TelnetAdapter(ABC):
    """Base class for adapters that send commands over a telnet connection.

    Subclasses implement ``_login``. ``_run_cmd`` calls it on demand and then
    talks to the connection object stored in ``self.tn``.
    """

    def __init__(self, address, esc_char="W"):
        """Store the connection settings; no connection is opened here."""
        self.address = address
        self.tn = None  # connection object; None while not connected
        self.esc_char = esc_char

    @abstractmethod
    def _login(self):
        """Open the session.

        ``_run_cmd`` uses ``self.tn`` right after calling this method, so an
        implementation has to assign the connection object to ``self.tn``.
        """
        pass

    def _run_cmd(self, cmd, timeout=1, auto_connect=True):
        """Send *cmd* and collect the non-empty lines of the response.

        Args:
            cmd: Command passed to ``self.tn.write``.
            timeout: Accepted for interface compatibility; it is not used by
                this method.
            auto_connect: If true, ``_login()`` is called when there is no
                connection yet.

        Returns:
            The concatenation of all lines returned by
            ``self.tn.read_until_non_empty_line()`` up to the first ``None``
            or empty result. If the very first read already yields ``None``
            or an empty value, that value is returned.

        Raises:
            ConnectionError: If there is no connection and *auto_connect* is
                false, or if ``_login()`` did not set ``self.tn``.
        """
        if self.tn is None:
            if not auto_connect:
                raise ConnectionError("Not connected!")
            self._login()
            if self.tn is None:
                raise ConnectionError("Not connected!")
        self.tn.write(cmd)
        out = self.tn.read_until_non_empty_line()
        res = out
        # Stop on None as well as on an empty str/bytes value. Comparing
        # against "" alone never matches bytes, and appending None to the
        # collected output would raise a TypeError.
        while out:
            out = self.tn.read_until_non_empty_line()
            print(out)  # echo of every follow-up read, kept from the original
            if out:
                res += out
        return res

    @staticmethod
    def _get_response_str(tn_response):
        """Return *tn_response* as ``str`` without trailing whitespace.

        ``bytes`` are decoded as ASCII; any other value is converted with
        ``str()``.
        """
        if isinstance(tn_response, bytes):
            return str(tn_response.decode("ascii").rstrip())
        else:
            return str(tn_response).rstrip()
|
|
|
|
|
|
class RecorderAdapter:
    """Common interface of the recorder adapters.

    The class does not derive from ``ABC``, so the ``abstractmethod`` markers
    below are not enforced at instantiation time. Every method defined here
    has an empty body and returns ``None``.
    """

    def __init__(self, address: str, user: str, password: str):
        """Remember the address and the credentials of the recorder."""
        self.address, self.user, self.password = address, user, password

    @classmethod
    @abstractmethod
    def get_recorder_params(cls) -> dict:
        """Placeholder, annotated to return a dict; returns ``None`` here."""
        return None

    @abstractmethod
    def _get_name(self):
        """Placeholder for subclasses; returns ``None`` here."""
        return None

    @abstractmethod
    def _get_version(self):
        """Placeholder for subclasses; returns ``None`` here."""
        return None

    @abstractmethod
    def is_recording(self) -> bool:
        """Placeholder, annotated to return a bool; returns ``None`` here."""
        return None

    def get_recording_status(self) -> str:
        """Placeholder, annotated to return a str; returns ``None`` here."""
        return None
|
|
|
|
|
|
def _describe_adapter_commands(adapter_class) -> dict:
    """Map each public method of *adapter_class* to its parameter types.

    Methods whose name starts with an underscore are skipped. The value is a
    dict ``{parameter name: annotation name}`` without ``self``, or ``None``
    if the method takes no further parameters. Parameters without annotation
    are reported as ``"_unknown_type"``.
    """
    commands = {}
    for method_name, method in inspect.getmembers(adapter_class, predicate=inspect.isfunction):
        if method_name.startswith("_"):
            continue
        parameters = {}
        for param in inspect.signature(method).parameters.values():
            if param.name == "self":
                continue
            annotation = param.annotation
            if annotation is inspect.Parameter.empty:
                parameters[param.name] = "_unknown_type"
            else:
                # String annotations and constructs such as ``int | None``
                # carry no __name__; fall back to their textual form.
                parameters[param.name] = getattr(annotation, "__name__", str(annotation))
        commands[method_name] = parameters or None
    return commands


def get_defined_recorder_adapters() -> list:
    """Discover the adapter modules inside ``backend.recorder_adapters``.

    Every module found in that package is imported and described by a dict
    with the keys ``id``, ``name``, ``commands`` and ``path``. The keys
    ``requires_user`` and ``requires_password`` are added if the module
    defines ``REQUIRES_USER`` / ``REQUIRES_PW``, and ``name`` is taken from
    ``RECORDER_MODEL_NAME`` if present. For each ``RecorderAdapter`` subclass
    in the module, the class name is appended to ``id``, the class is stored
    under ``class`` and ``commands`` is replaced by its public methods.

    Returns:
        One description dict per discovered module.
    """
    rec_adapters_module = importlib.import_module(".recorder_adapters", package='backend')
    # Take the base class from the imported package. Comparing against a
    # RecorderAdapter object from a differently named copy of this module
    # (e.g. when run as a script) may make the subclass check fail.
    rec_adapter_class = getattr(rec_adapters_module, "RecorderAdapter")
    models = []
    for _finder, module_name, _is_pkg in pkgutil.iter_modules(rec_adapters_module.__path__):
        # The finder's find_module() is no longer available on current Python
        # versions; import the submodule by its package-qualified name.
        rec_model_module = importlib.import_module(rec_adapters_module.__name__ + "." + module_name)
        rec_model = {'id': module_name, 'name': module_name, 'commands': {}, 'path': rec_model_module.__file__}
        if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'):
            rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME
        if hasattr(rec_model_module, 'REQUIRES_USER'):
            rec_model['requires_user'] = rec_model_module.REQUIRES_USER
        if hasattr(rec_model_module, 'REQUIRES_PW'):
            rec_model['requires_password'] = rec_model_module.REQUIRES_PW
        for name, obj in inspect.getmembers(rec_model_module, inspect.isclass):
            if issubclass(obj, rec_adapter_class) and name != "RecorderAdapter":
                rec_model['id'] = rec_model['id'] + "." + obj.__name__
                rec_model['class'] = obj
                rec_model["commands"] = _describe_adapter_commands(obj)
        models.append(rec_model)
    return models
|
|
|
|
|
|
def get_recorder_adapter_by_id(id: str, **kwargs):
    """Instantiate the recorder adapter that matches *id*.

    The adapter descriptions are obtained from
    ``get_defined_recorder_adapters()`` on the first call and cached in the
    module-level ``defined_recorder_adapters`` afterwards.

    Args:
        id: One dot-separated component of a description's ``id``, i.e. the
            module name or the adapter class name.
        **kwargs: Passed to the adapter class constructor.

    Returns:
        A new adapter instance, or ``None`` if no description with an adapter
        class matches.
    """
    global defined_recorder_adapters
    if defined_recorder_adapters is None:
        defined_recorder_adapters = get_defined_recorder_adapters()
    for rec_adapter in defined_recorder_adapters:
        adapter_class = rec_adapter.get('class')
        if adapter_class is None:
            # Description of a module without an adapter class: there is
            # nothing to instantiate, so it can never be a match.
            continue
        if id in rec_adapter.get('id', '').split("."):
            return adapter_class(**kwargs)
    return None
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: list the discovered adapters, then instantiate one.
    print(get_defined_recorder_adapters())
    # NOTE(review): the address and password below are hard-coded in the
    # source; consider reading them from the environment or the command line.
    get_recorder_adapter_by_id('SMP35x', address="172.22.246.207", password="123mzsmp")
    # sys.exit() instead of the bare exit(): the latter is a helper that the
    # site module provides for interactive sessions.
    sys.exit()
|