176 lines
7.7 KiB
Python
176 lines
7.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
import copy
|
|
import datetime
|
|
import logging
|
|
from multiprocessing.context import TimeoutError
|
|
from multiprocessing.pool import ThreadPool
|
|
from pprint import pprint
|
|
from threading import Lock
|
|
from typing import Union, Callable, TypeVar, Generic, Set, List
|
|
|
|
from backend.models import Recorder
|
|
from backend.tools.recorder_state_checker import check_capture_agent_state, ping_capture_agent, check_stream_sanity
|
|
|
|
logger = logging.getLogger("lrc.cron.recorder_state")
|
|
|
|
recorder_jobs_lock = Lock()
|
|
recorder_jobs = set()
|
|
|
|
NUM_THREADS = 8
|
|
|
|
T = TypeVar('T')
|
|
|
|
|
|
class StateChecker(Generic[T]):
|
|
"""
|
|
This class is designed generically to regularly check the state of objects with given function(s).
|
|
The determined state is stored "locally" in the state checker object and NOT reflected back to the checked objects!
|
|
It can be retrieved by calling get_current_state.
|
|
"""
|
|
|
|
def __init__(self, state_checker_func: Union[Callable, List[Callable]], type_to_check: T, type_name=None,
|
|
threads=NUM_THREADS):
|
|
self.num_threads = threads
|
|
self.lock = Lock()
|
|
self.jobs: Set[T] = set()
|
|
self.checker_func = state_checker_func
|
|
self.checker_type = type_to_check
|
|
self.update_state_lock = Lock()
|
|
self.state_results = {}
|
|
self.type_name = type_name if type_name is not None else self.checker_type.__name__
|
|
|
|
def add_object_to_state_check(self, object_to_check: Union[int, T]):
|
|
if isinstance(object_to_check, int):
|
|
if not hasattr(self.checker_type, 'get_by_identifier'):
|
|
logger.error(
|
|
'Can\'t add object to state check, as >get_by_identifier< not defined on checker_type ({})!'.format(
|
|
str(self.checker_type)))
|
|
return
|
|
object_to_check = self.checker_type.get_by_identifier(object_to_check)
|
|
if object_to_check is None:
|
|
logger.warning(
|
|
"Could not add object ({}) to state check, as specified >id ({})< could not be found / object is None".format(
|
|
self.type_name, object_to_check))
|
|
return
|
|
with self.lock:
|
|
if hasattr(object_to_check, 'name'):
|
|
name = object_to_check.name
|
|
else:
|
|
name = str(object_to_check)
|
|
if any([j.id == object_to_check.id for j in self.jobs]):
|
|
logger.info(
|
|
"Not adding {} ({}) ({}) to state check (already in job list)".format(object_to_check.id, name,
|
|
self.type_name))
|
|
else:
|
|
logger.debug("Adding {} to object ({}) to state check".format(self.type_name, name))
|
|
self.jobs.add(object_to_check)
|
|
|
|
def remove_recorder_from_state_check(self, object_to_check: Union[int, T]):
|
|
if isinstance(object_to_check, int):
|
|
object_to_check = self.checker_type.get_by_identifier(object_to_check)
|
|
if object_to_check is None:
|
|
logger.warning(
|
|
"Could not remove object ({}) from state check, as specified id could not be found / object is None".format(
|
|
self.type_name))
|
|
return
|
|
self.lock.acquire()
|
|
if hasattr(object_to_check, 'name'):
|
|
name = object_to_check.name
|
|
else:
|
|
name = str(object_to_check)
|
|
logger.debug("Removing {} from object ({}) to state check".format(self.type_name, name))
|
|
self.jobs.remove(object_to_check)
|
|
self.lock.release()
|
|
|
|
def execute_checker_func(self, func, jobs: List[T], object_states: dict) -> dict:
|
|
with ThreadPool(self.num_threads) as pool:
|
|
results = [pool.apply_async(func, (job,)) for job in jobs]
|
|
try:
|
|
state_results = [res.get(timeout=12) for res in results]
|
|
for r in state_results:
|
|
if r[0]: # ok :)
|
|
if object_states[r[2]].get('msg', "") == "unknown state!":
|
|
del object_states[r[2]]['msg']
|
|
ok = True
|
|
else:
|
|
ok = object_states[r[2]].get('state_ok', False),
|
|
object_states[r[2]] = {
|
|
'id': object_states[r[2]].get('id', None),
|
|
'msg': ", ".join([s for s in [object_states[r[2]].get('msg', None), r[1]] if s]),
|
|
'time_stamp': datetime.datetime.now(datetime.timezone.utc).strftime(
|
|
"%d.%m.%Y - %H:%M:%S %Z"),
|
|
'state_ok': ok}
|
|
else:
|
|
object_states[r[2]] = {'id': object_states[r[2]].get('id', None),
|
|
'msg': r[1],
|
|
'time_stamp': datetime.datetime.now(datetime.timezone.utc).strftime(
|
|
"%d.%m.%Y - %H:%M:%S %Z"),
|
|
'state_ok': False}
|
|
except TimeoutError as e:
|
|
logger.error("Timeout while performing state check func! {}".format(e))
|
|
|
|
return object_states
|
|
|
|
def check_object_state(self) -> dict:
|
|
logger.info("checking object ({}) state...".format(self.type_name))
|
|
self.lock.acquire()
|
|
jobs = list(self.jobs)
|
|
self.lock.release()
|
|
|
|
if len(jobs) <= 0:
|
|
logger.info("No objects ({}) to check... returning".format(self.type_name))
|
|
return {}
|
|
logger.info("checking state of {} recorders".format(len(jobs)))
|
|
|
|
object_states = {j.name: {'id': j.id, 'state_ok': False, 'msg': 'unknown state!'} for j in jobs}
|
|
|
|
if isinstance(self.checker_func, list):
|
|
for c_f in self.checker_func:
|
|
self.execute_checker_func(c_f, jobs, object_states)
|
|
else:
|
|
self.execute_checker_func(self.checker_func, jobs, object_states)
|
|
|
|
self.update_state_dict(object_states)
|
|
|
|
return object_states
|
|
|
|
def update_state_dict(self, object_states: dict):
|
|
self.update_state_lock.acquire()
|
|
for o_s in object_states.keys():
|
|
if o_s in self.state_results:
|
|
# update existing state
|
|
self.state_results[o_s] = {**object_states[o_s],
|
|
'time_stamp': datetime.datetime.now(datetime.timezone.utc).strftime(
|
|
"%d.%m.%Y - %H:%M:%S %Z"),
|
|
'previous': {'state_ok': self.state_results[o_s]['state_ok'],
|
|
'msg': self.state_results[o_s].get('msg', None),
|
|
'time_stamp': self.state_results[o_s].get('time_stamp', None)}}
|
|
else:
|
|
self.state_results[o_s] = object_states[o_s]
|
|
|
|
self.update_state_lock.release()
|
|
|
|
def get_current_state(self):
|
|
with self.update_state_lock:
|
|
return copy.deepcopy(self.state_results)
|
|
|
|
def get_current_state_for_recorder_name(self, recorder_name: str):
|
|
return self.get_current_state().get(recorder_name, None)
|
|
|
|
def get_current_state_for_recorder_id(self, recorder_id: int):
|
|
states = self.get_current_state()
|
|
for key in states:
|
|
state = states[key]
|
|
if state.get('id', None) == recorder_id: # found!
|
|
return state
|
|
return None
|
|
|
|
|
|
async_cron_recorder_checker = StateChecker([check_capture_agent_state, ping_capture_agent], Recorder)
|
|
async_permanent_cron_recorder_checker = StateChecker(
|
|
[check_capture_agent_state, ping_capture_agent, check_stream_sanity], Recorder)
|
|
|
|
for r in Recorder.get_all():
|
|
async_permanent_cron_recorder_checker.add_object_to_state_check(r.id)
|
|
async_permanent_cron_recorder_checker.check_object_state() # initial check of all recorders
|