From f181e4a785b2ff8c7c66f47df3387273b8cc3153 Mon Sep 17 00:00:00 2001 From: Tobias Kurze Date: Tue, 25 Feb 2020 17:00:01 +0100 Subject: [PATCH] further advanced state api and stream checks --- backend/__main__.py | 6 +- backend/api/state_api.py | 35 ++++++-- backend/cron/__init__.py | 14 ++-- backend/cron/cron_state_checker.py | 14 +++- backend/tools/recorder_state_checker.py | 82 ++++++++++++++----- .../tools/recorder_streams_sanity_checks.py | 18 ++-- backend/websocket/handlers.py | 4 +- 7 files changed, 126 insertions(+), 47 deletions(-) diff --git a/backend/__main__.py b/backend/__main__.py index 464a2eb..5ef1e38 100644 --- a/backend/__main__.py +++ b/backend/__main__.py @@ -20,10 +20,10 @@ from backend.websocket.base import WebSocketBase def main(): logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - db.drop_all() - db.create_all() + #db.drop_all() + #db.create_all() - Recorder() + #Recorder() room_model.pre_fill_table() update_recorder_models_database(drop=False) create_default_recorders() diff --git a/backend/api/state_api.py b/backend/api/state_api.py index bbab7a8..0d6b3e2 100644 --- a/backend/api/state_api.py +++ b/backend/api/state_api.py @@ -11,14 +11,18 @@ from flask_restx import fields, Resource, inputs from backend import db, app from backend.api import api_state from backend.api.models import recorder_model, recorder_model_model, recorder_command_model, state_model +from backend.cron import async_cron_recorder_checker +from backend.cron.cron_state_checker import StateChecker from backend.models.recorder_model import Recorder, RecorderModel, RecorderCommand from backend.models.room_model import Room import backend.recorder_adapters as r_a # == +from backend.tools.recorder_state_checker import get_recorder_adapter, check_capture_agent_state, check_stream_sanity -@api_state.route('/') + +@api_state.route('/recorder/') @api_state.response(404, 'Recorder not found') @api_state.param('id', 'The recorder identifier') class RecorderStateResource(Resource): @@ -27,22 +31,37 @@ class RecorderStateResource(Resource): @api_state.marshal_with(state_model, skip_none=False) def get(self, id): """Fetch a recorder given its identifier""" - recorder = Recorder.query.get(id) + recorder: Recorder = Recorder.query.get(id) if recorder is None: api_state.abort(404) - return recorder + + current_states_by_checker = async_cron_recorder_checker.get_current_state() + state = async_cron_recorder_checker.get_current_state_for_recorder_id(recorder.id) + if state is None: + state_checker = StateChecker([check_capture_agent_state], Recorder) + state_checker.add_object_to_state_check(recorder) + state_checker.check_object_state() + state = state_checker.get_current_state_for_recorder_id(recorder.id) + if not state.get('state_ok', False): # if state is not OK, return state -> no more checks! + return state + + # do additional checks, such as: check for single color, sound check, etc. + stream_state = check_stream_sanity(recorder) + print(stream_state) + + return stream_state -@api_state.route('') +@api_state.route('/recorder') class RecorderStateList(Resource): @jwt_required - @api_state.doc('recorders') - @api_state.marshal_list_with(recorder_model, skip_none=False) + @api_state.doc('get_recorders_states') + @api_state.marshal_list_with(state_model, skip_none=False) def get(self): """ - List all recorders - :return: recorders + Get state of all recorders + :return: state """ return Recorder.get_all() diff --git a/backend/cron/__init__.py b/backend/cron/__init__.py index 665c0d6..4d676ab 100644 --- a/backend/cron/__init__.py +++ b/backend/cron/__init__.py @@ -12,7 +12,7 @@ from backend import app, main_logger from apscheduler.schedulers.background import BackgroundScheduler -from backend.cron.cron_state_checker import recorder_checker +from backend.cron.cron_state_checker import async_cron_recorder_checker from backend.websocket.handlers import send_state_update_to_recorders cron_log_handler = TimedRotatingFileHandler(app.config.get('CRON_LOG_FILE'), interval=1, when='d', backupCount=3) @@ -40,18 +40,18 @@ def add_default_jobs(sched=None, testing=False): sched = scheduler if testing: - check_recorder_state_job = sched.add_job(recorder_checker.check_object_state, 'interval', seconds=40, + check_recorder_state_job = sched.add_job(async_cron_recorder_checker.check_object_state, 'interval', seconds=40, id="check_recorder_state_job") else: - check_recorder_state_job = sched.add_job(recorder_checker.check_object_state, 'interval', minutes=2, + check_recorder_state_job = sched.add_job(async_cron_recorder_checker.check_object_state, 'interval', minutes=2, id="check_recorder_state_job") """ Job regularly sending the state to "frontend recorders" through websocket """ send_update_state_to_recorder_job = sched.add_job( - lambda: send_state_update_to_recorders(recorder_checker.get_current_state()), 'interval', minutes=1, + lambda: send_state_update_to_recorders(async_cron_recorder_checker.get_current_state()), 'interval', minutes=1, id="send_update_state_to_recorder_job") return [check_recorder_state_job, send_update_state_to_recorder_job] @@ -86,9 +86,9 @@ if __name__ == '__main__': time.sleep(sleep_time) recorder_id = random.randint(0, 15) cron_logger.info("Using recorder id {}".format(recorder_id)) - recorder_checker.add_object_to_state_check(recorder_id) - recorder_checker.add_object_to_state_check(recorder_id + 1) - pprint(recorder_checker.get_current_state()) + async_cron_recorder_checker.add_object_to_state_check(recorder_id) + async_cron_recorder_checker.add_object_to_state_check(recorder_id + 1) + pprint(async_cron_recorder_checker.get_current_state()) while True: user_in = input("Type >exit< to quit.") diff --git a/backend/cron/cron_state_checker.py b/backend/cron/cron_state_checker.py index 6d0cedb..049988e 100644 --- a/backend/cron/cron_state_checker.py +++ b/backend/cron/cron_state_checker.py @@ -27,6 +27,7 @@ class StateChecker(Generic[T]): The determined state is stored "locally" in the state checker object and NOT reflected back to the checked objects! It can be retrieved by calling get_current_state. """ + def __init__(self, state_checker_func: Union[Callable, List[Callable]], type_to_check: T, type_name=None, threads=NUM_THREADS): self.num_threads = threads @@ -149,5 +150,16 @@ class StateChecker(Generic[T]): with self.update_state_lock: return copy.deepcopy(self.state_results) + def get_current_state_for_recorder_name(self, recorder_name: str): + return self.get_current_state().get(recorder_name, None) -recorder_checker = StateChecker([check_capture_agent_state, ping_capture_agent], Recorder) + def get_current_state_for_recorder_id(self, recorder_id: int): + states = self.get_current_state() + for key in states: + state = states[key] + if state.get('id', None) == recorder_id: # found! + return state + return None + + +async_cron_recorder_checker = StateChecker([check_capture_agent_state, ping_capture_agent], Recorder) diff --git a/backend/tools/recorder_state_checker.py b/backend/tools/recorder_state_checker.py index a821cbc..bf44213 100644 --- a/backend/tools/recorder_state_checker.py +++ b/backend/tools/recorder_state_checker.py @@ -23,20 +23,21 @@ from backend.recorder_adapters import RecorderAdapter from backend.recorder_adapters.epiphan_base import Epiphan from backend.recorder_adapters.extron_smp import SMP35x +from backend.tools.recorder_streams_sanity_checks import check_frame_is_valid, check_if_audio_is_valid from backend.tools.send_mail import send_error_mail, get_smtp_error_handler logger = logging.getLogger("lrc.tools.simple_state_checker") smtp_error_handler = get_smtp_error_handler(subject="Errors have been detected while checking recorder states!") -#mem_handler = MemoryHandler(capacity=100, flushLevel=logging.FATAL, target=smtp_error_handler) -#mem_handler.setLevel(logging.WARNING) +# mem_handler = MemoryHandler(capacity=100, flushLevel=logging.FATAL, target=smtp_error_handler) +# mem_handler.setLevel(logging.WARNING) rec_err_state_log_stream = StringIO() rec_err_state_log_stream_handler = logging.StreamHandler(stream=rec_err_state_log_stream) rec_err_state_log_stream_handler.setLevel(logging.WARNING) logger.addHandler(rec_err_state_log_stream_handler) -#logger.addHandler(mem_handler) +# logger.addHandler(mem_handler) session = requests.session() session.auth = HTTPBasicAuth(Config.OPENCAST_USER, Config.OPENCAST_PW) @@ -123,22 +124,60 @@ def get_recorder_adapter(recorder_info: Union[dict, Recorder]) -> RecorderAdapte return rec -def check_stream_sanity(recorder: Recorder, recorder_adapter: RecorderAdapter): - if recorder.archive_stream1 is None and recorder.archive_stream2 is None: # fall back to default names and rtsp +def check_stream_sanity(recorder_agent: Union[Recorder, dict], recorder_adapter: RecorderAdapter = None): + if recorder_adapter is None: + recorder_info = get_recorder_by_name(recorder_agent.get('name')) + recorder_adapter = get_recorder_adapter(recorder_info) + if not recorder_adapter.is_recording(): + return True, "not recording, so there is no stream!", recorder_agent.get('name') + if recorder_agent.get('archive_stream1') is None and recorder_agent.get( + 'archive_stream2') is None: # fall back to default names and rtsp archive_stream_1_url = "rtsp://{}/{}".format(recorder_adapter.address, Config.DEFAULT_ARCHIVE_STREAM_1_NAME) archive_stream_2_url = "rtsp://{}/{}".format(recorder_adapter.address, Config.DEFAULT_ARCHIVE_STREAM_2_NAME) else: - archive_stream_1_url = recorder.archive_stream1 - archive_stream_2_url = recorder.archive_stream2 + archive_stream_1_url = recorder_agent.get('archive_stream1') + archive_stream_2_url = recorder_agent.get('archive_stream2') + frame_msg = frame_msg2 = sound_msg = sound_msg2 = "unknown" for i in range(0, Config.STREAM_SANITY_CHECK_RETRIES): - do_checks = False + frame_ok, frame_msg = check_frame_is_valid(archive_stream_1_url, raise_errors=False) + if not frame_ok: + logger.warning( + "Archive stream 1 ({}) of {} is not ok (frame): {}".format(archive_stream_1_url, + recorder_agent.get('name'), + frame_msg)) + frame_ok, frame_msg2 = check_frame_is_valid(archive_stream_2_url, raise_errors=False) + if not frame_ok: + logger.warning( + "Archive stream 2 {} of ({}) is not ok: {} (frame)".format(archive_stream_2_url, + recorder_agent.get('name'), + frame_msg2)) + + sound_ok, sound_msg = check_if_audio_is_valid(archive_stream_1_url, raise_errors=False) + if not sound_ok: + logger.warning( + "Archive stream 1 {} of ({}) is not ok (audio): {}".format(archive_stream_1_url, + recorder_agent.get('name'), + sound_msg)) + sound_ok, sound_msg2 = check_if_audio_is_valid(archive_stream_2_url, raise_errors=False) + if not sound_ok: + logger.warning( + "Archive stream 2 {} of ({}) is not ok (audio): {}".format(archive_stream_2_url, + recorder_agent.get('name'), + sound_msg2)) + + if frame_ok and sound_ok: + return True, "At least one archive stream is fine (audio and image)! " \ + "(s1: a: {}, v: {}, s2: a: {}, v: {})".format(sound_msg, frame_msg, sound_msg2, + frame_msg2), recorder_agent.get('name') - if do_checks: - return True else: time.sleep(Config.STREAM_SANITY_CHECK_INTERVAL_SEC) - return False # stream sanity check failed! + error_msg = "After {} retries, stream checks failed and returned: archive_stream1: audio:{}, frame:{}, " \ + "archive_stream2: audio:{}, frame:{}".format(Config.STREAM_SANITY_CHECK_RETRIES, sound_msg, + frame_msg, sound_msg2, frame_msg2) + logger.error(error_msg) + return False, error_msg, recorder_agent.get('name') # stream sanity check failed! def check_capture_agent_state(recorder_agent: Union[Recorder, dict]): @@ -150,7 +189,8 @@ def check_capture_agent_state(recorder_agent: Union[Recorder, dict]): c = get_calender(recorder_agent.get('name')) is_recording_in_calendar = len(list(c.timeline.now())) >= 1 if is_recording_in_calendar: - logger.info("{} has entry in Calender and should therefore be recording... checking now!".format(recorder_agent.get('name'))) + logger.info("{} has entry in Calender and should therefore be recording... checking now!".format( + recorder_agent.get('name'))) if recorder_agent['state'] == "capturing": recorder_info = get_recorder_by_name(recorder_agent.get('name')) try: @@ -159,16 +199,17 @@ def check_capture_agent_state(recorder_agent: Union[Recorder, dict]): logger.info("OK – recorder {} is recording :)".format(recorder_agent.get('name'))) with agent_states_lock: agent_states[recorder_agent.get('name')] = 'OK - recorder is recording' - else: logger.info(rec.get_recording_status()) - logger.error("FATAL - recorder {} must be recording but is not!!!!".format(recorder_agent.get('name'))) + logger.error( + "FATAL - recorder {} must be recording but is not!!!!".format(recorder_agent.get('name'))) agent_state_error_msg = "FATAL - recorder must be recording but is not!" with agent_states_lock: agent_states[recorder_agent['name']] = 'FATAL - recorder is NOT recording, but should!' except LrcException as e: logger.fatal("Exception occurred: {}".format(str(e))) - logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), recorder_info.get('ip'))) + logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), + recorder_info.get('ip'))) else: logger.error("FATAL: {} is not in capturing state...but should be!!".format(recorder_agent.get('name'))) agent_state_error_msg = "FATAL - is not in capturing state...but should be!" @@ -187,8 +228,10 @@ def check_capture_agent_state(recorder_agent: Union[Recorder, dict]): agent_states[recorder_agent.get('name')] = 'OK - recorder is NOT recording' except LrcException as e: logger.fatal("Exception occurred: {}".format(str(e))) - logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), recorder_info.get('ip'))) - agent_state_error_msg = "FATAL - Could not check state of recorder! Address: {}".format(recorder_info.get('ip')) + logger.error("Could not check state of recorder {}, Address: {}".format(recorder_agent.get('name'), + recorder_info.get('ip'))) + agent_state_error_msg = "FATAL - Could not check state of recorder! Address: {}".format( + recorder_info.get('ip')) if agent_state_error_msg is None: return True, agent_states[recorder_agent.get('name')], recorder_agent.get('name') @@ -213,7 +256,8 @@ def ping_capture_agent(recorder_agent: Union[Recorder, dict]): return True, "Successfully pinged {}. :-)".format(recorder_agent.get('name')), recorder_agent.get('name') except subprocess.CalledProcessError: logger.error("Can not ping {} ({})!!".format(recorder_agent.get('name'), recorder_ip)) - return False, "Unable to ping {} ({})".format(recorder_agent.get('name'), recorder_ip), recorder_agent.get('name') + return False, "Unable to ping {} ({})".format(recorder_agent.get('name'), recorder_ip), recorder_agent.get( + 'name') if __name__ == '__main__': @@ -253,4 +297,4 @@ if __name__ == '__main__': )) send_error_mail(logged_events, "Errors have been detected while checking recorder states!") - #mem_handler.close() + # mem_handler.close() diff --git a/backend/tools/recorder_streams_sanity_checks.py b/backend/tools/recorder_streams_sanity_checks.py index 9ae6ded..283aaad 100644 --- a/backend/tools/recorder_streams_sanity_checks.py +++ b/backend/tools/recorder_streams_sanity_checks.py @@ -1,4 +1,5 @@ import io +import logging import sys import ffmpeg @@ -8,6 +9,8 @@ from PIL import Image from pydub import AudioSegment from pydub.playback import play +logger = logging.getLogger("lrc.tools.recorder_streams_sanity_checks") + def is_single_color_image(image): single_color_image = True @@ -15,9 +18,9 @@ def is_single_color_image(image): count = 0 color_channels = image.split() for c in color_channels: # r, g, b - hist = c.histogram() num_of_non_zero_values = len([v for v in hist if v != 0]) + logger.debug("color_channel: {}, num_of_non_zero_values (NON-BLACK): {}".format(c, num_of_non_zero_values)) if num_of_non_zero_values > 1: single_color_image = False break @@ -48,6 +51,7 @@ def check_frame_is_valid(stream_url, raise_errors=True): ':'.join([str(x) for x in color.values()])) except ffmpeg.Error as e: msg = "Could not connect to stream URL or other error!" + logger.error(msg) try: msg += " ({})".format(e.stderr.decode('utf-8').strip().split("\n")[-1]) except IndexError: @@ -58,8 +62,6 @@ def check_frame_is_valid(stream_url, raise_errors=True): return False, msg -# print(check_frame_is_valid('rtsp://172.22.246.207/extron2')) - def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_dBFS=-40.0, raise_errors=True): file_name = tempfile.NamedTemporaryFile(suffix='.aac').name if os.path.exists(file_name): @@ -74,7 +76,7 @@ def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_d sound = AudioSegment.from_file(file_name, "aac") # print(sound.dBFS) - #play(sound) + # play(sound) if sound.max_dBFS == -float('inf'): return False, "No active audio signal detected!" elif sound.max_dBFS < lower_alert_limit_dBFS: @@ -83,6 +85,7 @@ def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_d return True, "good audio signal detected! ({} max dBFS in {}s sample)".format(sound.max_dBFS, sample_length_sec) except ffmpeg.Error as e: msg = "Could not connect to stream URL or other error!" + logger.error(msg) try: msg += " ({})".format(e.stderr.decode('utf-8').strip().split("\n")[-1]) except IndexError: @@ -93,9 +96,6 @@ def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_d return False, msg -print(check_if_audio_is_valid('rtsp://172.22.246.207/extron1')) - - """ Following code is not working correctly - ffmpeg parameters are wrong. """ @@ -113,3 +113,7 @@ def check_if_audio_is_valid_stream(stream_url, raise_errors=True): # check_if_audio_is_valid('rtsp://172.22.246.207/extron1') """ + +if __name__ == '__main__': + # print(check_frame_is_valid('rtsp://172.22.246.207/extron2')) + print(check_if_audio_is_valid('rtsp://172.22.246.207/extron1')) diff --git a/backend/websocket/handlers.py b/backend/websocket/handlers.py index 35481c8..b4ab7cd 100644 --- a/backend/websocket/handlers.py +++ b/backend/websocket/handlers.py @@ -1,12 +1,12 @@ import json import logging -from backend.cron import recorder_checker +from backend.cron import async_cron_recorder_checker from backend.websocket.base import socketio logger = logging.getLogger("lrc.websocket.handlers") -recorder_state_checker = recorder_checker +recorder_state_checker = async_cron_recorder_checker @socketio.on('request_recorder_state_updates')