import logging
# AsyncResult.get() raises multiprocessing.TimeoutError, which is NOT the
# builtin TimeoutError; alias it so both can be caught side by side.
from multiprocessing import TimeoutError as PoolTimeoutError
from multiprocessing.pool import ThreadPool
from threading import Lock
from typing import Union

from backend import app, LrcException
from backend.models import Recorder
from backend.tools.simple_state_checker import check_capture_agent_state, ping_capture_agent

# All cron related log output (including the apscheduler loggers) is written
# to the file configured as CRON_LOG_FILE.
cron_log_handler = logging.FileHandler(app.config.get('CRON_LOG_FILE'))
cron_logger = logging.getLogger("mal.cron")
cron_logger.addHandler(cron_log_handler)
logging.getLogger("apscheduler.scheduler").addHandler(cron_log_handler)
logging.getLogger("apscheduler.executors.default").addHandler(cron_log_handler)

# Recorders registered for the state check; guarded by recorder_jobs_lock.
recorder_jobs_lock = Lock()
recorder_jobs = set()

NUM_THREADS = 8
# Seconds to wait for each individual worker result.
_RESULT_TIMEOUT_SECONDS = 12


def add_recorder_to_state_check(recorder: Union[int, Recorder]):
    """Register a recorder for the state check.

    :param recorder: a Recorder object, or an int identifier that is looked up
        via ``Recorder.get_by_identifier``.
    :raises LrcException: if the recorder is None / could not be found.
    """
    if isinstance(recorder, int):
        recorder = Recorder.get_by_identifier(recorder)
    if recorder is None:
        cron_logger.warning(
            "Could not add recorder to state check, as specified id could not be found / recorder is None")
        raise LrcException("Recorder is None / could not be found!")
    with recorder_jobs_lock:
        recorder_jobs.add(recorder)


def remove_recorder_from_state_check(recorder: Union[int, Recorder]):
    """Unregister a recorder from the state check.

    :param recorder: a Recorder object, or an int identifier that is looked up
        via ``Recorder.get_by_identifier``.
    :raises LrcException: if the recorder is None / could not be found.
    :raises KeyError: if the recorder was never registered.
    """
    if isinstance(recorder, int):
        recorder = Recorder.get_by_identifier(recorder)
    if recorder is None:
        cron_logger.warning(
            "Could not remove recorder from state check, as specified id could not be found / recorder is None")
        raise LrcException("Recorder is None / could not be found (and therefor not removed)!")
    # The context manager guarantees the lock is released even when remove()
    # raises KeyError; a manual acquire()/release() pair would leave the lock
    # held and block every later call.
    with recorder_jobs_lock:
        recorder_jobs.remove(recorder)


def _collect_results(pending, timeout_message):
    """Gather the results of already submitted pool tasks.

    Each result is awaited for at most ``_RESULT_TIMEOUT_SECONDS``. A task that
    does not finish in time is logged and skipped, so the results of the
    remaining tasks are still returned. Exceptions raised inside a worker
    propagate to the caller unchanged.

    :param pending: iterable of ``AsyncResult`` objects.
    :param timeout_message: log message template with one ``{}`` placeholder
        for the timeout exception.
    :return: list with the results of all tasks that finished in time.
    """
    finished = []
    for async_result in pending:
        try:
            finished.append(async_result.get(timeout=_RESULT_TIMEOUT_SECONDS))
        except (TimeoutError, PoolTimeoutError) as e:
            cron_logger.error(timeout_message.format(e))
    return finished


def check_recorder_state():
    """Check state and reachability of every registered recorder.

    Works on a snapshot of ``recorder_jobs``. First ``check_capture_agent_state``
    and then ``ping_capture_agent`` is run for each recorder in a thread pool.
    Every worker result is indexed as ``r[0]`` (truthy means ok), ``r[1]``
    (message) and ``r[2]`` (key into the per recorder state dict, presumably
    the recorder name; verify against simple_state_checker).
    """
    # Copy under the lock so the (slow) checks run without holding it.
    with recorder_jobs_lock:
        recorders = list(recorder_jobs)

    # NOTE(review): this subscripts Recorder objects with ['name']; confirm
    # that the Recorder model supports item access.
    recorder_states = {r['name']: {'state_ok': False, 'msg': 'unknown state!'} for r in recorders}

    # Results are collected inside the ``with`` block: leaving it terminates
    # the pool.
    with ThreadPool(NUM_THREADS) as pool:
        pending = [pool.apply_async(check_capture_agent_state, (recorder,)) for recorder in recorders]
        state_results = _collect_results(pending, "Timeout while getting capture agent state! {}")
    for r in state_results:
        if r[0]:  # ok :)
            recorder_states[r[2]] = {'state_ok': True}
        else:
            recorder_states[r[2]]['msg'] = r[1]

    with ThreadPool(NUM_THREADS) as pool:
        pending = [pool.apply_async(ping_capture_agent, (recorder,)) for recorder in recorders]
        ping_results = _collect_results(pending, "Timeout while pinging capture agent! {}")
    for r in ping_results:
        if not r[0]:  # ping failed: keep the reported message
            recorder_states[r[2]]['msg'] = r[1]
    # NOTE(review): recorder_states is neither returned nor stored in the code
    # visible here; confirm how the collected states are meant to be consumed.