# Source file: lrc-backend/backend/recorder_adapters/epiphan_base.py
# (151 lines, 4.7 KiB, Python)

# Copyright (c) 2019. Tobias Kurze
import shutil
import time
from datetime import datetime
from pprint import pprint
import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import ConnectionError
from backend import LrcException
from backend.recorder_adapters import RecorderAdapter
# HOST = "localhost"
from backend.tools.exception_decorator import exception_decorator
# Connection settings passed to EpiphanV1 by the manual smoke test in the
# ``__main__`` block at the bottom of this file.
# NOTE(review): a device address and admin credentials are hard-coded and
# committed here — consider loading them from the environment or a config
# file, and rotating this password.
BASE_URL = "http://172.23.8.102" # Audimax SMP 351
USER = "admin"
PW = "lrgrashof+-"
class EpiphanV1(RecorderAdapter):
    """Adapter that talks to an Epiphan recorder through its HTTP CGI endpoints.

    Every request is sent through a single ``requests.Session`` that carries
    HTTP basic-auth credentials. Non-OK HTTP responses are reported as
    ``LrcException(response_text, status_code)``.
    """

    def __init__(self, url: str, admin_user: str, admin_pw: str):
        """Store the connection details and prepare an authenticated session.

        :param url: base URL or bare host of the recorder; ``http://`` is
            prepended when the value does not start with ``http``.
        :param admin_user: user name used for HTTP basic auth.
        :param admin_pw: password used for HTTP basic auth.
        """
        if not url.startswith('http'):
            url = 'http://' + url
        self.url = url
        self.user = admin_user
        self.password = admin_pw
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(self.user, self.password)

    def _get_name(self):
        """Placeholder; not implemented for this adapter (returns ``None``)."""
        pass

    def _get_version(self):
        """Placeholder; not implemented for this adapter (returns ``None``)."""
        pass

    # NOTE(review): exception_decorator is defined elsewhere in the project;
    # presumably it translates ConnectionError for callers — confirm there.
    @exception_decorator(ConnectionError)
    def get_recording_status(self) -> dict:
        """Fetch ``/admin/ajax/recorder_status.cgi`` and return its JSON body.

        :return: decoded JSON document of the recorder status.
        :raises LrcException: if the HTTP response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/recorder_status.cgi")
        if res.ok:
            return res.json()
        raise LrcException(res.text, res.status_code)

    @exception_decorator(ConnectionError)
    def get_sysinfo(self) -> dict:
        """Fetch ``/ajax/sysinfo.cgi`` and return its JSON body.

        :return: decoded JSON document of the system information.
        :raises LrcException: if the HTTP response is not OK.
        """
        res = self.session.get(self.url + "/ajax/sysinfo.cgi")
        if res.ok:
            return res.json()
        raise LrcException(res.text, res.status_code)

    def is_recording(self) -> bool:
        """Return ``True`` when the status document's ``state`` equals ``"up"``."""
        state = self.get_recording_status().get('state', None)
        return state == "up"

    def get_recording_time(self):
        """Return the ``seconds`` field of the recorder status.

        According to the original author's note the device reports 0 while it
        is not recording. If the status document has no ``seconds`` field,
        ``None`` is returned.

        :return: value of the ``seconds`` field, or ``None`` if it is absent.
        """
        return self.get_recording_status().get('seconds', None)

    def start_recording(self):
        """Request ``/admin/ajax/start_recorder.cgi`` and wait two seconds.

        :raises LrcException: if the HTTP response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/start_recorder.cgi")
        if not res.ok:
            raise LrcException(res.text, res.status_code)
        # It takes a moment for the Epiphan to update its state.
        time.sleep(2)

    def stop_recording(self):
        """Request ``/admin/ajax/stop_recorder.cgi`` and wait four seconds.

        :raises LrcException: if the HTTP response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/stop_recorder.cgi")
        if not res.ok:
            raise LrcException(res.text, res.status_code)
        # It takes a moment for the Epiphan to update its state.
        time.sleep(4)

    def get_ip_address(self):
        """Return ``ipaddr`` of the first entry in ``system.network.interfaces``.

        :return: the ``ipaddr`` value, or ``None`` if that entry has no such key.
        :raises LrcException: if sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            return self.get_sysinfo().get("system").get("network").get("interfaces")[0].get('ipaddr', None)
        except LrcException:
            # Already the adapter's error type: keep its original details
            # (e.g. the HTTP status code) instead of re-wrapping the message.
            raise
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_disk_space(self):
        """Return disk usage figures taken from ``system.data`` of sysinfo.

        :return: dict with ``available``, ``free`` and ``total`` as reported
            (``None`` when missing) and ``used`` computed as
            ``total - available`` (missing values count as 0).
        :raises LrcException: if sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            data = self.get_sysinfo().get("system").get("data")
            return {
                'available': data.get("available", None),
                'free': data.get("free", None),
                'total': data.get("total", None),
                'used': data.get("total", 0) - data.get("available", 0),
            }
        except LrcException:
            raise
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_video_inputs(self) -> list:
        """Return ``id``, ``name`` and ``resolution`` of each ``inputs.video`` entry.

        :return: list of dicts, one per video input; missing fields are ``None``.
        :raises LrcException: if sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            video = self.get_sysinfo().get("inputs").get("video")
            return [
                {'id': v.get('id', None), 'name': v.get('name', None), 'resolution': v.get('resolution', None)}
                for v in video
            ]
        except LrcException:
            raise
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_hardware_revision(self):
        """Return the ``system.firmware`` value of sysinfo.

        :raises LrcException: if sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            return self.get_sysinfo().get("system").get("firmware")
        except LrcException:
            raise
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_system_time(self):
        """Return the recorder's ``time`` value raw and as a UTC string.

        :return: dict with ``unix_time_stamp`` (the raw ``time`` value) and
            ``date_time_utc`` formatted as ``%Y-%m-%dT%H:%M:%SZ``.
        :raises LrcException: if sysinfo cannot be fetched or ``time`` is
            missing or not a valid timestamp.
        """
        # Local import: the module only imports ``datetime`` from ``datetime``.
        from datetime import timezone
        try:
            time_stamp = self.get_sysinfo().get("time")
            # Timezone-aware replacement for the deprecated utcfromtimestamp();
            # the formatted string is the same.
            utc_time = datetime.fromtimestamp(time_stamp, tz=timezone.utc)
            return {'unix_time_stamp': time_stamp,
                    'date_time_utc': utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')}
        except LrcException:
            raise
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_screenshot(self, device: str = "DAV93133.vga", size: str = "256x192", out_path: str = 'out.jpg'):
        """Grab a frame via ``/admin/grab_frame.cgi`` and write it to a file.

        The defaults reproduce the previously hard-coded request and output
        file, so existing argument-less calls behave as before.

        :param device: value of the ``device`` query parameter.
        :param size: value of the ``size`` query parameter.
        :param out_path: path of the file the response body is written to.
        :raises LrcException: if the HTTP response is not OK (nothing is
            written in that case).
        """
        # NOTE(review): ``_t`` is kept at its original literal value; it looks
        # like a cache-busting timestamp — confirm whether it is needed.
        frame_url = (self.url + "/admin/grab_frame.cgi?size=" + size
                     + "&device=" + device + "&_t=1573471990578")
        # The context manager closes the streamed response even on errors.
        with self.session.get(frame_url, stream=True) as res:
            if not res.ok:
                raise LrcException(res.text, res.status_code)
            with open(out_path, 'wb') as out_file:
                shutil.copyfileobj(res.raw, out_file)
if __name__ == '__main__':
    # Manual smoke test: talk to the recorder configured by BASE_URL/USER/PW.
    recorder = EpiphanV1(BASE_URL, USER, PW)
    try:
        # Disabled checks, kept for reference:
        #
        # print(recorder.is_recording())
        #
        # if recorder.is_recording():
        #     recorder.stop_recording()
        # else:
        #     recorder.start_recording()
        # print(recorder.is_recording())
        #
        # print(recorder.get_ip_address())
        # print(recorder.get_disk_space())
        # print(recorder.get_video_inputs())
        # print(recorder.get_hardware_revision())
        # print(recorder.get_system_time())
        recorder.get_screenshot()
    except LrcException as err:
        print(err)