# Copyright (c) 2019. Tobias Kurze
import shutil
import time
from datetime import datetime, timezone
from pprint import pprint

import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import ConnectionError

from backend import LrcException
from backend.recorder_adapters import RecorderAdapter
# HOST = "localhost"
from backend.tools.exception_decorator import exception_decorator

# NOTE(review): a device address and admin credentials are hard-coded here and
# only used by the ``__main__`` demo below -- consider loading them from the
# environment or a config file instead of keeping them in source control.
BASE_URL = "http://172.23.8.102"  # Audimax SMP 351
USER = "admin"
PW = "lrgrashof+-"


class EpiphanV1(RecorderAdapter):
    """Recorder adapter that talks to an Epiphan device over its HTTP CGI endpoints.

    All requests go through one ``requests.Session`` configured with HTTP
    basic authentication.
    """

    def __init__(self, url: str, admin_user: str, admin_pw: str):
        """Store the connection data and prepare an authenticated session.

        :param url: base URL or host of the device; ``http://`` is prepended
            when the value does not already start with ``http``.
        :param admin_user: user name used for HTTP basic auth.
        :param admin_pw: password used for HTTP basic auth.
        """
        if not url.startswith('http'):
            url = 'http://' + url
        self.url = url
        self.user = admin_user
        self.password = admin_pw
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(self.user, self.password)

    def _get_name(self):
        """Not implemented; returns ``None``."""
        pass

    def _get_version(self):
        """Not implemented; returns ``None``."""
        pass

    @exception_decorator(ConnectionError)
    def get_recording_status(self) -> dict:
        """Fetch ``/admin/ajax/recorder_status.cgi`` and return the decoded JSON.

        :raises LrcException: with response text and status code when the
            response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/recorder_status.cgi")
        if res.ok:
            return res.json()
        raise LrcException(res.text, res.status_code)

    @exception_decorator(ConnectionError)
    def get_sysinfo(self) -> dict:
        """Fetch ``/ajax/sysinfo.cgi`` and return the decoded JSON.

        :raises LrcException: with response text and status code when the
            response is not OK.
        """
        res = self.session.get(self.url + "/ajax/sysinfo.cgi")
        if res.ok:
            return res.json()
        raise LrcException(res.text, res.status_code)

    def is_recording(self) -> bool:
        """Return ``True`` when the recorder status reports ``state == "up"``."""
        return self.get_recording_status().get('state', None) == "up"

    def get_recording_time(self):
        """Return the ``seconds`` field of the recorder status.

        This is the recording time in seconds. The value is ``None`` when the
        status does not contain the field.
        NOTE(review): the device presumably reports 0 while not recording --
        confirm against a real status response.

        :return: value of ``seconds`` or ``None``.
        """
        return self.get_recording_status().get('seconds', None)

    # NOTE(review): unlike the status getters, start/stop are not wrapped in
    # ``@exception_decorator(ConnectionError)`` -- confirm whether that is
    # intentional.
    def start_recording(self):
        """Request ``/admin/ajax/start_recorder.cgi`` and wait briefly.

        :raises LrcException: when the response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/start_recorder.cgi")
        if not res.ok:
            raise LrcException(res.text, res.status_code)
        # just a little bit of waiting time -> it takes a bit for the Epiphan to update its state
        time.sleep(2)

    def stop_recording(self):
        """Request ``/admin/ajax/stop_recorder.cgi`` and wait briefly.

        :raises LrcException: when the response is not OK.
        """
        res = self.session.get(self.url + "/admin/ajax/stop_recorder.cgi")
        if not res.ok:
            raise LrcException(res.text, res.status_code)
        # just a little bit of waiting time -> it takes a bit for the Epiphan to update its state
        time.sleep(4)

    def get_ip_address(self):
        """Return ``ipaddr`` of the first network interface listed in sysinfo.

        :raises LrcException: when sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            interfaces = self.get_sysinfo().get("system").get("network").get("interfaces")
            return interfaces[0].get('ipaddr', None)
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_disk_space(self):
        """Return disk figures taken from ``system.data`` in sysinfo.

        :return: dict with ``available``, ``free``, ``total`` and ``used``,
            where ``used`` is computed as ``total - available`` (missing
            values count as 0 for that computation).
        :raises LrcException: when sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            data = self.get_sysinfo().get("system").get("data")
            return {
                'available': data.get("available", None),
                'free': data.get("free", None),
                'total': data.get("total", None),
                'used': data.get("total", 0) - data.get("available", 0),
            }
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_video_inputs(self) -> list:
        """Return ``id``, ``name`` and ``resolution`` for each entry of ``inputs.video``.

        :raises LrcException: when sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            video = self.get_sysinfo().get("inputs").get("video")
            return [
                {
                    'id': v.get('id', None),
                    'name': v.get('name', None),
                    'resolution': v.get('resolution', None),
                }
                for v in video
            ]
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_hardware_revision(self):
        """Return the ``system.firmware`` entry of sysinfo.

        :raises LrcException: when sysinfo cannot be fetched or lacks the
            expected structure.
        """
        try:
            return self.get_sysinfo().get("system").get("firmware")
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_system_time(self):
        """Return the ``time`` entry of sysinfo, raw and formatted as UTC.

        :return: dict with ``unix_time_stamp`` (the raw value) and
            ``date_time_utc`` (``%Y-%m-%dT%H:%M:%SZ`` formatted string).
        :raises LrcException: when sysinfo cannot be fetched or the time
            stamp cannot be converted.
        """
        try:
            time_stamp = self.get_sysinfo().get("time")
            # Timezone-aware conversion; ``datetime.utcfromtimestamp`` is
            # deprecated and yields the same formatted output.
            as_utc = datetime.fromtimestamp(time_stamp, tz=timezone.utc)
            return {
                'unix_time_stamp': time_stamp,
                'date_time_utc': as_utc.strftime('%Y-%m-%dT%H:%M:%SZ'),
            }
        except Exception as err:
            raise LrcException(str(err)) from err

    def get_screenshot(self, size: str = "256x192", device: str = "DAV93133.vga",
                       out_path: str = "out.jpg"):
        """Grab a frame via ``/admin/grab_frame.cgi`` and write it to a file.

        The response object and its headers are printed to stdout.

        :param size: value of the ``size`` query parameter.
        :param device: value of the ``device`` query parameter.
        :param out_path: file the raw response body is written to.
        :raises LrcException: when the response is not OK (nothing is
            written in that case).
        """
        # NOTE(review): ``_t`` looks like a cache-busting timestamp; it is kept
        # at its original fixed value -- confirm whether the device uses it.
        frame_url = (f"{self.url}/admin/grab_frame.cgi"
                     f"?size={size}&device={device}&_t=1573471990578")
        # The context manager releases the streamed connection even on errors.
        with self.session.get(frame_url, stream=True) as ret:
            print(ret)
            pprint(ret.headers)
            if not ret.ok:
                # Do not store an error page as if it were an image.
                raise LrcException(ret.text, ret.status_code)
            with open(out_path, 'wb') as out_file:
                shutil.copyfileobj(ret.raw, out_file)


if __name__ == '__main__':
    recorder = EpiphanV1(BASE_URL, USER, PW)
    try:
        # Manual checks that can be enabled as needed:
        # print(recorder.is_recording())
        #
        # if recorder.is_recording():
        #     recorder.stop_recording()
        # else:
        #     recorder.start_recording()
        # print(recorder.is_recording())
        #
        # print(recorder.get_ip_address())
        # print(recorder.get_disk_space())
        # print(recorder.get_video_inputs())
        # print(recorder.get_hardware_revision())
        # print(recorder.get_system_time())
        recorder.get_screenshot()
    except LrcException as err:
        # Distinct name so the adapter instance above is not shadowed.
        print(err)