moved everything to a new module called backend
This commit is contained in:
136
backend/recorder_adapters/__init__.py
Normal file
136
backend/recorder_adapters/__init__.py
Normal file
@@ -0,0 +1,136 @@
|
||||
import inspect
|
||||
import pkgutil
|
||||
import sys
|
||||
import telnetlib
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
# monkey patching of telnet lib
|
||||
original_read_until = telnetlib.Telnet.read_until
|
||||
original_write = telnetlib.Telnet.write
|
||||
|
||||
|
||||
def new_read_until(self, match, timeout=None):
|
||||
if isinstance(match, str):
|
||||
return original_read_until(self, match.encode("ascii"), timeout)
|
||||
else:
|
||||
return original_read_until(self, match, timeout)
|
||||
|
||||
|
||||
def new_write(self, buffer):
|
||||
if isinstance(buffer, str):
|
||||
return original_write(self, buffer.encode("ascii"))
|
||||
else:
|
||||
return original_write(self, buffer)
|
||||
|
||||
|
||||
telnetlib.Telnet.read_until = new_read_until
|
||||
telnetlib.Telnet.write = new_write
|
||||
|
||||
|
||||
def read_line(self, timeout=2):
|
||||
return self.read_until("\n", timeout)
|
||||
|
||||
|
||||
telnetlib.Telnet.read_line = read_line
|
||||
|
||||
|
||||
def read_until_non_empty_line(self):
|
||||
line = self.read_line()
|
||||
if line is None:
|
||||
return None
|
||||
while len(line.rstrip()) <= 0:
|
||||
line = self.read_line()
|
||||
return line
|
||||
|
||||
|
||||
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
|
||||
|
||||
|
||||
def assert_string_in_output(self, string, timeout=2):
|
||||
resp = self.read_until(string, timeout)
|
||||
if resp is None:
|
||||
return False, resp,
|
||||
resp = resp.decode("ascii")
|
||||
if string in resp:
|
||||
return True, resp
|
||||
return False, resp
|
||||
|
||||
|
||||
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
|
||||
|
||||
|
||||
class TelnetAdapter(ABC):
|
||||
def __init__(self, address, esc_char="W"):
|
||||
self.address = address
|
||||
self.tn = None
|
||||
self.esc_char = esc_char
|
||||
|
||||
@abstractmethod
|
||||
def _login(self):
|
||||
pass
|
||||
|
||||
def _run_cmd(self, cmd, timeout=1, auto_connect=True):
|
||||
if self.tn is None and not auto_connect:
|
||||
raise Exception("Not connected!")
|
||||
elif self.tn is None:
|
||||
self._login()
|
||||
self.tn.write(cmd)
|
||||
out = self.tn.read_until_non_empty_line()
|
||||
res = out
|
||||
while out is not None and out != "":
|
||||
out = self.tn.read_until_non_empty_line()
|
||||
print(out)
|
||||
res += out
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
def _get_response_str(tn_response):
|
||||
if isinstance(tn_response, bytes):
|
||||
return str(tn_response.decode("ascii").rstrip())
|
||||
else:
|
||||
return str(tn_response).rstrip()
|
||||
|
||||
|
||||
class RecorderAdapter:
|
||||
@abstractmethod
|
||||
def _get_name(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _get_version(self):
|
||||
pass
|
||||
|
||||
|
||||
def get_defined_recorder_adapters():
|
||||
models = []
|
||||
found_packages = list(pkgutil.iter_modules(sys.modules[__name__].__path__))
|
||||
for f_p in found_packages:
|
||||
importer = f_p[0]
|
||||
rec_model_module = importer.find_module(f_p[1]).load_module(f_p[1])
|
||||
rec_model = {'id': f_p[1], 'name': f_p[1], 'commands': {}}
|
||||
if hasattr(rec_model_module, 'RECORDER_MODEL_NAME'):
|
||||
rec_model['name'] = rec_model_module.RECORDER_MODEL_NAME
|
||||
if hasattr(rec_model_module, 'REQUIRES_USER'):
|
||||
rec_model['requires_user'] = rec_model_module.REQUIRES_USER
|
||||
if hasattr(rec_model_module, 'REQUIRES_PW'):
|
||||
rec_model['requires_password'] = rec_model_module.REQUIRES_PW
|
||||
for name, obj in inspect.getmembers(rec_model_module, inspect.isclass):
|
||||
if issubclass(obj, RecorderAdapter):
|
||||
commands = {}
|
||||
for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction):
|
||||
if len(method_name) > 0 and "_" == method_name[0]:
|
||||
continue
|
||||
signature = inspect.signature(method)
|
||||
parameters = {}
|
||||
for params in signature.parameters:
|
||||
if params == "self":
|
||||
continue
|
||||
param_type = signature.parameters[params].annotation.__name__
|
||||
param_type = "_unknown_type" if param_type == "_empty" else param_type
|
||||
parameters[signature.parameters[params].name] = param_type
|
||||
if len(parameters) <= 0:
|
||||
parameters = None
|
||||
commands[method_name] = parameters
|
||||
rec_model["commands"] = commands
|
||||
models.append(rec_model)
|
||||
return models
|
||||
Reference in New Issue
Block a user