import inspect import pkgutil import sys import telnetlib from abc import ABC, abstractmethod # monkey patching of telnet lib original_read_until = telnetlib.Telnet.read_until original_write = telnetlib.Telnet.write def new_read_until(self, match, timeout=None): if isinstance(match, str): return original_read_until(self, match.encode("ascii"), timeout) else: return original_read_until(self, match, timeout) def new_write(self, buffer): if isinstance(buffer, str): return original_write(self, buffer.encode("ascii")) else: return original_write(self, buffer) telnetlib.Telnet.read_until = new_read_until telnetlib.Telnet.write = new_write def read_line(self, timeout=2): return self.read_until("\n", timeout) telnetlib.Telnet.read_line = read_line def read_until_non_empty_line(self): line = self.read_line() if line is None: return None while len(line.rstrip()) <= 0: line = self.read_line() return line telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line def assert_string_in_output(self, string, timeout=2): resp = self.read_until(string, timeout) if resp is None: return False, resp, resp = resp.decode("ascii") if string in resp: return True, resp return False, resp telnetlib.Telnet.assert_string_in_output = assert_string_in_output class TelnetAdapter(ABC): def __init__(self, address, esc_char="W"): self.address = address self.tn = None self.esc_char = esc_char @abstractmethod def _login(self): pass def _run_cmd(self, cmd, timeout=1, auto_connect=True): if self.tn is None and not auto_connect: raise Exception("Not connected!") elif self.tn is None: self._login() self.tn.write(cmd) out = self.tn.read_until_non_empty_line() res = out while out is not None and out != "": out = self.tn.read_until_non_empty_line() print(out) res += out return res @staticmethod def _get_response_str(tn_response): if isinstance(tn_response, bytes): return str(tn_response.decode("ascii").rstrip()) else: return str(tn_response).rstrip() class RecorderAdapter: @abstractmethod def _get_name(self): pass @abstractmethod def _get_version(self): pass @abstractmethod def is_recording(self) -> bool: pass def get_recording_status(self) -> str: pass def get_defined_recorder_adapters(): models = [] found_packages = list(pkgutil.iter_modules(sys.modules[__name__].__path__)) for f_p in found_packages: importer = f_p[0] rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1]) rec_model = {'id': f_p[1], 'name': f_p[1], 'commands': {}} if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'): rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME if hasattr(rec_model_module, 'REQUIRES_USER'): rec_model['requires_user'] = rec_model_module.REQUIRES_USER if hasattr(rec_model_module, 'REQUIRES_PW'): rec_model['requires_password'] = rec_model_module.REQUIRES_PW for name, obj in inspect.getmembers(rec_model_module, inspect.isclass): if issubclass(obj, RecorderAdapter): commands = {} for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction): if len(method_name) > 0 and "_" == method_name[0]: continue signature = inspect.signature(method) parameters = {} for params in signature.parameters: if params == "self": continue param_type = signature.parameters[params].annotation.__name__ param_type = "_unknown_type" if param_type == "_empty" else param_type parameters[signature.parameters[params].name] = param_type if len(parameters) <= 0: parameters = None commands[method_name] = parameters rec_model["commands"] = commands models.append(rec_model) return models