"""Check the state of capture agents / recorders and mail detected problems.

When run as a script, every known recorder is pinged and every Opencast capture
agent is compared against its schedule. Everything logged at WARNING level or
above is collected in an in-memory stream and sent by mail at the end.
"""
import json
import os
import logging
import subprocess
import threading
import time
from io import StringIO
from logging.handlers import MemoryHandler
from multiprocessing.context import TimeoutError
from multiprocessing.pool import ThreadPool
from pprint import pprint
from typing import Union

import requests
from ics import Calendar
from requests.auth import HTTPBasicAuth
from requests.exceptions import ConnectionError

from backend import LrcException
from backend.config import Config
from backend.models import Recorder, RecorderModel
from backend.recorder_adapters import RecorderAdapter
from backend.recorder_adapters.epiphan_base import Epiphan
from backend.recorder_adapters.extron_smp import SMP35x
from backend.tools.recorder_streams_sanity_checks import check_frame_is_valid, check_if_audio_is_valid
from backend.tools.send_mail import send_error_mail, get_smtp_error_handler
from backend.tools.exception_decorator import exception_decorator

logger = logging.getLogger("lrc.tools.simple_state_checker")

smtp_error_handler = get_smtp_error_handler(subject="Errors have been detected while checking recorder states!")
# mem_handler = MemoryHandler(capacity=100, flushLevel=logging.FATAL, target=smtp_error_handler)
# mem_handler.setLevel(logging.WARNING)

# Everything logged at WARNING or above is captured here and mailed at the end of a script run.
rec_err_state_log_stream = StringIO()
rec_err_state_log_stream_handler = logging.StreamHandler(stream=rec_err_state_log_stream)
rec_err_state_log_stream_handler.setLevel(logging.WARNING)

logger.addHandler(rec_err_state_log_stream_handler)
# logger.addHandler(mem_handler)

session = requests.session()
session.auth = HTTPBasicAuth(Config.OPENCAST_USER, Config.OPENCAST_PW)

# Cache of resolved service URLs, keyed by service type.
config = {'service_urls': {}}

# Lazily loaded list of recorders (see get_recorder_details).
recorders = None

# agent_states maps agent name -> human readable state; guarded by agent_states_lock.
agent_states_lock = threading.RLock()
agent_states = {}


@exception_decorator(ConnectionError)
def get_service_url(service_type: str):
    """Return the base URL (host + path) of the given service type.

    Resolved URLs are cached in ``config['service_urls']``.

    :param service_type: service type identifier sent as ``serviceType`` parameter.
    :return: the service URL, or None if the lookup request was not successful.
    """
    if service_type in config['service_urls']:
        return config['service_urls'][service_type]
    params = {'serviceType': service_type}
    url = Config.OPENCAST_URL + "/services/available.json"
    res = session.get(url, params=params)
    if not res.ok:
        return None
    service = res.json()["services"]["service"]
    # NOTE(review): presumably the registry returns a list when several instances
    # offer the service; in that case the first one is used -- confirm against the API.
    if isinstance(service, list):
        if not service:
            return None
        service = service[0]
    service_url = service["host"] + service["path"]
    config["service_urls"][service_type] = service_url
    return service_url


@exception_decorator(ConnectionError)
def get_calender(rec_id):
    """Fetch the schedule of the capture agent ``rec_id`` as an ``ics`` Calendar.

    :raises LrcException: if the scheduler service URL is unknown or the request failed.
    """
    scheduler_url = get_service_url('org.opencastproject.scheduler')
    if scheduler_url is None:
        # Without this guard the concatenation below fails with a TypeError.
        raise LrcException("Could not determine URL of service org.opencastproject.scheduler!")
    params = {'agentid': rec_id}
    url = scheduler_url + "/calendars"
    res = session.get(url, params=params)
    if res.ok:
        return Calendar(res.text)
    raise LrcException(res.text, res.status_code)


@exception_decorator(ConnectionError)
def get_capture_agents():
    """Return the list of capture agents reported by the capture admin service.

    :raises LrcException: if the capture admin service URL is unknown or the request failed.
    """
    capture_admin_url = get_service_url("org.opencastproject.capture.admin")
    if capture_admin_url is None:
        # Without this guard the concatenation below fails with a TypeError.
        raise LrcException("Could not determine URL of service org.opencastproject.capture.admin!")
    url = capture_admin_url + "/agents.json"
    res = session.get(url)
    if res.ok:
        agents = res.json()["agents"]["agent"]
        # NOTE(review): presumably a single agent is delivered as a plain object
        # instead of a list; callers iterate over the result, so wrap it -- confirm.
        if isinstance(agents, dict):
            agents = [agents]
        return agents
    raise LrcException(res.text, res.status_code)


