133 lines
3.9 KiB
Python
133 lines
3.9 KiB
Python
import importlib
import inspect
import pkgutil
import sys
import telnetlib
from abc import ABC, abstractmethod
|
|
|
|
# Monkey-patch telnetlib.Telnet so that read_until() and write() also accept str (encoded as ASCII).
|
|
# Keep references to the unpatched methods; the str-aware wrappers delegate to them.
original_read_until, original_write = (
    telnetlib.Telnet.read_until,
    telnetlib.Telnet.write,
)
|
|
|
|
|
|
def new_read_until(self, match, timeout=None):
    """Call the original ``Telnet.read_until``, accepting ``str`` as well as ``bytes``.

    A ``str`` ``match`` is encoded as ASCII first; any other value is passed
    through unchanged. ``timeout`` is forwarded as-is, and the return value of
    the original method is returned unchanged.
    """
    expected = match.encode("ascii") if isinstance(match, str) else match
    return original_read_until(self, expected, timeout)
|
|
|
|
|
|
def new_write(self, buffer):
    """Call the original ``Telnet.write``, accepting ``str`` as well as ``bytes``.

    A ``str`` ``buffer`` is encoded as ASCII first; any other value is passed
    through unchanged. Returns whatever the original method returns.
    """
    payload = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    return original_write(self, payload)
|
|
|
|
|
|
# Install the str-aware wrappers on the Telnet class itself so every instance uses them.
telnetlib.Telnet.read_until, telnetlib.Telnet.write = new_read_until, new_write
|
|
|
|
|
|
def read_line(self, timeout=2):
    """Read from the connection up to the next newline character.

    Delegates to ``self.read_until`` with a newline as the terminator and
    forwards ``timeout`` unchanged; the result of ``read_until`` is returned
    as-is.
    """
    line_terminator = "\n"
    return self.read_until(line_terminator, timeout)
|
|
|
|
|
|
# Attach read_line() to the Telnet class so it is callable as a method on every connection.
telnetlib.Telnet.read_line = read_line
|
|
|
|
|
|
def read_until_non_empty_line(self):
    """Return the next line that contains more than whitespace.

    Lines are fetched with ``self.read_line()``; lines that are blank after
    ``rstrip()`` (for example a bare line ending) are skipped.

    Returns:
        The first non-blank line, or the falsy value produced by
        ``read_line()`` (``None`` or an empty result) as soon as a read
        yields nothing at all.
    """
    while True:
        line = self.read_line()
        if not line:
            # Nothing was read. telnetlib's read_until returns whatever is
            # available (possibly empty) when its timeout expires, so an
            # empty result means the peer is idle; retrying here would loop
            # forever on a silent connection.
            return line
        if line.rstrip():
            return line
|
|
|
|
|
|
# Attach read_until_non_empty_line() to the Telnet class so it is callable as a method on every connection.
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
|
|
|
|
|
|
def assert_string_in_output(self, string, timeout=2):
    """Check whether ``string`` appears in the output read from the connection.

    The output is obtained with ``self.read_until(string, timeout)`` and
    decoded as ASCII before the containment check.

    Args:
        string: Text to look for, as ``str`` or ``bytes``.
        timeout: Forwarded unchanged to ``read_until``.

    Returns:
        A ``(found, response)`` tuple. ``response`` is the decoded output, or
        ``None`` (with ``found`` being ``False``) when ``read_until`` returned
        ``None``.
    """
    resp = self.read_until(string, timeout)
    if resp is None:
        return False, resp
    if isinstance(resp, bytes):
        # Undecodable bytes become U+FFFD instead of raising, so one stray
        # non-ASCII byte in the output cannot crash the check.
        resp = resp.decode("ascii", errors="replace")
    # Compare text with text: a bytes needle would raise TypeError in `in`.
    if isinstance(string, bytes):
        needle = string.decode("ascii", errors="replace")
    else:
        needle = string
    return needle in resp, resp
