diff --git a/recorder_adapters/__init__.py b/recorder_adapters/__init__.py index e5d823a..a89b575 100644 --- a/recorder_adapters/__init__.py +++ b/recorder_adapters/__init__.py @@ -57,9 +57,10 @@ telnetlib.Telnet.assert_string_in_output = assert_string_in_output class TelnetAdapter(ABC): - def __init__(self, address): + def __init__(self, address, esc_char = "W"): self.address = address self.tn = None + self.esc_char = esc_char @abstractmethod def login(self): diff --git a/recorder_adapters/extron_smp.py b/recorder_adapters/extron_smp.py index d6327b2..42df61c 100644 --- a/recorder_adapters/extron_smp.py +++ b/recorder_adapters/extron_smp.py @@ -9,7 +9,7 @@ from backend.recorder_adapters import telnetlib, TelnetAdapter HOST = "172.22.246.207" # Test SMP MZ USER = "admin" -PW = "123mzsmp" +PW = "123mzsmpx" class SMP(TelnetAdapter): @@ -20,24 +20,268 @@ class SMP(TelnetAdapter): def login(self): self.tn = telnetlib.Telnet(HOST) self.tn.read_until("\r\nPassword:") - # password = getpass.getpass() + #password = getpass.getpass() password = self.admin_pw self.tn.write(password + "\n\r") - if not self.tn.assert_string_in_output("Login Administrator")[0]: + out = self.tn.assert_string_in_output("Login Administrator") + if not out[0]: + print(out[1]) + if "Password:" in out[1]: + print("re-enter pw") + self.tn.write("123mzsmp\n\r") + print(self.tn.assert_string_in_output("Login Administrator")) print("WRONG (admin) password!! Exiting!") self.tn = None raise Exception("Could not login as administrator with given pw!") print("OK, we have admin rights!") - def get_version(self): - self.tn.write("1Q\n") + def get_version(self, include_build=False, verbose_info=False): + if verbose_info: + self.tn.write("0Q") + else: + if include_build: + self.tn.write("*Q\n") + else: + self.tn.write("1Q\n") - # print_tn(read_line(tn)) return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + def get_bootstrap_version(self): + self.tn.write("2Q") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_factory_firmware_version(self): + self.tn.write("3Q") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_updated_firmware_version(self): + self.tn.write("4Q") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_part_number(self): + self.tn.write("N") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_model_name(self): + self.tn.write("1I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_model_description(self): + self.tn.write("2I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_system_memory_usage(self): + self.tn.write("3I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_number_of_connected_users(self): + self.tn.write("10I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_system_processer_usage(self): + self.tn.write("11I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_system_processor_idle(self): + self.tn.write("12I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_eth0_link_status(self): + self.tn.write("13I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_file_transfer_config(self): + self.tn.write("38I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_active_alarms(self): + self.tn.write("39I") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def set_unit_name(self, name): + # TODO: check name (must comply with internet host name standards) + self.tn.write(self.esc_char + name + "CN\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def reset_unit_name(self): + self.tn.write(self.esc_char + " CN\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_unit_name(self): + self.tn.write(self.esc_char + "CN\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + + def get_telnet_connections(self): + self.tn.write(self.esc_char + "CC\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + + def set_verbose_mode(self, mode:int): + """ + Mode: + 0=clear/none (default for telnet connections + 1=verbose mode (default for USB and RS-232 host control) + 2=tagged responses for queries + 3=verbose mode and tagged responses for queries + :param mode: + :return: + """ + if mode not in range(4): + raise Exception("Only values from 0 to 3 are allowed!") + self.tn.write(self.esc_char + str(mode) + "CV\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_verbose_mode(self): + self.tn.write(self.esc_char + "CV\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + + def save_configuration(self): + pass + + def restore_configuration(self): + pass + + def reboot(self): + self.tn.write(self.esc_char + "1BOOT\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def restart_network(self): + self.tn.write(self.esc_char + "2BOOT\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def reset_flash(self): + """ + Reset flash memory (excludes recording files). + :return: + """ + self.tn.write(self.esc_char + "ZFFF\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def system_reset(self): + """ + Resets device to default and deletes recorded files + :return: + """ + self.tn.write(self.esc_char + "ZXXX\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def reset_settings_and_delete_all_files(self): + """ + Reset to default except IP address, delete all user and recorded files + :return: + """ + self.tn.write(self.esc_char + "ZY\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def absolute_reset(self): + """ + Same as System Reset, plus returns the IP address and subnet mask to defaults. + :return: + """ + self.tn.write(self.esc_char + "ZQQQ\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def set_front_panel_lock(self, mode): + """ + 0=Off + 1=complete lockout (no front panel control) + 2=menu lockout + 3=recording controls + :param mode: Execute mode int code + :return: + """ + if mode not in range(4): + raise Exception("Only values from 0 to 3 are allowed!") + self.tn.write(str(mode) + "X\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + def get_front_panel_lock(self): + """ + View executive mode. + 0=Off + 1=complete lockout (no front panel control) + 2=menu lockout + 3=recording controls + :return: + """ + self.tn.write("X\n") + return TelnetAdapter.get_response_str(self.tn.read_until_non_empty_line()) + + """ + A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.) + Only some stuff will be implemented here! + """ + + def get_date_time(self): + pass + + def get_time_zone(self): + pass + + def get_dhcp_mode(self): + pass + + def get_network_settings(self): + pass + + def get_ip_address(self): + pass + + def get_mac_address(self): + pass + + def get_subnet_mask(self): + pass + + def get_gateway_ip(self): + pass + + def get_dns_server_ip(self): + pass + + """ + RS-232 / serial port related stuff not implemented. + Password and security related stuff not implemented. + File related stuff not implemented. (-> use sftp) + """ + + def set_input(self, input_num:int, channel_num:int): + """ + Switches input # (1 to 5) to output channel (1=A [input 1 and 2], 2=B [input 3, 4 and 5]) + :param input_num: + :param channel_num: + :return: + """ + if input_num not in range(1,6): + raise Exception("input_num must be a value between 1 and 5!") + if channel_num not in range(1,2): + raise Exception("input_num must be a value between 1 and 2!") + + + smp = SMP(HOST, PW) print(smp) smp.login() -print(smp.get_version()) +print(smp.get_version(verbose_info=False)) + +print(smp.get_bootstrap_version()) +print(smp.get_part_number()) +print(smp.get_model_name()) +print(smp.get_model_description()) + +print(smp.get_system_memory_usage()) + +print(smp.get_file_transfer_config()) + + +# print(smp.get_unit_name()) +# print(smp.set_unit_name("mzsmp")) +# print(smp.get_unit_name()) +# print(smp.reset_unit_name()) + +print(smp.set_front_panel_lock(0)) +print(smp.get_front_panel_lock())