def get_recorder_details_old():
    """Temporary implementation using initial_recorders.json. Should be replaced by DB layer later!"""
    global recorders
    if recorders is None:
        f = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, 'models',
                                         'initial_recorders.json'))
        with open(f, 'r') as json_file:
            recorders = json.load(json_file)['recorders']
    return recorders


def get_recorder_details():
    """New implementation using DB. The result is loaded once and cached in ``recorders``."""
    global recorders
    if recorders is None:
        recorders = list(Recorder.get_all())
    return recorders


def get_recorder_by_name(name: str):
    """Find the recorder that belongs to the given (agent) name.

    A recorder matches if its name equals ``name``, or if both names are equal
    after one of them has been extended by the suffix " Recorder".

    :return: the matching recorder, or None (an error is logged) if there is none.
    """
    if name is not None:
        accepted_names = (name, name + " Recorder")
        for r in get_recorder_details():
            rec_name = r.get("name")
            logger.debug("Got recorder {}".format(rec_name))
            if rec_name is None:
                # A recorder without a name can not match (and can not be concatenated).
                continue
            if rec_name in accepted_names or rec_name + " Recorder" == name:
                return r
    logger.error("Could not find recorder for name {}".format(name))
    return None


def notify_users_of_problem(msg: str):
    """Placeholder: currently does nothing."""
    pass


def get_recorder_adapter(recorder_info: Union[dict, Recorder]) -> RecorderAdapter:
    """Create the adapter matching the type of the given recorder.

    Types containing "SMP" get an ``SMP35x`` adapter, everything else an ``Epiphan`` adapter.

    :raises LrcException: if ``recorder_info`` is None or the recorder type can not be determined.
    """
    if recorder_info is None:
        raise LrcException("Could not find recorder Adapter as recorder info was NONE!")
    try:
        recorder_type = recorder_info.get("type")
    except KeyError:
        # NOTE(review): presumably Recorder.get raises KeyError for unknown fields
        # (dict.get never does); the type is then taken from the recorder model.
        recorder_type = RecorderModel.get_by_id(recorder_info.get('recorder_model_id')).model_name
    if not isinstance(recorder_type, str):
        # e.g. a dict without "type": fail with the exception type callers already handle.
        raise LrcException("Could not determine recorder type (got: {!r})!".format(recorder_type))
    if "SMP" in recorder_type:
        return SMP35x(recorder_info.get('ip'), recorder_info.get('password'))
    return Epiphan(recorder_info.get('ip'), recorder_info.get("username"), recorder_info.get("password"))


