88 lines
2.2 KiB
Python
88 lines
2.2 KiB
Python
import telnetlib
|
|
from abc import ABC, abstractmethod
|
|
|
|
# Monkey patching of telnetlib: keep references to the stock Telnet methods
# so the str-aware wrappers defined below can delegate to them.
original_read_until = telnetlib.Telnet.read_until
original_write = telnetlib.Telnet.write
|
|
|
|
|
|
def new_read_until(self, match, timeout=None):
    """Wrapper around the stock ``read_until`` that also accepts a ``str``.

    A ``str`` match is encoded as ASCII before delegating to
    ``original_read_until``; any other value is passed through unchanged.
    """
    needle = match.encode("ascii") if isinstance(match, str) else match
    return original_read_until(self, needle, timeout)
|
|
|
|
|
|
def new_write(self, buffer):
    """Wrapper around the stock ``write`` that also accepts a ``str``.

    A ``str`` buffer is encoded as ASCII before delegating to
    ``original_write``; any other value is passed through unchanged.
    """
    payload = buffer.encode("ascii") if isinstance(buffer, str) else buffer
    return original_write(self, payload)
|
|
|
|
|
|
# Install the str-aware wrappers on the Telnet class itself, so every Telnet
# instance created after this module is imported goes through them.
telnetlib.Telnet.read_until = new_read_until
telnetlib.Telnet.write = new_write
|
|
|
|
|
|
def read_line(self, timeout=2):
    """Delegate to ``self.read_until`` with ``"\\n"`` as the match.

    ``timeout`` is forwarded unchanged as the second positional argument,
    and whatever ``read_until`` returns is returned as-is.
    """
    line_terminator = "\n"
    return self.read_until(line_terminator, timeout)
|
|
|
|
|
|
# Attach read_line to the Telnet class so it can be called as a method.
telnetlib.Telnet.read_line = read_line
|
|
|
|
|
|
def read_until_non_empty_line(self, timeout=2):
    """Return the next line that contains something other than whitespace.

    Blank lines (whitespace only, terminated by a newline) are skipped.
    Reading stops early when ``read_line`` yields ``None`` or data without a
    trailing newline, i.e. the read gave up before a complete line arrived.
    Without that stop condition an idle connection would keep this loop
    retrying forever.

    Args:
        timeout: Seconds passed to each ``read_line`` call.

    Returns:
        The first non-blank line. If no further complete line was available,
        the last read result is returned instead (``None`` or a possibly
        empty bytes/str value).
    """
    while True:
        line = self.read_line(timeout)
        if line is None:
            # Checked on every read, not only the first one.
            return None
        if line.strip():
            return line
        newline = b"\n" if isinstance(line, (bytes, bytearray)) else "\n"
        if not line.endswith(newline):
            # No terminator: the read timed out, so there is nothing more
            # to wait for. Hand back what we have instead of spinning.
            return line
|
|
|
|
|
|
# Attach read_until_non_empty_line to the Telnet class as a method.
telnetlib.Telnet.read_until_non_empty_line = read_until_non_empty_line
|
|
|
|
|
|
def assert_string_in_output(self, string, timeout=2):
    """Read until ``string`` shows up and report whether it was found.

    Args:
        string: Text to look for, given as ``str`` or ASCII ``bytes``.
        timeout: Seconds forwarded to ``self.read_until``.

    Returns:
        A ``(found, response)`` tuple. ``response`` is the text that was
        read, decoded to ``str``; it is ``None`` when ``read_until``
        returned ``None``.
    """
    resp = self.read_until(string, timeout)
    if resp is None:
        return False, resp

    if isinstance(resp, (bytes, bytearray)):
        # Device output is not guaranteed to be pure ASCII; substitute
        # undecodable bytes instead of raising UnicodeDecodeError.
        resp = resp.decode("ascii", errors="replace")

    # read_until accepts bytes as well as str, so normalise the needle to
    # str before the membership test (bytes-in-str raises TypeError).
    if isinstance(string, (bytes, bytearray)):
        needle = string.decode("ascii", errors="replace")
    else:
        needle = string

    return needle in resp, resp
|
|
|
|
|
|
# Attach assert_string_in_output to the Telnet class as a method.
telnetlib.Telnet.assert_string_in_output = assert_string_in_output
|
|
|
|
|
|
class TelnetAdapter(ABC):
    """Base class for talking to a device over a telnet session.

    Subclasses implement :meth:`login`. :meth:`run_cmd` calls it on demand
    and afterwards expects ``self.tn`` to hold the session object.
    """

    def __init__(self, address):
        """Remember ``address``; no connection is opened here."""
        self.address = address
        # Session object; stays None until a login sets it.
        self.tn = None

    @abstractmethod
    def login(self):
        """Open the session. ``run_cmd`` expects this to set ``self.tn``."""

    def run_cmd(self, cmd, timeout=1, auto_connect=True):
        """Send ``cmd`` and collect the non-empty lines that come back.

        Args:
            cmd: Data handed to ``self.tn.write`` as-is.
            timeout: Accepted for interface compatibility; it is not
                forwarded to the reads.
            auto_connect: When true, call ``login()`` if there is no session.

        Returns:
            The concatenated lines, in the type the session returned. If the
            first read is ``None`` or blank, that value is returned
            unchanged.

        Raises:
            Exception: There is no session and ``auto_connect`` is false, or
                ``login()`` returned without setting ``self.tn``.
        """
        if self.tn is None:
            if not auto_connect:
                raise Exception("Not connected!")
            self.login()
            if self.tn is None:
                # login() returned without establishing a session.
                raise Exception("Not connected!")

        session = self.tn
        session.write(cmd)

        first = session.read_until_non_empty_line()
        if self._is_end_of_output(first):
            return first

        chunks = [first]
        while True:
            out = session.read_until_non_empty_line()
            # Echo follow-up reads to stdout.
            print(out)
            if self._is_end_of_output(out):
                break
            chunks.append(out)

        # first[:0] is the empty value of the same type (bytes or str), so
        # the join works for either kind of session output.
        return first[:0].join(chunks)

    @staticmethod
    def _is_end_of_output(line):
        """Return True when ``line`` is ``None`` or contains only whitespace."""
        return line is None or not line.strip()

    @staticmethod
    def get_response_str(tn_response):
        """Return ``tn_response`` as text with trailing whitespace removed.

        ``bytes`` are decoded as ASCII with undecodable bytes replaced; any
        other value is converted with ``str()``.
        """
        if isinstance(tn_response, bytes):
            text = tn_response.decode("ascii", errors="replace")
        else:
            text = str(tn_response)
        return text.rstrip()
|