Files
lrc-backend/recorder_adapters/__init__.py
2019-07-03 16:09:10 +02:00

89 lines
2.2 KiB
Python

import telnetlib
from abc import ABC, abstractmethod
# Monkey-patch telnetlib so that Telnet.read_until / Telnet.write also accept str.
# Keep references to the unpatched methods; the wrappers defined below delegate to them.
original_read_until = telnetlib.Telnet.read_until
original_write = telnetlib.Telnet.write
def new_read_until(self, match, timeout=None):
    """Replacement for ``Telnet.read_until`` that also accepts a ``str`` match.

    A ``str`` match is ASCII-encoded before being handed to the saved
    original method; any other value is forwarded unchanged.
    """
    needle = match.encode("ascii") if isinstance(match, str) else match
    return original_read_until(self, needle, timeout)
def new_write(self, buffer):
    """Replacement for ``Telnet.write`` that also accepts a ``str`` buffer.

    A ``str`` buffer is ASCII-encoded before being handed to the saved
    original method; any other value is forwarded unchanged.
    """
    payload = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    return original_write(self, payload)
# Install the str-aware wrappers in place of the stock Telnet methods.
telnetlib.Telnet.read_until = new_read_until
telnetlib.Telnet.write = new_write
def read_line(self, timeout=2):
    """Read up to and including the next newline via ``self.read_until``.

    Args:
        timeout: Value forwarded to ``read_until`` as its timeout.

    Returns:
        Whatever ``self.read_until`` returns.
    """
    line_end = "\n"
    return self.read_until(line_end, timeout)
# Expose the helper as a method on every Telnet instance.
telnetlib.Telnet.read_line = read_line
def read_until_non_empty_line(self):
    """Return the next line that contains non-whitespace characters.

    Lines consisting only of whitespace (for example a bare line ending)
    are skipped.

    Returns:
        The first non-blank line produced by ``self.read_line()``.  If
        ``read_line`` yields an empty value or ``None`` -- which is what
        ``telnetlib`` hands back when the timeout expires with nothing
        received -- that value is returned as-is so the caller can tell
        that no more output is pending.
    """
    while True:
        line = self.read_line()
        # An empty read means nothing arrived before the timeout.  Stop here
        # instead of polling forever on an idle connection.
        if not line:
            return line
        if line.rstrip():
            return line
# Expose the helper as a method on every Telnet instance.
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
def assert_string_in_output(self, string, timeout=2):
    """Read until ``string`` appears (or the timeout expires) and report it.

    Args:
        string: Text to look for, as ``str`` or ``bytes``.
        timeout: Value forwarded to ``self.read_until`` as its timeout.

    Returns:
        A ``(found, response)`` tuple.  ``response`` is the text that was
        read, decoded to ``str``; it is ``None`` when ``read_until``
        returned ``None``.
    """
    resp = self.read_until(string, timeout)
    if resp is None:
        return False, resp
    if isinstance(resp, bytes):
        # Replace undecodable bytes rather than raising, so one stray
        # non-ASCII byte from the device cannot abort the check.
        resp = resp.decode("ascii", errors="replace")
    # ``read_until`` accepts bytes as well as str, so normalise the needle to
    # str too; a bytes-in-str membership test would raise TypeError.
    if isinstance(string, bytes):
        needle = string.decode("ascii", errors="replace")
    else:
        needle = string
    return (needle in resp), resp
# Expose the helper as a method on every Telnet instance.
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
class TelnetAdapter(ABC):
    """Abstract base for adapters that drive a device through a telnet session.

    Subclasses implement :meth:`login`.  :meth:`run_cmd` calls it when no
    connection exists yet and then uses ``self.tn``, so ``login`` is expected
    to store the open connection there.
    """

    def __init__(self, address, esc_char="W"):
        """Remember the target address; no connection is opened here.

        Args:
            address: Address of the device (stored unchanged).
            esc_char: Stored on the instance; not used by this base class.
        """
        self.address = address
        self.tn = None  # connection object; None while not connected
        self.esc_char = esc_char

    @abstractmethod
    def login(self):
        """Establish the session; must be provided by subclasses."""
        pass

    def run_cmd(self, cmd, timeout=1, auto_connect=True):
        """Send ``cmd`` and collect the non-empty response lines.

        Args:
            cmd: Command to write to the connection.
            timeout: Currently unused; kept for interface compatibility.
            auto_connect: When true, call :meth:`login` if not yet connected.

        Returns:
            The concatenation of all non-empty lines read until the
            connection yields an empty (or ``None``) line.  If the very
            first read is already empty, that value is returned unchanged.

        Raises:
            Exception: If not connected and ``auto_connect`` is false.
        """
        if self.tn is None:
            if not auto_connect:
                raise Exception("Not connected!")
            self.login()
        self.tn.write(cmd)
        res = self.tn.read_until_non_empty_line()
        if not res:
            return res
        while True:
            out = self.tn.read_until_non_empty_line()
            # An empty / None read marks the end of the response; check it
            # before appending so ``res += None`` can never happen.
            if not out:
                return res
            res += out

    @staticmethod
    def get_response_str(tn_response):
        """Return ``tn_response`` as ``str`` with trailing whitespace removed.

        ``bytes`` input is decoded as ASCII; anything else goes through
        ``str()``.
        """
        if isinstance(tn_response, bytes):
            return str(tn_response.decode("ascii").rstrip())
        return str(tn_response).rstrip()