def check_stream_sanity(recorder_agent: Union[Recorder, dict], recorder_adapter: RecorderAdapter = None):
    """Check whether the archive streams of a recording recorder deliver valid video and audio.

    Archive stream 1 is checked first; archive stream 2 is only probed as a fallback
    when stream 1 fails the respective (frame or audio) check. The checks are repeated
    up to ``Config.STREAM_SANITY_CHECK_RETRIES`` times.

    :param recorder_agent: recorder / agent description (needs at least 'name').
    :param recorder_adapter: adapter to use; looked up by name if omitted.
    :return: tuple ``(ok, message, recorder name)``.
    """
    name = recorder_agent.get('name')
    try:
        if recorder_adapter is None:
            recorder_info = get_recorder_by_name(name)
            recorder_adapter = get_recorder_adapter(recorder_info)
        if not recorder_adapter.is_recording():
            return True, "not recording, so there is no stream!", name
    except LrcException:
        return False, "Could not determine if recorder is recording!", name

    if recorder_agent.get('archive_stream1') is None and recorder_agent.get('archive_stream2') is None:
        # fall back to default names and rtsp
        archive_stream_1_url = "rtsp://{}/{}".format(recorder_adapter.address,
                                                     Config.DEFAULT_ARCHIVE_STREAM_1_NAME)
        archive_stream_2_url = "rtsp://{}/{}".format(recorder_adapter.address,
                                                     Config.DEFAULT_ARCHIVE_STREAM_2_NAME)
    else:
        archive_stream_1_url = recorder_agent.get('archive_stream1')
        archive_stream_2_url = recorder_agent.get('archive_stream2')

    frame_msg = frame_msg2 = sound_msg = sound_msg2 = "unknown"
    retries = Config.STREAM_SANITY_CHECK_RETRIES
    for attempt in range(retries):
        frame_ok, frame_msg = check_frame_is_valid(archive_stream_1_url, raise_errors=False)
        if not frame_ok:
            logger.warning(
                "Archive stream 1 ({}) of {} is not ok (frame): {}".format(archive_stream_1_url, name, frame_msg))
            frame_ok, frame_msg2 = check_frame_is_valid(archive_stream_2_url, raise_errors=False)
            if not frame_ok:
                logger.warning(
                    "Archive stream 2 {} of ({}) is not ok: {} (frame)".format(archive_stream_2_url, name,
                                                                                frame_msg2))

        sound_ok, sound_msg = check_if_audio_is_valid(archive_stream_1_url, raise_errors=False)
        if not sound_ok:
            logger.warning(
                "Archive stream 1 {} of ({}) is not ok (audio): {}".format(archive_stream_1_url, name, sound_msg))
            sound_ok, sound_msg2 = check_if_audio_is_valid(archive_stream_2_url, raise_errors=False)
            if not sound_ok:
                logger.warning(
                    "Archive stream 2 {} of ({}) is not ok (audio): {}".format(archive_stream_2_url, name,
                                                                                sound_msg2))

        if frame_ok and sound_ok:
            return True, "At least one archive stream is fine (audio and image)! " \
                         "(s1: a: {}, v: {}, s2: a: {}, v: {})".format(sound_msg, frame_msg,
                                                                        sound_msg2, frame_msg2), name
        if attempt < retries - 1:
            # Only wait when another attempt follows; sleeping after the last one just delays the result.
            time.sleep(Config.STREAM_SANITY_CHECK_INTERVAL_SEC)

    error_msg = "After {} retries, stream checks failed and returned: archive_stream1: audio:{}, frame:{}, " \
                "archive_stream2: audio:{}, frame:{}".format(retries, sound_msg, frame_msg,
                                                             sound_msg2, frame_msg2)
    logger.error(error_msg)
    return False, error_msg, name  # stream sanity check failed!


def _set_agent_state(name, state: str):
    """Record ``state`` for the agent ``name`` in the shared ``agent_states`` dict."""
    with agent_states_lock:
        agent_states[name] = state


def _recorder_address(recorder_info):
    """Return the IP of ``recorder_info`` for log messages, or 'unknown' if there is no recorder."""
    if recorder_info is None:
        return 'unknown'
    return recorder_info.get('ip')


def check_capture_agent_state(recorder_agent: Union[Recorder, dict]):
    """Compare the actual recording state of a capture agent with its schedule.

    The agent is OK if it records while its calendar has a current entry (and the
    agent reports the state "capturing"), or if it does not record while the
    calendar has no current entry. Recorders flagged 'offline' are always OK.
    Results of the recording check are also stored in ``agent_states``.

    :return: tuple ``(ok, message, agent name)``.
    """
    name = recorder_agent.get('name')
    if recorder_agent.get('offline', False):
        logger.info("OK - Recorder {} is in offline / maintenance mode".format(name))
        return True, "Recorder is in offline / maintenance mode", name

    logger.debug("Checking Agent {}".format(name))
    try:
        c = get_calender(name)
    except LrcException:
        error_msg = "Could not get calender of recorder agent: {}!".format(name)
        logger.fatal(error_msg)
        return False, error_msg, name

    is_recording_in_calendar = len(list(c.timeline.now())) >= 1
    if is_recording_in_calendar:
        logger.info("{} has entry in Calender and should therefore be recording... checking now!".format(name))
        if recorder_agent.get('state') != "capturing":
            logger.error("FATAL: {} is not in capturing state...but should be!!".format(name))
            return False, "FATAL - is not in capturing state...but should be!", name

    recorder_info = get_recorder_by_name(name)
    try:
        rec = get_recorder_adapter(recorder_info)
        is_recording = rec.is_recording()
        if is_recording_in_calendar and not is_recording:
            logger.info(rec.get_recording_status())
    except LrcException as e:
        # Both branches (scheduled / not scheduled) report a failed check as an error;
        # an unknown recorder (recorder_info is None) must not break the error handling itself.
        address = _recorder_address(recorder_info)
        logger.fatal("Exception occurred: {}".format(str(e)))
        logger.error("Could not check state of recorder {}, Address: {}".format(name, address))
        return False, "FATAL - Could not check state of recorder! Address: {}".format(address), name

    if is_recording_in_calendar:
        if is_recording:
            logger.info("OK – recorder {} is recording :)".format(name))
            state = 'OK - recorder is recording'
            _set_agent_state(name, state)
            return True, state, name
        logger.error("FATAL - recorder {} must be recording but is not!!!!".format(name))
        _set_agent_state(name, 'FATAL - recorder is NOT recording, but should!')
        return False, "FATAL - recorder must be recording but is not!", name

    if is_recording:
        logger.error("FATAL - recorder must not be recording!!!!")
        state = 'FATAL - recorder IS recording, but should NOT!'
        _set_agent_state(name, state)
        return False, state, name

    logger.info("OK – recorder is not recording :)")
    state = 'OK - recorder is NOT recording'
    _set_agent_state(name, state)
    return True, state, name


