diff --git a/backend/recorder_adapters/extron_smp.py b/backend/recorder_adapters/extron_smp.py index 152d74e..47326e7 100644 --- a/backend/recorder_adapters/extron_smp.py +++ b/backend/recorder_adapters/extron_smp.py @@ -1,9 +1,10 @@ +# pylint: disable=too-many-lines """ Recorder Adapter for SMP """ import enum import logging -from typing import Union +from typing import TypeVar, Union from backend import LrcException from backend.recorder_adapters import telnetlib, TelnetAdapter, RecorderAdapter @@ -31,11 +32,54 @@ PW = "123mzsmp" class SMP35x(TelnetAdapter, RecorderAdapter): """ A class representing the Extron SMP recorder. - - Attributes: - AudioChannels (enum.Enum): Enum representing the available audio channels on the Extron SMP recorder. """ + S = TypeVar("S", bound=enum.IntEnum) + + def _get_number_from_enum( + self, number_or_enum: Union[S, int], enum_class: S + ) -> int: + if isinstance(number_or_enum, enum.IntEnum): + return number_or_enum.value + elif isinstance(number_or_enum, int): + if number_or_enum in iter(enum_class): + return number_or_enum + raise ValueError( + f"number must be a {enum_class} or one of " + f"{','.join([str(x.value) for x in iter(enum_class)])}, " + f"but was {number_or_enum}" + ) + else: + raise TypeError(f"channel_number must be a {enum_class} or int") + + class InputNumber(enum.IntEnum): + """ + An enumeration representing the input numbers for an Extron SMP recorder. + + Attributes: + Input_1 (int): The input number for Input 1. + Input_2 (int): The input number for Input 2. + Input_3 (int): The input number for Input 3. + Input_4 (int): The input number for Input 4. + Input_5 (int): The input number for Input 5. + """ + INPUT_1 = 1 + INPUT_2 = 2 + INPUT_3 = 3 + INPUT_4 = 4 + INPUT_5 = 5 + + class OutputChannel(enum.IntEnum): + """ + Enum representing the output channels of an Extron SMP recorder. + + Attributes: + A (int): The first output channel. + B (int): The second output channel. + """ + A = 1 # channel A + B = 2 # channel B + class AudioChannels(enum.IntEnum): """ Enum representing the available audio channels on the Extron SMP recorder. @@ -64,6 +108,51 @@ class SMP35x(TelnetAdapter, RecorderAdapter): OUTPUT_LEFT = 60000 OUTPUT_RIGHT = 60001 + class VerboseMode(enum.IntEnum): + """ + Enum class representing the different verbose modes for Extron SMP devices. + + Attributes: + CLEAR_NONE (int): Default mode for telnet connections, clears non-essential information. + VERBOSE (int): Default mode for USB and RS-232 host control, provides verbose information. + TAGGED (int): Provides tagged responses for queries. + VERBOSE_TAGGED (int): Provides verbose information and tagged responses for queries. + """ + + CLEAR_NONE = 0 + VERBOSE = 1 + TAGGED = 2 + VERBOSE_TAGGED = 3 + + class FrontPanelLockMode(enum.IntEnum): + """ + Enumeration of the front panel lock modes for an Extron SMP recorder. + + Attributes: + OFF (int): No front panel lock. + COMPLETE_LOCKOUT (int): Complete front panel lockout. + MENU_LOCKOUT (int): Lockout of front panel menu controls. + RECORDING_CONTROLS_ONLY (int): Lockout of all front panel controls except + for recording controls. + """ + + OFF = 0 + COMPLETE_LOCKOUT = 1 + MENU_LOCKOUT = 2 + RECORDING_CONTROLS_ONLY = 3 + + class ConfigurationType(enum.IntEnum): + """ + An enumeration of the types of configurations on an Extron SMP device. + + Attributes: + IP_CONFIG (int): The configuration type for IP settings. + BOX_SPECIFIC_CONFIG (int): The configuration type for device-specific settings. + """ + + IP_CONFIG = 0 + BOX_SPECIFIC_CONFIG = 2 + def __init__(self, address, password, auto_login=True, **_kwargs): """ Initializes a new instance of the SMP35x class. @@ -297,67 +386,108 @@ class SMP35x(TelnetAdapter, RecorderAdapter): return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def reset_unit_name(self): + """ + Resets the unit name of the Extron SMP recorder. + + Returns: + str: The response from the recorder after resetting the unit name. + """ self.tn.write(self.esc_char + " CN\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_unit_name(self): + """ + Sends a command to the Extron SMP device to retrieve its unit name. + + Returns: + str: The unit name of the Extron SMP device. + """ self.tn.write(self.esc_char + "CN\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_telnet_connections(self): + """ + Sends a command to the Extron SMP device to retrieve a list of + active Telnet connections, and returns the response. + + Returns: + str: A string containing the response from the device. + """ self.tn.write(self.esc_char + "CC\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_verbose_mode(self, mode: int): + def set_verbose_mode(self, mode: Union[VerboseMode, int]): """ - Mode: - 0=clear/none (default for telnet connections - 1=verbose mode (default for USB and RS-232 host control) - 2=tagged responses for queries - 3=verbose mode and tagged responses for queries - :param mode: - :return: + Sets the verbose mode of the Extron SMP recorder. + + Args: + mode (Union[VerboseMode, int]): The verbose mode to set. This can be either a + `VerboseMode` enum value or an integer representing the verbose mode. + + Returns: + str: The response from the recorder after setting the verbose mode. """ - if mode not in range(4): - raise LrcException("Only values from 0 to 3 are allowed!") + mode = self._get_number_from_enum(mode, SMP35x.VerboseMode) self.tn.write(self.esc_char + str(mode) + "CV\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_verbose_mode(self): + """ + Sends the 'CV' command to the Extron SMP device to get the current verbose mode setting. + + Returns: + str: The response from the device, indicating the current verbose mode setting. + """ self.tn.write(self.esc_char + "CV\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def save_configuration(self, mode: int = 2): + def save_configuration(self, config_type: Union[ConfigurationType, int] = 2): """ - Mode: - 0 = IP config - 2 = Box specific parameters - :param mode: - :return: + Saves the current configuration of the Extron SMP device to non-volatile memory. + + Args: + config_type (Union[ConfigurationType, int], optional): Type of configuration to save. + Defaults to 2. + + Returns: + str: The response from the device after the configuration is saved. """ - if mode not in [0, 2]: - raise LrcException("Only values 0 and 2 are allowed!") - self.tn.write(self.esc_char + f"1*{mode}XF\n") + config_type = self._get_number_from_enum(config_type, SMP35x.ConfigurationType) + self.tn.write(self.esc_char + f"1*{config_type}XF\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def restore_configuration(self, mode: int = 2): + def restore_configuration(self, config_type: Union[ConfigurationType, int] = 2): """ - Mode: - 0 = IP config - 2 = Box specific parameters - :param mode: - :return: + Restores the configuration of the Extron SMP device to the specified configuration type. + + Args: + config_type (Union[ConfigurationType, int], optional): Configuration type to restore. + Defaults to 2. + + Returns: + str: The response from the device. """ - if mode not in [0, 2]: - raise LrcException("Only values 0 and 2 are allowed!") - self.tn.write(self.esc_char + f"0*{mode}XF\n") + config_type = self._get_number_from_enum(config_type, SMP35x.ConfigurationType) + self.tn.write(self.esc_char + f"0*{config_type}XF\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def reboot(self): + """ + Reboots the Extron SMP device by sending the '1BOOT' command over Telnet. + + Returns: + str: The response from the device after sending the '1BOOT' command. + """ self.tn.write(self.esc_char + "1BOOT\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def restart_network(self): + """ + Restarts the Extron SMP's network connection by sending the '2BOOT' command over Telnet. + + Returns: + str: The response from the Extron SMP after sending the '2BOOT' command. + """ self.tn.write(self.esc_char + "2BOOT\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -393,17 +523,17 @@ class SMP35x(TelnetAdapter, RecorderAdapter): self.tn.write(self.esc_char + "ZQQQ\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def set_front_panel_lock(self, mode: int): + def set_front_panel_lock(self, mode: Union["SMP35x.FrontPanelLockMode", int]): """ - 0=Off - 1=complete lockout (no front panel control) - 2=menu lockout - 3=recording controls - :param mode: Execute mode int code - :return: + Sets the front panel lock mode of the Extron SMP device. + + Args: + mode (Union["SMP35x.FrontPanelLockMode", int]): The front panel lock mode to set. + + Returns: + str: The response from the device after setting the front panel lock mode. """ - if mode not in range(4): - raise LrcException("Only values from 0 to 3 are allowed!") + mode = self._get_number_from_enum(mode, SMP35x.FrontPanelLockMode) self.tn.write(str(mode) + "X\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -419,63 +549,116 @@ class SMP35x(TelnetAdapter, RecorderAdapter): self.tn.write("X\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - """ - A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.) - Only some stuff will be implemented here! - """ + ### A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.) + ### Only some stuff will be implemented here! - """ def get_date_time(self): - pass + """ + Gets the current date and time from the Extron SMP recorder. + + Returns: + A datetime object representing the current date and time. + """ + raise NotImplementedError("get_date_time not implemented yet!") def get_time_zone(self): - pass + """ + Gets the time zone of the Extron SMP recorder. + + Returns: + str: The time zone of the recorder. + """ + raise NotImplementedError("get_time_zone not implemented yet!") def get_dhcp_mode(self): - pass + """ + Gets the current DHCP mode of the Extron SMP recorder. + + Returns: + str: The current DHCP mode of the recorder. Possible values are "on", "off", or "auto". + """ + raise NotImplementedError("get_dhcp_mode not implemented yet!") def get_network_settings(self): - pass + """ + Retrieves the current network settings for the Extron SMP recorder. + + Returns: + A dictionary containing the current network settings for the recorder. + """ + raise NotImplementedError("get_network_settings not implemented yet!") def get_ip_address(self): - pass + """ + Returns the IP address of the Extron SMP recorder. + """ + raise NotImplementedError("get_ip_address not implemented yet!") def get_mac_address(self): - pass + """ + Gets the MAC address of the Extron SMP recorder. + + :return: A string representing the MAC address of the recorder. + """ + raise NotImplementedError("get_mac_address not implemented yet!") def get_subnet_mask(self): - pass + """ + Gets the subnet mask for the Extron SMP recorder. + + :return: The subnet mask as a string. + """ + raise NotImplementedError("get_subnet_mask not implemented yet!") def get_gateway_ip(self): - pass + """ + Gets the IP address of the gateway device for the Extron SMP recorder. + This method is not yet implemented. + """ + raise NotImplementedError("get_gateway_ip not implemented yet!") def get_dns_server_ip(self): - pass - """ - - """ - RS-232 / serial port related stuff not implemented. - Password and security related stuff not implemented. - File related stuff not implemented. (-> use sftp) - """ - - def set_input(self, input_num: int, channel_num: int): """ - Switches input # (1 to 5) to output channel (1=A [input 1 and 2], 2=B [input 3, 4 and 5]) + Returns the IP address of the DNS server configured on the Extron SMP device. + + Raises: + NotImplementedError: This method has not been implemented yet. + """ + raise NotImplementedError("get_dns_server_ip not implemented yet!") + + ### RS-232 / serial port related stuff not implemented. + + ### Password and security related stuff not implemented. + + ### File related stuff not implemented. (-> use sftp) + + def set_input(self, input_num: Union[InputNumber, int], channel_num: Union[OutputChannel, int]): + """ + Switches input # (1 to 5) to output channel (1=A [input 1 and 2], 2=B [input 3, 4 and 5]) :param input_num: :param channel_num: :return: """ - if input_num not in range(1, 6): - raise LrcException("input_num must be a value between 1 and 5!") - if channel_num not in range(1, 3): - raise LrcException("input_num must be a value between 1 and 2!") + input_num = self._get_number_from_enum(input_num, SMP35x.InputNumber) + channel_num = self._get_number_from_enum(channel_num, SMP35x.OutputChannel) self.tn.write(f"{input_num}*{channel_num}!\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - def get_input(self, channel_num: int): - if channel_num not in range(1, 2): - raise LrcException("input_num must be a value between 1 and 2!") + def get_input(self, channel_num: Union[OutputChannel, int]) -> str: + """ + Sends a command to the Extron SMP device to get the current input for the specified channel. + + Args: + channel_num (Union[OutputChannel, int]): The channel number to get the input for. This can be either an + integer value or an OutputChannel enum value. + + Returns: + str: The current input for the specified channel, as a string. + + Raises: + TelnetError: If there was an error communicating with the Extron SMP device. + """ + channel_num = self._get_number_from_enum(channel_num, SMP35x.OutputChannel) self.tn.write(f"{channel_num}!\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -524,19 +707,35 @@ class SMP35x(TelnetAdapter, RecorderAdapter): self.tn.write("32I\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) - """ - Input configuration part skipped - """ + ### Input configuration part skipped def stop_recording(self): + """ + Sends a command to stop recording on the Extron SMP device and returns the response. + + Returns: + str: The response from the device. + """ self.tn.write(f"{self.esc_char}Y0RCDR\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def start_recording(self): + """ + Sends a command to start recording on the Extron SMP device and returns the response. + + Returns: + A string containing the response from the device. + """ self.tn.write(f"{self.esc_char}Y1RCDR\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def pause_recording(self): + """ + Sends a command to pause recording on the Extron SMP recorder. + + Returns: + str: The response from the recorder as a string. + """ self.tn.write(f"{self.esc_char}Y2RCDR\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -553,6 +752,9 @@ class SMP35x(TelnetAdapter, RecorderAdapter): return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) def is_recording(self) -> bool: + """ + Returns True if the Extron SMP is currently recording, False otherwise. + """ return self.get_recording_status() == 1 def extent_recording_time(self, extension_time: int): @@ -567,26 +769,62 @@ class SMP35x(TelnetAdapter, RecorderAdapter): return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def add_chapter_marker(self): + """ + Sends a command to the Extron SMP recorder to add a chapter marker to the current recording. + + Returns: + str: A response string from the recorder indicating success or failure. + """ self.tn.write(f"{self.esc_char}BRCDR\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def swap_channel_positions(self): + """ + Sends a command to the Extron SMP device to swap the positions of the current input and output channels. + + Returns: + str: The response from the device after sending the command. + """ self.tn.write("%\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_recording_status_text(self): + """ + Sends the 'I' command to the Extron SMP device to retrieve the current recording status text. + + Returns: + A string representing the current recording status text. + """ self.tn.write("I\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_elapsed_recording_time(self): + """ + Sends a command to the Extron SMP recorder to retrieve the elapsed recording time. + + Returns: + A string representing the elapsed recording time in the format "HH:MM:SS". + """ self.tn.write("35I\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_remaining_recording_time(self): + """ + Sends a command to the Extron SMP device to retrieve the remaining recording time. + + Returns: + str: The remaining recording time in the format "HH:MM:SS". + """ self.tn.write("36I\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_recording_destination(self): + """ + Sends a command to the Extron SMP device to retrieve the current recording destination. + + Returns: + str: The recording destination as a string. + """ self.tn.write("37I\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -678,6 +916,10 @@ class SMP35x(TelnetAdapter, RecorderAdapter): return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def get_input_presets(self): + """ + Sends a command to the Extron SMP device to retrieve the available input presets. + Returns the response from the device as a string. + """ self.tn.write("51#\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) @@ -963,36 +1205,20 @@ class SMP35x(TelnetAdapter, RecorderAdapter): return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) # Audio settings - def _get_audio_channel_number( - self, channel_number: Union["SMP35x.AudioChannels", int] - ): - if isinstance(channel_number, SMP35x.AudioChannels): - return channel_number.value - elif isinstance(channel_number, int): - if channel_number in iter(SMP35x.AudioChannels): - return channel_number - raise ValueError( - f"channel_number must be a SMP35x.AudioChannels or one of " - f"{','.join([str(x.value) for x in iter(SMP35x.AudioChannels)])}, " - f"but was {channel_number}" - ) - else: - raise TypeError("channel_number must be a SMP35x.AudioChannels or int") - def mute_audio_channel(self, channel_number: Union["SMP35x.AudioChannels", int]): - num = self._get_audio_channel_number(channel_number) + num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels) self.tn.write(f"{self.esc_char}M{num}*1AU\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def unmute_audio_channel(self, channel_number: Union["SMP35x.AudioChannels", int]): - num = self._get_audio_channel_number(channel_number) + num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels) self.tn.write(f"{self.esc_char}M{num}*0AU\n") return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()) def is_audio_channel_muted( self, channel_number: Union["SMP35x.AudioChannels", int] ): - num = self._get_audio_channel_number(channel_number) + num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels) self.tn.write(f"{self.esc_char}M{num}AU\n") return ( int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())) diff --git a/backend/tests/extron_smp_testing.py b/backend/tests/extron_smp_testing.py index 0fe8e6c..b5526ca 100644 --- a/backend/tests/extron_smp_testing.py +++ b/backend/tests/extron_smp_testing.py @@ -13,18 +13,27 @@ pw = "123mzsmp" def print_tn(tn_response): + """ + Prints the given TN response, decoding it if necessary. + + Args: + tn_response (bytes or str): The TN response to print. + + Returns: + None + """ if isinstance(tn_response, bytes): print(tn_response.decode("ascii").rstrip()) else: print(str(tn_response).rstrip()) -def run_cmd(tn, cmd, timeout=1): - tn.write(cmd) - out = tn.read_until_non_empty_line() +def run_cmd(telnet_con, cmd, _timeout=1): + telnet_con.write(cmd) + out = telnet_con.read_until_non_empty_line() res = out while out is not None and out != "": - out = tn.read_until_non_empty_line() + out = telnet_con.read_until_non_empty_line() print(out) res += out return res diff --git a/backend/tools/exception_decorator.py b/backend/tools/exception_decorator.py index 3d67194..f61055c 100644 --- a/backend/tools/exception_decorator.py +++ b/backend/tools/exception_decorator.py @@ -1,14 +1,29 @@ +# pylint: disable=missing-module-docstring,missing-function-docstring,missing-class-docstring from backend import LrcException def exception_decorator(*exceptions): + """ + A decorator that catches specified exceptions and raises them as LrcExceptions. + + Args: + *exceptions: A variable-length argument list of exceptions to catch. + + Returns: + A decorator function that can be applied to other functions. + + Example: + @exception_decorator(ValueError, TypeError) + def my_function(): + # code here + """ def decorator(func): def new_func(*args, **kwargs): try: ret = func(*args, **kwargs) return ret except exceptions as e: - raise LrcException(e) + raise LrcException(e) from e return new_func