added stream sanity checks (sound, singe color)
This commit is contained in:
259
backend/tools/recorder_state_checker.py
Normal file
259
backend/tools/recorder_state_checker.py
Normal file
@@ -0,0 +1,259 @@
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from io import StringIO
|
||||
from logging.handlers import MemoryHandler
|
||||
from pprint import pprint
|
||||
from typing import Union
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from multiprocessing.context import TimeoutError
|
||||
|
||||
from ics import Calendar
|
||||
|
||||
from backend import LrcException
|
||||
from backend.config import Config
|
||||
from backend.models import Recorder, RecorderModel
|
||||
from backend.recorder_adapters import RecorderAdapter
|
||||
|
||||
from backend.recorder_adapters.epiphan_base import Epiphan
|
||||
from backend.recorder_adapters.extron_smp import SMP35x
|
||||
from backend.tools.send_mail import send_error_mail, get_smtp_error_handler
|
||||
|
||||
logger = logging.getLogger("lrc.tools.simple_state_checker")
|
||||
|
||||
smtp_error_handler = get_smtp_error_handler(subject="Errors have been detected while checking recorder states!")
|
||||
#mem_handler = MemoryHandler(capacity=100, flushLevel=logging.FATAL, target=smtp_error_handler)
|
||||
#mem_handler.setLevel(logging.WARNING)
|
||||
|
||||
rec_err_state_log_stream = StringIO()
|
||||
rec_err_state_log_stream_handler = logging.StreamHandler(stream=rec_err_state_log_stream)
|
||||
rec_err_state_log_stream_handler.setLevel(logging.WARNING)
|
||||
|
||||
logger.addHandler(rec_err_state_log_stream_handler)
|
||||
#logger.addHandler(mem_handler)
|
||||
|
||||
base_url = "https://opencast.bibliothek.kit.edu"
|
||||
|
||||
session = requests.session()
|
||||
session.auth = HTTPBasicAuth(Config.OPENCAST_USER, Config.OPENCAST_PW)
|
||||
|
||||
config = {'service_urls': {}}
|
||||
|
||||
recorders = None
|
||||
|
||||
agent_states_lock = threading.RLock()
|
||||
agent_states = {}
|
||||
|
||||
|
||||
def get_service_url(service_type: str):
|
||||
if service_type in config['service_urls']:
|
||||
return config['service_urls'][service_type]
|
||||
params = {'serviceType': service_type}
|
||||
url = base_url + "/services/available.json"
|
||||
res = session.get(url, params=params)
|
||||
if res.ok:
|
||||
service = res.json()["services"]["service"]
|
||||
config["service_urls"][service_type] = service["host"] + \
|
||||
service["path"]
|
||||
return service["host"] + service["path"]
|
||||
return None
|
||||
|
||||
|
||||
def get_calender(rec_id):
|
||||
params = {'agentid': rec_id}
|
||||
url = get_service_url('org.opencastproject.scheduler') + "/calendars"
|
||||
res = session.get(url, params=params)
|
||||
if res.ok:
|
||||
return Calendar(res.text)
|
||||
|
||||
|
||||
def get_capture_agents():
|
||||
url = get_service_url("org.opencastproject.capture.admin") + "/agents.json"
|
||||
res = session.get(url)
|
||||
if res.ok:
|
||||
return res.json()["agents"]["agent"]
|
||||
|
||||
|
||||
def get_recorder_details_old():
|
||||
"""Temporary implementation using initial_recorders.json. Should be replaced by DB layer later!"""
|
||||
global recorders
|
||||
if recorders is None:
|
||||
f = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, 'models', 'initial_recorders.json'))
|
||||
with open(f, 'r') as json_file:
|
||||
recorders = json.load(json_file)['recorders']
|
||||
return recorders
|
||||
|
||||
|
||||
def get_recorder_details():
|
||||
"""New implementation using DB"""
|
||||
global recorders
|
||||
if recorders is None:
|
||||
recorders = list(Recorder.get_all())
|
||||
return recorders
|
||||
|
||||
|
||||
def get_recorder_by_name(name: str):
|
||||
for r in get_recorder_details():
|
||||
logger.debug("Got recorder {}".format(r.get("name")))
|
||||
if r.get("name") == name or r.get("name") + " Recorder" == name or r.get("name") == name + " Recorder":
|
||||
return r
|
||||
logger.error("Could not find recorder for name {}".format(name))
|
||||
return None
|
||||
|
||||
|
||||
def notify_users_of_problem(msg: str):
|
||||
pass
|
||||
|
||||
|
||||
def get_recorder_adapter(recorder_info: Union[dict, Recorder]) -> RecorderAdapter:
|
||||
if recorder_info is None:
|
||||
raise LrcException("Could not find recorder Adapter as recorder info was NONE!")
|
||||
try:
|
||||
type = recorder_info.get("type")
|
||||
except KeyError:
|
||||
type = RecorderModel.get_by_id(recorder_info.get('recorder_model_id')).model_name
|
||||
if "SMP" in type:
|
||||
rec = SMP35x(recorder_info.get('ip'), recorder_info.get('password'))
|
||||
else:
|
||||
rec = Epiphan(recorder_info.get('ip'), recorder_info.get("username"), recorder_info.get("password"))
|
||||
return rec
|
||||
|
||||
|
||||
def check_stream_sanity(recorder: Recorder, recorder_adapter: RecorderAdapter):
|
||||
if recorder.archive_stream1 is None and recorder.archive_stream2 is None: # fall back to default names and rtsp
|
||||
archive_stream_1_url = "rtsp://{}/{}".format(recorder_adapter.address, Config.DEFAULT_ARCHIVE_STREAM_1_NAME)
|
||||
archive_stream_2_url = "rtsp://{}/{}".format(recorder_adapter.address, Config.DEFAULT_ARCHIVE_STREAM_2_NAME)
|
||||
else:
|
||||
archive_stream_1_url = recorder.archive_stream1
|
||||
archive_stream_2_url = recorder.archive_stream2
|
||||
|
||||
for i in range(0, Config.STREAM_SANITY_CHECK_RETRIES):
|
||||
do_checks = False
|
||||
if do_checks:
|
||||
return True
|
||||
else:
|
||||
time.sleep(Config.STREAM_SANITY_CHECK_INTERVAL_SEC)
|
||||
return False # stream sanity check failed!
|
||||
|
||||
|
||||
def check_capture_agent_state(recorder_agent: Union[Recorder, dict]):
|
||||
if recorder_agent.get('offline', False):
|
||||
logger.info("OK - Recorder {} is in offline / maintenance mode".format(recorder_agent.get('name')))
|
||||
return True, "Recorder is in offline / maintenance mode", recorder_agent.get('name')
|
||||
agent_state_error_msg = None
|
||||
logger.debug("Checking Agent {}".format(recorder_agent.get('name')))
|
||||
c = get_calender(recorder_agent.get('name'))
|
||||
is_recording_in_calendar = len(list(c.timeline.now())) >= 1
|
||||
if is_recording_in_calendar:
|
||||
logger.info("{} has entry in Calender and should therefore be recording... checking now!".format(recorder_agent.get('name')))
|
||||
if recorder_agent['state'] == "capturing":
|
||||
recorder_info = get_recorder_by_name(recorder_agent.get('name'))
|
||||
try:
|
||||
rec = get_recorder_adapter(recorder_info)
|
||||
if rec.is_recording():
|
||||
logger.info("OK – recorder {} is recording :)".format(recorder_agent.get('name')))
|
||||
with agent_states_lock:
|
||||
agent_states[recorder_agent.get('name')] = 'OK - recorder is recording'
|
||||
|
||||
else:
|
||||
logger.info(rec.get_recording_status())
|
||||
logger.error("FATAL - recorder {} must be recording but is not!!!!".format(recorder_agent.get('name')))
|
||||
agent_state_error_msg = "FATAL - recorder must be recording but is not!"
|
||||
with agent_states_lock:
|
||||
agent_states[recorder_agent['name']] = 'FATAL - recorder is NOT recording, but should!'
|
||||
except LrcException as e:
|
||||
logger.fatal("Exception occurred: {}".format(str(e)))
|
||||
logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), recorder_info.get('ip')))
|
||||
else:
|
||||
logger.error("FATAL: {} is not in capturing state...but should be!!".format(recorder_agent.get('name')))
|
||||
agent_state_error_msg = "FATAL - is not in capturing state...but should be!"
|
||||
else:
|
||||
recorder_info = get_recorder_by_name(recorder_agent.get('name'))
|
||||
try:
|
||||
rec = get_recorder_adapter(recorder_info)
|
||||
if rec.is_recording():
|
||||
logger.error("FATAL - recorder must not be recording!!!!")
|
||||
agent_state_error_msg = "FATAL - is not in capturing state...but should be!"
|
||||
with agent_states_lock:
|
||||
agent_states[recorder_agent.get('name')] = 'FATAL - recorder IS recording, but should NOT!'
|
||||
else:
|
||||
logger.info("OK – recorder is not recording :)")
|
||||
with agent_states_lock:
|
||||
agent_states[recorder_agent.get('name')] = 'OK - recorder is NOT recording'
|
||||
except LrcException as e:
|
||||
logger.fatal("Exception occurred: {}".format(str(e)))
|
||||
logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), recorder_info.get('ip')))
|
||||
agent_state_error_msg = "FATAL - Could not check state of recorder! Address: {}".format(recorder_info.get('ip'))
|
||||
|
||||
if agent_state_error_msg is None:
|
||||
return True, agent_states[recorder_agent.get('name')], recorder_agent.get('name')
|
||||
return False, agent_state_error_msg, recorder_agent.get('name')
|
||||
|
||||
|
||||
def ping_capture_agent(recorder_agent: Union[Recorder, dict]):
|
||||
if recorder_agent.get('offline', False):
|
||||
print("is offline!")
|
||||
logger.info("OK - Ping skipped, recorder {} is in offline mode.".format(recorder_agent.get('name')))
|
||||
return True, "Recorder is in offline / maintenance mode", recorder_agent.get('name')
|
||||
recorder_ip = get_recorder_by_name(recorder_agent.get('name')).get('ip')
|
||||
try:
|
||||
subprocess.check_call(
|
||||
['ping', '-W', '10', '-c', '2', recorder_ip],
|
||||
# stderr=subprocess.STDOUT, # get all output
|
||||
stdout=subprocess.DEVNULL, # suppress output
|
||||
stderr=subprocess.DEVNULL,
|
||||
universal_newlines=True # return string not bytes
|
||||
)
|
||||
logger.info("Successfully pinged {} ({}). :-)".format(recorder_agent.get('name'), recorder_ip))
|
||||
return True, "Successfully pinged {}. :-)".format(recorder_agent.get('name')), recorder_agent.get('name')
|
||||
except subprocess.CalledProcessError:
|
||||
logger.error("Can not ping {} ({})!!".format(recorder_agent.get('name'), recorder_ip))
|
||||
return False, "Unable to ping {} ({})".format(recorder_agent.get('name'), recorder_ip), recorder_agent.get('name')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
agents = get_capture_agents()
|
||||
logger.info("Got {} capture agents that will be checked...".format(len(agents)))
|
||||
|
||||
for a in agents:
|
||||
agent_states[a.get('name')] = 'PROBLEMATIC - unknown'
|
||||
|
||||
|
||||
# pool = ThreadPool(5)
|
||||
# pool.map(check_capture_agent_state, agents)
|
||||
|
||||
NUM_THREADS = 8
|
||||
|
||||
recorders = get_recorder_details()
|
||||
with ThreadPool(NUM_THREADS) as pool:
|
||||
# results = [pool.apply_async(ping_capture_agent, (agent,)) for agent in agents]
|
||||
results = [pool.apply_async(ping_capture_agent, (agent,)) for agent in recorders]
|
||||
try:
|
||||
[res.get(timeout=12) for res in results]
|
||||
except TimeoutError as e:
|
||||
logger.error("Timeout while pinging capture agent! {}".format(e))
|
||||
|
||||
|
||||
with ThreadPool(NUM_THREADS) as pool:
|
||||
results = [pool.apply_async(check_capture_agent_state, (agent,)) for agent in agents]
|
||||
try:
|
||||
[res.get(timeout=12) for res in results]
|
||||
except TimeoutError as e:
|
||||
logger.error("Timeout while getting capture agent state! {}".format(e))
|
||||
|
||||
logger.info("DONE checking capture agents / recorders!")
|
||||
|
||||
logged_events = rec_err_state_log_stream.getvalue()
|
||||
if len(logged_events) > 0:
|
||||
logged_events += "\n\n=============\nAgent States:\n\n{}".format(''.join(
|
||||
"{:<48}: {}\n".format(a, agent_states[a]) for a in agent_states
|
||||
))
|
||||
send_error_mail(logged_events, "Errors have been detected while checking recorder states!")
|
||||
|
||||
#mem_handler.close()
|
||||
Reference in New Issue
Block a user