def ping_capture_agent(recorder_agent: Union[Recorder, dict]):
    """Ping the recorder that belongs to the given agent (2 packets via the ``ping`` command).

    Recorders flagged 'offline' are not pinged and count as OK.

    :return: tuple ``(ok, message, agent name)``.
    """
    name = recorder_agent.get('name')
    if recorder_agent.get('offline', False):
        logger.info("OK - Ping skipped, recorder {} is in offline mode.".format(name))
        return True, "Recorder is in offline / maintenance mode", name

    recorder_info = get_recorder_by_name(name)
    recorder_ip = recorder_info.get('ip') if recorder_info is not None else None
    if not recorder_ip:
        # Unknown recorder or no address: report a failed ping instead of crashing the worker.
        error_msg = "Unable to ping {}: no address known for this recorder!".format(name)
        logger.error(error_msg)
        return False, error_msg, name

    try:
        subprocess.check_call(
            ['ping', '-W', '10', '-c', '2', recorder_ip],
            # stderr=subprocess.STDOUT,  # get all output
            stdout=subprocess.DEVNULL,  # suppress output
            stderr=subprocess.DEVNULL,
            universal_newlines=True  # return string not bytes
        )
    except subprocess.CalledProcessError:
        logger.error("Can not ping {} ({})!!".format(name, recorder_ip))
        return False, "Unable to ping {} ({})".format(name, recorder_ip), name

    logger.info("Successfully pinged {} ({}). :-)".format(name, recorder_ip))
    return True, "Successfully pinged {}. :-)".format(name), name


def _run_checks(check, targets, timeout_msg: str, num_threads: int, timeout_sec: int = 12):
    """Run ``check`` for every target in a thread pool and wait for all results.

    Each result is awaited separately, so a timeout or failure of one check is
    logged (using ``timeout_msg`` for timeouts) without discarding the others.
    """
    with ThreadPool(num_threads) as pool:
        pending = [pool.apply_async(check, (target,)) for target in targets]
        for res in pending:
            try:
                res.get(timeout=timeout_sec)
            except TimeoutError as e:
                logger.error(timeout_msg.format(e))
            except Exception:
                # Top-level boundary: log it (it ends up in the error mail) and keep going.
                logger.exception("Unexpected error while running {}!".format(check.__name__))


if __name__ == '__main__':
    agents = get_capture_agents()
    logger.info("Got {} capture agents that will be checked...".format(len(agents)))

    for a in agents:
        agent_states[a.get('name')] = 'PROBLEMATIC - unknown'

    # pool = ThreadPool(5)
    # pool.map(check_capture_agent_state, agents)

    NUM_THREADS = 8

    recorders = get_recorder_details()

    # Recorders (not agents) are pinged.
    _run_checks(ping_capture_agent, recorders, "Timeout while pinging capture agent! {}", NUM_THREADS)
    _run_checks(check_capture_agent_state, agents, "Timeout while getting capture agent state! {}", NUM_THREADS)

    logger.info("DONE checking capture agents / recorders!")

    logged_events = rec_err_state_log_stream.getvalue()
    if logged_events:
        with agent_states_lock:
            state_lines = ''.join(
                "{:<48}: {}\n".format(str(agent_name), state) for agent_name, state in agent_states.items()
            )
        logged_events += "\n\n=============\nAgent States:\n\n{}".format(state_lines)
        send_error_mail(logged_events, "Errors have been detected while checking recorder states!")

    # mem_handler.close()