|
|
|
|
|
|
# Attach assert_string_in_output() to the Telnet class so it is callable as a method on every connection.
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
|
|
|
|
|
|
class TelnetAdapter(ABC):
    """Base class for adapters that drive a device through a telnet connection.

    Subclasses must implement ``_login``. ``_run_cmd`` calls it on demand when
    no connection object has been stored in ``self.tn`` yet.
    """

    def __init__(self, address, esc_char="W"):
        """Store the target ``address`` and ``esc_char``; no connection is opened here."""
        self.address = address
        # Connection object; None until set (expected to be done by _login()).
        self.tn = None
        self.esc_char = esc_char

    @abstractmethod
    def _login(self):
        """Establish the connection; expected to leave it in ``self.tn``."""

    def _run_cmd(self, cmd, timeout=1, auto_connect=True):
        """Send ``cmd`` and collect the non-blank lines of the response.

        Args:
            cmd: Command passed unchanged to ``self.tn.write``.
            timeout: Accepted for interface compatibility; currently unused,
                the reads go through ``read_until_non_empty_line()`` which
                takes no timeout.
            auto_connect: When true, call ``_login()`` if there is no
                connection yet; when false, raise instead.

        Returns:
            The concatenation of all lines read until the first falsy read
            (``None`` or empty), or that falsy value itself if the very first
            read produced nothing.

        Raises:
            ConnectionError: If there is no connection and ``auto_connect`` is
                false, or if ``_login()`` did not provide one.
        """
        if self.tn is None:
            if not auto_connect:
                raise ConnectionError("Not connected!")
            self._login()
            if self.tn is None:
                # _login() returned without storing a connection.
                raise ConnectionError("Not connected!")

        self.tn.write(cmd)

        out = self.tn.read_until_non_empty_line()
        res = out
        # Truthiness covers None, b"" and "" alike; comparing bytes to ""
        # would never be equal and so would never end the loop.
        while out:
            out = self.tn.read_until_non_empty_line()
            if out:
                res += out
        return res

    @staticmethod
    def _get_response_str(tn_response):
        """Return ``tn_response`` as text with trailing whitespace removed.

        ``bytes`` are decoded as ASCII; any other value is converted with
        ``str()``.
        """
        if isinstance(tn_response, bytes):
            text = tn_response.decode("ascii")
        else:
            text = str(tn_response)
        return text.rstrip()
|
|
|
|
|
|
class RecorderAdapter:
    """Marker base class for recorder adapters, declaring two hooks.

    NOTE(review): this class does not derive from ``ABC``, so the
    ``@abstractmethod`` markers below are not enforced when the class or a
    subclass is instantiated.
    """

    @abstractmethod
    def _get_name(self):
        """Hook intended to be overridden; this base version returns ``None``."""
        return None

    @abstractmethod
    def _get_version(self):
        """Hook intended to be overridden; this base version returns ``None``."""
        return None
|
|
|
|
|
|
def _describe_commands(adapter_cls):
    """Map each public function of ``adapter_cls`` to its parameter type names.

    Returns a dict ``{function_name: {param_name: type_name} or None}``; the
    value is ``None`` for functions that take no parameter besides ``self``.
    Functions whose name starts with an underscore are left out.
    """
    commands = {}
    for method_name, method in inspect.getmembers(adapter_cls, predicate=inspect.isfunction):
        if method_name.startswith("_"):
            continue
        parameters = {}
        for param_name, param in inspect.signature(method).parameters.items():
            if param_name == "self":
                continue
            annotation = param.annotation
            # String annotations and some typing constructs have no __name__;
            # fall back to their string form instead of raising.
            type_name = getattr(annotation, "__name__", None)
            if type_name is None:
                type_name = str(annotation)
            # "_empty" is the name of inspect's marker for "no annotation".
            if type_name == "_empty":
                type_name = "_unknown_type"
            parameters[param.name] = type_name
        commands[method_name] = parameters or None
    return commands


def get_defined_recorder_adapters():
    """Describe the recorder adapters found in this package's submodules.

    Every module found on this package's ``__path__`` is imported. For each
    one a dict is produced with the keys:

    * ``'id'``: the submodule name,
    * ``'name'``: the module's ``RECORDER_MODEL_NAME`` if defined, else the
      submodule name,
    * ``'commands'``: the public functions of the ``RecorderAdapter`` subclass
      found in the module, mapped to their parameter types (empty dict if the
      module has no such class; if it has several, the last one in
      ``inspect.getmembers`` order wins).

    Returns:
        A list with one such dict per submodule.
    """
    package = sys.modules[__name__]
    models = []
    for _finder, module_name, _is_pkg in pkgutil.iter_modules(package.__path__):
        # Import under the package-qualified name; the finder's
        # find_module()/load_module() API is deprecated and no longer
        # available on current Python versions.
        module = importlib.import_module(__name__ + "." + module_name)
        model = {
            'id': module_name,
            'name': getattr(module, 'RECORDER_MODEL_NAME', module_name),
            'commands': {},
        }
        for _cls_name, cls in inspect.getmembers(module, inspect.isclass):
            # Skip the base class itself: modules that import it would
            # otherwise have their real commands replaced by an empty dict.
            if cls is RecorderAdapter or not issubclass(cls, RecorderAdapter):
                continue
            model["commands"] = _describe_commands(cls)
        models.append(model)
    return models
|