Files
lrc-backend/backend/recorder_adapters/__init__.py
2019-11-12 12:32:49 +01:00

141 lines
4.2 KiB
Python

import inspect
import pkgutil
import sys
import telnetlib
from abc import ABC, abstractmethod
# monkey patching of telnet lib
original_read_until = telnetlib.Telnet.read_until
original_write = telnetlib.Telnet.write
def new_read_until(self, match, timeout=None):
if isinstance(match, str):
return original_read_until(self, match.encode("ascii"), timeout)
else:
return original_read_until(self, match, timeout)
def new_write(self, buffer):
if isinstance(buffer, str):
return original_write(self, buffer.encode("ascii"))
else:
return original_write(self, buffer)
telnetlib.Telnet.read_until = new_read_until
telnetlib.Telnet.write = new_write
def read_line(self, timeout=2):
return self.read_until("\n", timeout)
telnetlib.Telnet.read_line = read_line
def read_until_non_empty_line(self):
line = self.read_line()
if line is None:
return None
while len(line.rstrip()) <= 0:
line = self.read_line()
return line
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
def assert_string_in_output(self, string, timeout=2):
resp = self.read_until(string, timeout)
if resp is None:
return False, resp,
resp = resp.decode("ascii")
if string in resp:
return True, resp
return False, resp
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
class TelnetAdapter(ABC):
def __init__(self, address, esc_char="W"):
self.address = address
self.tn = None
self.esc_char = esc_char
@abstractmethod
def _login(self):
pass
def _run_cmd(self, cmd, timeout=1, auto_connect=True):
if self.tn is None and not auto_connect:
raise Exception("Not connected!")
elif self.tn is None:
self._login()
self.tn.write(cmd)
out = self.tn.read_until_non_empty_line()
res = out
while out is not None and out != "":
out = self.tn.read_until_non_empty_line()
print(out)
res += out
return res
@staticmethod
def _get_response_str(tn_response):
if isinstance(tn_response, bytes):
return str(tn_response.decode("ascii").rstrip())
else:
return str(tn_response).rstrip()
class RecorderAdapter:
@abstractmethod
def _get_name(self):
pass
@abstractmethod
def _get_version(self):
pass
@abstractmethod
def is_recording(self) -> bool:
pass
def get_defined_recorder_adapters():
models = []
found_packages = list(pkgutil.iter_modules(sys.modules[__name__].__path__))
for f_p in found_packages:
importer = f_p[0]
rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1])
rec_model = {'id': f_p[1], 'name': f_p[1], 'commands': {}}
if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'):
rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME
if hasattr(rec_model_module, 'REQUIRES_USER'):
rec_model['requires_user'] = rec_model_module.REQUIRES_USER
if hasattr(rec_model_module, 'REQUIRES_PW'):
rec_model['requires_password'] = rec_model_module.REQUIRES_PW
for name, obj in inspect.getmembers(rec_model_module, inspect.isclass):
if issubclass(obj, RecorderAdapter):
commands = {}
for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction):
if len(method_name) > 0 and "_" == method_name[0]:
continue
signature = inspect.signature(method)
parameters = {}
for params in signature.parameters:
if params == "self":
continue
param_type = signature.parameters[params].annotation.__name__
param_type = "_unknown_type" if param_type == "_empty" else param_type
parameters[signature.parameters[params].name] = param_type
if len(parameters) <= 0:
parameters = None
commands[method_name] = parameters
rec_model["commands"] = commands
models.append(rec_model)
return models