"""Check that every scheduled Opencast capture agent is actually recording.

For each capture agent known to the Opencast admin node, compare the
scheduler's calendar (is a recording supposed to be running right now?)
against the physical recorder's live state, and log a FATAL/ERROR line on
any mismatch.  All ERROR-level records are collected in an in-memory
stream and mailed out at the end of the run.
"""

import json
import logging
import os
from io import StringIO
from multiprocessing.context import TimeoutError
from multiprocessing.pool import ThreadPool
from pprint import pprint

import requests
from ics import Calendar
from requests.auth import HTTPBasicAuth

from backend import LrcException
from backend.config import Config
from backend.recorder_adapters import RecorderAdapter
from backend.recorder_adapters.epiphan_base import EpiphanV1
from backend.recorder_adapters.extron_smp import SMP
from backend.tools.send_mail import send_error_mail

logger = logging.getLogger("lrc.tools.simple_state_checker")

# Capture ERROR-and-above records in memory so they can be mailed in one
# batch at the end of the script (see bottom of file).
rec_err_state_log_stream = StringIO()
rec_err_state_log_stream_handler = logging.StreamHandler(stream=rec_err_state_log_stream)
rec_err_state_log_stream_handler.setLevel(logging.ERROR)
logger.addHandler(rec_err_state_log_stream_handler)

base_url = "https://opencast.bibliothek.kit.edu"

# One authenticated session reused for every Opencast REST call.
session = requests.session()
session.auth = HTTPBasicAuth(Config.OPENCAST_USER, Config.OPENCAST_PW)

# Per-run cache of resolved service URLs, keyed by service type.
config = {'service_urls': {}}
# Lazily-loaded recorder list; populated by get_recorder_details().
recorders = None


def get_service_url(service_type: str):
    """Resolve the base URL (host + path) of an Opencast service.

    Results are cached in ``config['service_urls']`` so each service type
    is looked up at most once per run.

    :param service_type: Opencast service type identifier, e.g.
        ``org.opencastproject.scheduler``.
    :return: the service URL string, or ``None`` if the lookup failed.
        NOTE(review): callers concatenate the result without a None check,
        so a failed lookup surfaces as a TypeError there — confirm whether
        that is intended before hardening.
    """
    if service_type in config['service_urls']:
        return config['service_urls'][service_type]
    params = {'serviceType': service_type}
    url = base_url + "/services/available.json"
    res = session.get(url, params=params)
    if res.ok:
        service = res.json()["services"]["service"]
        config["service_urls"][service_type] = service["host"] + \
            service["path"]
        return service["host"] + service["path"]
    return None


def get_calender(rec_id):
    """Fetch the scheduling calendar for one capture agent.

    :param rec_id: the agent's name/id as known to the Opencast scheduler.
    :return: an ``ics.Calendar`` on success, ``None`` on a non-OK response.
    """
    params = {'agentid': rec_id}
    url = get_service_url('org.opencastproject.scheduler') + "/calendars"
    res = session.get(url, params=params)
    if res.ok:
        return Calendar(res.text)


def get_capture_agents():
    """Return the list of capture agent dicts from the Opencast admin API.

    :return: list of agent dicts on success, ``None`` on a non-OK response.
    """
    url = get_service_url("org.opencastproject.capture.admin") + "/agents.json"
    res = session.get(url)
    if res.ok:
        return res.json()["agents"]["agent"]


def get_recorder_details():
    """Temporary implementation using initial_recorders.json.
    Should be replaced by DB layer later!"""
    global recorders
    if recorders is None:
        # ../models/initial_recorders.json relative to this module.
        f = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                         os.path.pardir, 'models',
                                         'initial_recorders.json'))
        with open(f, 'r') as json_file:
            recorders = json.load(json_file)['recorders']
    return recorders


def get_recorder_by_name(name: str):
    """Return the recorder info dict whose ``name`` matches, else ``None``."""
    for r in get_recorder_details():
        if r["name"] == name:
            return r
    return None


def notify_users_of_problem(msg: str):
    """Placeholder for user notification; intentionally a no-op for now."""
    pass


def get_recorder_adapter(recorder_info: dict) -> RecorderAdapter:
    """Instantiate the adapter matching a recorder's hardware type.

    Extron SMP devices need only ip/password; everything else is assumed
    to be an Epiphan (v1 API) which also takes a username.
    """
    if "SMP" in recorder_info["type"]:
        rec = SMP(recorder_info['ip'], recorder_info['password'])
    else:
        rec = EpiphanV1(recorder_info['ip'], recorder_info["username"],
                        recorder_info["password"])
    return rec


def check_capture_agent_state(a: dict):
    """Cross-check one agent's scheduled state against its real state.

    Four cases are logged:
      * calendar says record + agent capturing + device recording  -> OK
      * calendar says record but device idle (or agent not capturing) -> FATAL
      * calendar empty but device recording                         -> FATAL
      * calendar empty and device idle                              -> OK

    :param a: agent dict as returned by :func:`get_capture_agents`
        (must contain at least ``name`` and ``state``).
    """
    logger.debug("Checking Agent {}".format(a['name']))
    c = get_calender(a['name'])
    is_recording_in_calendar = len(list(c.timeline.now())) >= 1
    if is_recording_in_calendar:
        logger.info("{} has entry in Calender and should therefore be recording... checking now!".format(a['name']))
        if a['state'] == "capturing":
            recorder_info = get_recorder_by_name(a['name'])
            try:
                rec = get_recorder_adapter(recorder_info)
                if rec.is_recording():
                    logger.info("OK – recorder {} is recording :)".format(a['name']))
                else:
                    logger.info(rec.get_recording_status())
                    logger.error("FATAL - recorder {} must be recording but is not!!!!".format(a['name']))
            except LrcException as e:
                logger.fatal("Exception occurred: {}".format(str(e)))
                logger.error("Could not check state of recorder {}, Address: {}".format(a['name'], recorder_info['ip']))
        else:
            logger.error("FATAL: {} is not in capturing state...but should be!!".format(a['name']))
    else:
        recorder_info = get_recorder_by_name(a['name'])
        try:
            rec = get_recorder_adapter(recorder_info)
            if rec.is_recording():
                logger.error("FATAL - recorder must not be recording!!!!")
            else:
                logger.info("OK – recorder is not recording :)")
        except LrcException as e:
            logger.fatal("Exception occurred: {}".format(str(e)))
            logger.error("Could not check state of recorder {}, Address: {}".format(a['name'], recorder_info['ip']))


# ---------------------------------------------------------------------------
# Script entry: check all agents concurrently, then mail collected errors.
# ---------------------------------------------------------------------------
agents = get_capture_agents()
logger.info("Got {} capture agents that will be checked...".format(len(agents)))

NUM_THREADS = 5
with ThreadPool(NUM_THREADS) as pool:
    results = [pool.apply_async(check_capture_agent_state, (agent,))
               for agent in agents]
    try:
        # Bound each check so one unreachable recorder cannot hang the run.
        [res.get(timeout=12) for res in results]
    except TimeoutError as e:
        logger.error("Timeout while getting capture agent state! {}".format(e))

logger.info("DONE checking capture agents / recorders!")

logged_events = rec_err_state_log_stream.getvalue()
if len(logged_events) > 0:
    send_error_mail(logged_events, "Errors have been detected while checking recorder states!")