added a lot of enums for SMP parameters

This commit is contained in:
2023-10-25 16:24:03 +02:00
parent b60d89ce0a
commit 2d0a5c2974
3 changed files with 349 additions and 99 deletions

View File

@@ -1,9 +1,10 @@
# pylint: disable=too-many-lines
"""
Recorder Adapter for SMP
"""
import enum
import logging
from typing import Union
from typing import TypeVar, Union
from backend import LrcException
from backend.recorder_adapters import telnetlib, TelnetAdapter, RecorderAdapter
@@ -31,11 +32,54 @@ PW = "123mzsmp"
class SMP35x(TelnetAdapter, RecorderAdapter):
"""
A class representing the Extron SMP recorder.
Attributes:
AudioChannels (enum.Enum): Enum representing the available audio channels on the Extron SMP recorder.
"""
S = TypeVar("S", bound=enum.IntEnum)
def _get_number_from_enum(
self, number_or_enum: Union[S, int], enum_class: S
) -> int:
if isinstance(number_or_enum, enum.IntEnum):
return number_or_enum.value
elif isinstance(number_or_enum, int):
if number_or_enum in iter(enum_class):
return number_or_enum
raise ValueError(
f"number must be a {enum_class} or one of "
f"{','.join([str(x.value) for x in iter(enum_class)])}, "
f"but was {number_or_enum}"
)
else:
raise TypeError(f"channel_number must be a {enum_class} or int")
class InputNumber(enum.IntEnum):
"""
An enumeration representing the input numbers for an Extron SMP recorder.
Attributes:
Input_1 (int): The input number for Input 1.
Input_2 (int): The input number for Input 2.
Input_3 (int): The input number for Input 3.
Input_4 (int): The input number for Input 4.
Input_5 (int): The input number for Input 5.
"""
INPUT_1 = 1
INPUT_2 = 2
INPUT_3 = 3
INPUT_4 = 4
INPUT_5 = 5
class OutputChannel(enum.IntEnum):
"""
Enum representing the output channels of an Extron SMP recorder.
Attributes:
A (int): The first output channel.
B (int): The second output channel.
"""
A = 1 # channel A
B = 2 # channel B
class AudioChannels(enum.IntEnum):
"""
Enum representing the available audio channels on the Extron SMP recorder.
@@ -64,6 +108,51 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
OUTPUT_LEFT = 60000
OUTPUT_RIGHT = 60001
class VerboseMode(enum.IntEnum):
"""
Enum class representing the different verbose modes for Extron SMP devices.
Attributes:
CLEAR_NONE (int): Default mode for telnet connections, clears non-essential information.
VERBOSE (int): Default mode for USB and RS-232 host control, provides verbose information.
TAGGED (int): Provides tagged responses for queries.
VERBOSE_TAGGED (int): Provides verbose information and tagged responses for queries.
"""
CLEAR_NONE = 0
VERBOSE = 1
TAGGED = 2
VERBOSE_TAGGED = 3
class FrontPanelLockMode(enum.IntEnum):
"""
Enumeration of the front panel lock modes for an Extron SMP recorder.
Attributes:
OFF (int): No front panel lock.
COMPLETE_LOCKOUT (int): Complete front panel lockout.
MENU_LOCKOUT (int): Lockout of front panel menu controls.
RECORDING_CONTROLS_ONLY (int): Lockout of all front panel controls except
for recording controls.
"""
OFF = 0
COMPLETE_LOCKOUT = 1
MENU_LOCKOUT = 2
RECORDING_CONTROLS_ONLY = 3
class ConfigurationType(enum.IntEnum):
"""
An enumeration of the types of configurations on an Extron SMP device.
Attributes:
IP_CONFIG (int): The configuration type for IP settings.
BOX_SPECIFIC_CONFIG (int): The configuration type for device-specific settings.
"""
IP_CONFIG = 0
BOX_SPECIFIC_CONFIG = 2
def __init__(self, address, password, auto_login=True, **_kwargs):
"""
Initializes a new instance of the SMP35x class.
@@ -297,67 +386,108 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reset_unit_name(self):
"""
Resets the unit name of the Extron SMP recorder.
Returns:
str: The response from the recorder after resetting the unit name.
"""
self.tn.write(self.esc_char + " CN\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_unit_name(self):
"""
Sends a command to the Extron SMP device to retrieve its unit name.
Returns:
str: The unit name of the Extron SMP device.
"""
self.tn.write(self.esc_char + "CN\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_telnet_connections(self):
"""
Sends a command to the Extron SMP device to retrieve a list of
active Telnet connections, and returns the response.
Returns:
str: A string containing the response from the device.
"""
self.tn.write(self.esc_char + "CC\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_verbose_mode(self, mode: int):
def set_verbose_mode(self, mode: Union[VerboseMode, int]):
"""
Mode:
0=clear/none (default for telnet connections
1=verbose mode (default for USB and RS-232 host control)
2=tagged responses for queries
3=verbose mode and tagged responses for queries
:param mode:
:return:
Sets the verbose mode of the Extron SMP recorder.
Args:
mode (Union[VerboseMode, int]): The verbose mode to set. This can be either a
`VerboseMode` enum value or an integer representing the verbose mode.
Returns:
str: The response from the recorder after setting the verbose mode.
"""
if mode not in range(4):
raise LrcException("Only values from 0 to 3 are allowed!")
mode = self._get_number_from_enum(mode, SMP35x.VerboseMode)
self.tn.write(self.esc_char + str(mode) + "CV\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_verbose_mode(self):
"""
Sends the 'CV' command to the Extron SMP device to get the current verbose mode setting.
Returns:
str: The response from the device, indicating the current verbose mode setting.
"""
self.tn.write(self.esc_char + "CV\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def save_configuration(self, mode: int = 2):
def save_configuration(self, config_type: Union[ConfigurationType, int] = 2):
"""
Mode:
0 = IP config
2 = Box specific parameters
:param mode:
:return:
Saves the current configuration of the Extron SMP device to non-volatile memory.
Args:
config_type (Union[ConfigurationType, int], optional): Type of configuration to save.
Defaults to 2.
Returns:
str: The response from the device after the configuration is saved.
"""
if mode not in [0, 2]:
raise LrcException("Only values 0 and 2 are allowed!")
self.tn.write(self.esc_char + f"1*{mode}XF\n")
config_type = self._get_number_from_enum(config_type, SMP35x.ConfigurationType)
self.tn.write(self.esc_char + f"1*{config_type}XF\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def restore_configuration(self, mode: int = 2):
def restore_configuration(self, config_type: Union[ConfigurationType, int] = 2):
"""
Mode:
0 = IP config
2 = Box specific parameters
:param mode:
:return:
Restores the configuration of the Extron SMP device to the specified configuration type.
Args:
config_type (Union[ConfigurationType, int], optional): Configuration type to restore.
Defaults to 2.
Returns:
str: The response from the device.
"""
if mode not in [0, 2]:
raise LrcException("Only values 0 and 2 are allowed!")
self.tn.write(self.esc_char + f"0*{mode}XF\n")
config_type = self._get_number_from_enum(config_type, SMP35x.ConfigurationType)
self.tn.write(self.esc_char + f"0*{config_type}XF\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def reboot(self):
"""
Reboots the Extron SMP device by sending the '1BOOT' command over Telnet.
Returns:
str: The response from the device after sending the '1BOOT' command.
"""
self.tn.write(self.esc_char + "1BOOT\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def restart_network(self):
"""
Restarts the Extron SMP's network connection by sending the '2BOOT' command over Telnet.
Returns:
str: The response from the Extron SMP after sending the '2BOOT' command.
"""
self.tn.write(self.esc_char + "2BOOT\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -393,17 +523,17 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
self.tn.write(self.esc_char + "ZQQQ\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def set_front_panel_lock(self, mode: int):
def set_front_panel_lock(self, mode: Union["SMP35x.FrontPanelLockMode", int]):
"""
0=Off
1=complete lockout (no front panel control)
2=menu lockout
3=recording controls
:param mode: Execute mode int code
:return:
Sets the front panel lock mode of the Extron SMP device.
Args:
mode (Union["SMP35x.FrontPanelLockMode", int]): The front panel lock mode to set.
Returns:
str: The response from the device after setting the front panel lock mode.
"""
if mode not in range(4):
raise LrcException("Only values from 0 to 3 are allowed!")
mode = self._get_number_from_enum(mode, SMP35x.FrontPanelLockMode)
self.tn.write(str(mode) + "X\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -419,63 +549,116 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
self.tn.write("X\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.)
Only some stuff will be implemented here!
"""
### A lot of stuff related to network settings (ports of services, SNMP, IP, DHCP, etc.)
### Only some stuff will be implemented here!
"""
def get_date_time(self):
pass
"""
Gets the current date and time from the Extron SMP recorder.
Returns:
A datetime object representing the current date and time.
"""
raise NotImplementedError("get_date_time not implemented yet!")
def get_time_zone(self):
pass
"""
Gets the time zone of the Extron SMP recorder.
Returns:
str: The time zone of the recorder.
"""
raise NotImplementedError("get_time_zone not implemented yet!")
def get_dhcp_mode(self):
pass
"""
Gets the current DHCP mode of the Extron SMP recorder.
Returns:
str: The current DHCP mode of the recorder. Possible values are "on", "off", or "auto".
"""
raise NotImplementedError("get_dhcp_mode not implemented yet!")
def get_network_settings(self):
pass
"""
Retrieves the current network settings for the Extron SMP recorder.
Returns:
A dictionary containing the current network settings for the recorder.
"""
raise NotImplementedError("get_network_settings not implemented yet!")
def get_ip_address(self):
pass
"""
Returns the IP address of the Extron SMP recorder.
"""
raise NotImplementedError("get_ip_address not implemented yet!")
def get_mac_address(self):
pass
"""
Gets the MAC address of the Extron SMP recorder.
:return: A string representing the MAC address of the recorder.
"""
raise NotImplementedError("get_mac_address not implemented yet!")
def get_subnet_mask(self):
pass
"""
Gets the subnet mask for the Extron SMP recorder.
:return: The subnet mask as a string.
"""
raise NotImplementedError("get_subnet_mask not implemented yet!")
def get_gateway_ip(self):
pass
"""
Gets the IP address of the gateway device for the Extron SMP recorder.
This method is not yet implemented.
"""
raise NotImplementedError("get_gateway_ip not implemented yet!")
def get_dns_server_ip(self):
pass
"""
"""
RS-232 / serial port related stuff not implemented.
Password and security related stuff not implemented.
File related stuff not implemented. (-> use sftp)
"""
def set_input(self, input_num: int, channel_num: int):
"""
Switches input # (1 to 5) to output channel (1=A [input 1 and 2], 2=B [input 3, 4 and 5])
Returns the IP address of the DNS server configured on the Extron SMP device.
Raises:
NotImplementedError: This method has not been implemented yet.
"""
raise NotImplementedError("get_dns_server_ip not implemented yet!")
### RS-232 / serial port related stuff not implemented.
### Password and security related stuff not implemented.
### File related stuff not implemented. (-> use sftp)
def set_input(self, input_num: Union[InputNumber, int], channel_num: Union[OutputChannel, int]):
"""
Switches input # (1 to 5) to output channel (1=A [input 1 and 2], 2=B [input 3, 4 and 5])
:param input_num:
:param channel_num:
:return:
"""
if input_num not in range(1, 6):
raise LrcException("input_num must be a value between 1 and 5!")
if channel_num not in range(1, 3):
raise LrcException("input_num must be a value between 1 and 2!")
input_num = self._get_number_from_enum(input_num, SMP35x.InputNumber)
channel_num = self._get_number_from_enum(channel_num, SMP35x.OutputChannel)
self.tn.write(f"{input_num}*{channel_num}!\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input(self, channel_num: int):
if channel_num not in range(1, 2):
raise LrcException("input_num must be a value between 1 and 2!")
def get_input(self, channel_num: Union[OutputChannel, int]) -> str:
"""
Sends a command to the Extron SMP device to get the current input for the specified channel.
Args:
channel_num (Union[OutputChannel, int]): The channel number to get the input for. This can be either an
integer value or an OutputChannel enum value.
Returns:
str: The current input for the specified channel, as a string.
Raises:
TelnetError: If there was an error communicating with the Extron SMP device.
"""
channel_num = self._get_number_from_enum(channel_num, SMP35x.OutputChannel)
self.tn.write(f"{channel_num}!\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -524,19 +707,35 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
self.tn.write("32I\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
"""
Input configuration part skipped
"""
### Input configuration part skipped
def stop_recording(self):
"""
Sends a command to stop recording on the Extron SMP device and returns the response.
Returns:
str: The response from the device.
"""
self.tn.write(f"{self.esc_char}Y0RCDR\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def start_recording(self):
"""
Sends a command to start recording on the Extron SMP device and returns the response.
Returns:
A string containing the response from the device.
"""
self.tn.write(f"{self.esc_char}Y1RCDR\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def pause_recording(self):
"""
Sends a command to pause recording on the Extron SMP recorder.
Returns:
str: The response from the recorder as a string.
"""
self.tn.write(f"{self.esc_char}Y2RCDR\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -553,6 +752,9 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
return int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()))
def is_recording(self) -> bool:
"""
Returns True if the Extron SMP is currently recording, False otherwise.
"""
return self.get_recording_status() == 1
def extent_recording_time(self, extension_time: int):
@@ -567,26 +769,62 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def add_chapter_marker(self):
"""
Sends a command to the Extron SMP recorder to add a chapter marker to the current recording.
Returns:
str: A response string from the recorder indicating success or failure.
"""
self.tn.write(f"{self.esc_char}BRCDR\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def swap_channel_positions(self):
"""
Sends a command to the Extron SMP device to swap the positions of the current input and output channels.
Returns:
str: The response from the device after sending the command.
"""
self.tn.write("%\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_recording_status_text(self):
"""
Sends the 'I' command to the Extron SMP device to retrieve the current recording status text.
Returns:
A string representing the current recording status text.
"""
self.tn.write("I\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_elapsed_recording_time(self):
"""
Sends a command to the Extron SMP recorder to retrieve the elapsed recording time.
Returns:
A string representing the elapsed recording time in the format "HH:MM:SS".
"""
self.tn.write("35I\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_remaining_recording_time(self):
"""
Sends a command to the Extron SMP device to retrieve the remaining recording time.
Returns:
str: The remaining recording time in the format "HH:MM:SS".
"""
self.tn.write("36I\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_recording_destination(self):
"""
Sends a command to the Extron SMP device to retrieve the current recording destination.
Returns:
str: The recording destination as a string.
"""
self.tn.write("37I\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -678,6 +916,10 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def get_input_presets(self):
"""
Sends a command to the Extron SMP device to retrieve the available input presets.
Returns the response from the device as a string.
"""
self.tn.write("51#\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
@@ -963,36 +1205,20 @@ class SMP35x(TelnetAdapter, RecorderAdapter):
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
# Audio settings
def _get_audio_channel_number(
self, channel_number: Union["SMP35x.AudioChannels", int]
):
if isinstance(channel_number, SMP35x.AudioChannels):
return channel_number.value
elif isinstance(channel_number, int):
if channel_number in iter(SMP35x.AudioChannels):
return channel_number
raise ValueError(
f"channel_number must be a SMP35x.AudioChannels or one of "
f"{','.join([str(x.value) for x in iter(SMP35x.AudioChannels)])}, "
f"but was {channel_number}"
)
else:
raise TypeError("channel_number must be a SMP35x.AudioChannels or int")
def mute_audio_channel(self, channel_number: Union["SMP35x.AudioChannels", int]):
num = self._get_audio_channel_number(channel_number)
num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels)
self.tn.write(f"{self.esc_char}M{num}*1AU\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def unmute_audio_channel(self, channel_number: Union["SMP35x.AudioChannels", int]):
num = self._get_audio_channel_number(channel_number)
num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels)
self.tn.write(f"{self.esc_char}M{num}*0AU\n")
return TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line())
def is_audio_channel_muted(
self, channel_number: Union["SMP35x.AudioChannels", int]
):
num = self._get_audio_channel_number(channel_number)
num = self._get_number_from_enum(channel_number, SMP35x.AudioChannels)
self.tn.write(f"{self.esc_char}M{num}AU\n")
return (
int(TelnetAdapter._get_response_str(self.tn.read_until_non_empty_line()))

View File

@@ -13,18 +13,27 @@ pw = "123mzsmp"
def print_tn(tn_response):
"""
Prints the given TN response, decoding it if necessary.
Args:
tn_response (bytes or str): The TN response to print.
Returns:
None
"""
if isinstance(tn_response, bytes):
print(tn_response.decode("ascii").rstrip())
else:
print(str(tn_response).rstrip())
def run_cmd(tn, cmd, timeout=1):
tn.write(cmd)
out = tn.read_until_non_empty_line()
def run_cmd(telnet_con, cmd, _timeout=1):
telnet_con.write(cmd)
out = telnet_con.read_until_non_empty_line()
res = out
while out is not None and out != "":
out = tn.read_until_non_empty_line()
out = telnet_con.read_until_non_empty_line()
print(out)
res += out
return res

View File

@@ -1,14 +1,29 @@
# pylint: disable=missing-module-docstring,missing-function-docstring,missing-class-docstring
from backend import LrcException
def exception_decorator(*exceptions):
"""
A decorator that catches specified exceptions and raises them as LrcExceptions.
Args:
*exceptions: A variable-length argument list of exceptions to catch.
Returns:
A decorator function that can be applied to other functions.
Example:
@exception_decorator(ValueError, TypeError)
def my_function():
# code here
"""
def decorator(func):
def new_func(*args, **kwargs):
try:
ret = func(*args, **kwargs)
return ret
except exceptions as e:
raise LrcException(e)
raise LrcException(e) from e
return new_func