working websocket communication for recorder states
This commit is contained in:
@@ -4,10 +4,12 @@
|
||||
# Copyright (c) 2019. Tobias Kurze
|
||||
import logging
|
||||
import ssl
|
||||
import sys
|
||||
|
||||
from jinja2.exceptions import TemplateNotFound
|
||||
|
||||
from backend import app, db
|
||||
from backend.cron import get_default_scheduler, add_default_jobs
|
||||
from backend.models import room_model, recorder_model, RecorderCommand
|
||||
from backend.recorder_adapters import get_defined_recorder_adapters
|
||||
from backend.tools.model_updater import update_recorder_models_database, create_default_recorders
|
||||
@@ -38,11 +40,25 @@ def main():
|
||||
except Exception as e:
|
||||
logging.critical(e)
|
||||
|
||||
print("Starting Scheduler")
|
||||
scheduler = get_default_scheduler()
|
||||
add_default_jobs(scheduler)
|
||||
scheduler.start()
|
||||
|
||||
wsb = WebSocketBase()
|
||||
print("running websocket...(replaces normal app.run()")
|
||||
wsb.start_websocket(debug=True)
|
||||
# print("running web app...")
|
||||
#app.run(debug=True, host="0.0.0.0", threaded=True)
|
||||
wsb.send_test_msg()
|
||||
|
||||
while True:
|
||||
user_in = input("Type >exit< to quit.")
|
||||
if user_in == "exit" or user_in == ">exit<":
|
||||
break
|
||||
|
||||
scheduler.shutdown()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -13,10 +13,11 @@ from backend import app, main_logger
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
|
||||
from backend.cron.cron_state_checker import recorder_checker
|
||||
from backend.websocket.handlers import send_state_update_to_recorders
|
||||
|
||||
cron_log_handler = TimedRotatingFileHandler(app.config.get('CRON_LOG_FILE'), interval=1, when='d', backupCount=3)
|
||||
cron_log_handler.setFormatter(logging.Formatter('[%(asctime)s] - %(funcName)20s() %(message)s'))
|
||||
cron_logger = logging.getLogger("mal.cron")
|
||||
cron_logger = logging.getLogger("lrc.cron")
|
||||
cron_logger.addHandler(cron_log_handler)
|
||||
logging.getLogger("apscheduler.scheduler").addHandler(cron_log_handler)
|
||||
logging.getLogger("apscheduler.executors.default").addHandler(cron_log_handler)
|
||||
@@ -46,7 +47,11 @@ def add_default_jobs(sched=None, testing=False):
|
||||
check_recorder_state_job = sched.add_job(recorder_checker.check_object_state, 'interval', minutes=2,
|
||||
id="check_recorder_state_job")
|
||||
|
||||
return [check_recorder_state_job]
|
||||
send_update_state_to_recorder_job = sched.add_job(
|
||||
lambda: send_state_update_to_recorders(recorder_checker.get_current_state()), 'interval', minutes=1,
|
||||
id="send_update_state_to_recorder_job")
|
||||
|
||||
return [check_recorder_state_job, send_update_state_to_recorder_job]
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
@@ -79,7 +84,7 @@ if __name__ == '__main__':
|
||||
recorder_id = random.randint(0, 15)
|
||||
cron_logger.info("Using recorder id {}".format(recorder_id))
|
||||
recorder_checker.add_object_to_state_check(recorder_id)
|
||||
recorder_checker.add_object_to_state_check(recorder_id+1)
|
||||
recorder_checker.add_object_to_state_check(recorder_id + 1)
|
||||
pprint(recorder_checker.get_current_state())
|
||||
|
||||
while True:
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import copy
|
||||
import datetime
|
||||
import logging
|
||||
from multiprocessing.context import TimeoutError
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from pprint import pprint
|
||||
from threading import Lock
|
||||
from typing import Union, Callable, TypeVar, Generic, Set, List
|
||||
|
||||
from backend.models import Recorder
|
||||
from backend.tools.simple_state_checker import check_capture_agent_state, ping_capture_agent
|
||||
|
||||
logger = logging.getLogger("mal.cron.recorder_state")
|
||||
logger = logging.getLogger("lrc.cron.recorder_state")
|
||||
|
||||
recorder_jobs_lock = Lock()
|
||||
recorder_jobs = set()
|
||||
@@ -44,14 +46,18 @@ class StateChecker(Generic[T]):
|
||||
"Could not add object ({}) to state check, as specified >id ({})< could not be found / object is None".format(
|
||||
self.type_name, object_to_check))
|
||||
return
|
||||
self.lock.acquire()
|
||||
if hasattr(object_to_check, 'name'):
|
||||
name = object_to_check.name
|
||||
else:
|
||||
name = str(object_to_check)
|
||||
logger.debug("Adding {} to object ({}) to state check".format(self.type_name, name))
|
||||
self.jobs.add(object_to_check)
|
||||
self.lock.release()
|
||||
with self.lock:
|
||||
if hasattr(object_to_check, 'name'):
|
||||
name = object_to_check.name
|
||||
else:
|
||||
name = str(object_to_check)
|
||||
if any([j.id == object_to_check.id for j in self.jobs]):
|
||||
logger.info(
|
||||
"Not adding {} ({}) ({}) to state check (already in job list)".format(object_to_check.id, name,
|
||||
self.type_name))
|
||||
else:
|
||||
logger.debug("Adding {} to object ({}) to state check".format(self.type_name, name))
|
||||
self.jobs.add(object_to_check)
|
||||
|
||||
def remove_recorder_from_state_check(self, object_to_check: Union[int, T]):
|
||||
if isinstance(object_to_check, int):
|
||||
@@ -79,11 +85,17 @@ class StateChecker(Generic[T]):
|
||||
if r[0]: # ok :)
|
||||
if object_states[r[2]].get('msg', "") == "unknown state!":
|
||||
del object_states[r[2]]['msg']
|
||||
ok = True
|
||||
else:
|
||||
ok = object_states[r[2]].get('state_ok', False),
|
||||
object_states[r[2]] = {
|
||||
'id': object_states[r[2]].get('id', None),
|
||||
'msg': ", ".join([s for s in [object_states[r[2]].get('msg', None), r[1]] if s]),
|
||||
'state_ok': True}
|
||||
'state_ok': ok}
|
||||
else:
|
||||
object_states[r[2]]['msg'] = r[1]
|
||||
object_states[r[2]] = {'id': object_states[r[2]].get('id', None),
|
||||
'msg': r[1],
|
||||
'state_ok': False}
|
||||
except TimeoutError as e:
|
||||
logger.error("Timeout while performing state check func! {}".format(e))
|
||||
|
||||
@@ -100,7 +112,7 @@ class StateChecker(Generic[T]):
|
||||
return {}
|
||||
logger.info("checking state of {} recorders".format(len(jobs)))
|
||||
|
||||
object_states = {j.name: {'state_ok': False, 'msg': 'unknown state!'} for j in jobs}
|
||||
object_states = {j.name: {'id': j.id, 'state_ok': False, 'msg': 'unknown state!'} for j in jobs}
|
||||
|
||||
if isinstance(self.checker_func, list):
|
||||
for c_f in self.checker_func:
|
||||
@@ -130,7 +142,8 @@ class StateChecker(Generic[T]):
|
||||
self.update_state_lock.release()
|
||||
|
||||
def get_current_state(self):
|
||||
return self.check_object_state()
|
||||
with self.update_state_lock:
|
||||
return copy.deepcopy(self.state_results)
|
||||
|
||||
|
||||
recorder_checker = StateChecker([check_capture_agent_state, ping_capture_agent], Recorder)
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
# Copyright (c) 2019. Tobias Kurze
|
||||
|
||||
import backend.websocket.handlers
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import threading
|
||||
from flask_jwt_extended import verify_jwt_in_request, get_current_user, jwt_required, get_jwt_claims, get_jwt_identity
|
||||
from flask_login import current_user
|
||||
from flask_socketio import SocketIO, emit
|
||||
from jwt import ExpiredSignatureError
|
||||
|
||||
from backend import app
|
||||
|
||||
@@ -32,6 +33,9 @@ class WebSocketBase:
|
||||
debug = self.flask_app_context.debug
|
||||
socketio.run(self.flask_app_context, host=host, port=port, debug=debug)
|
||||
|
||||
def send_test_msg(self):
|
||||
socketio.emit('test', "tolle nachricht")
|
||||
|
||||
@staticmethod
|
||||
@socketio.on('connect')
|
||||
def connect_handler():
|
||||
@@ -39,16 +43,45 @@ class WebSocketBase:
|
||||
try:
|
||||
print(verify_jwt_in_request())
|
||||
print(get_jwt_identity())
|
||||
except:
|
||||
except ExpiredSignatureError:
|
||||
logger.info("user is not authenticated! Signature expired.")
|
||||
except Exception as e:
|
||||
logger.info("user is not authenticated!")
|
||||
print(str(e))
|
||||
print(type(e))
|
||||
print("not allowed!!")
|
||||
return False # not allowed here
|
||||
logger.debug("user is authenticated")
|
||||
print("allowed!")
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
@socketio.on('message')
|
||||
def handle_message(message):
|
||||
print('received message: ' + message)
|
||||
|
||||
@staticmethod
|
||||
@socketio.on('json')
|
||||
def handle_json(json):
|
||||
print('received json: ' + str(json))
|
||||
|
||||
@staticmethod
|
||||
def _request_request_recorder_status_update():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _test_on_msg_func():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
@socketio.on('update_message_test_blabla')
|
||||
def handle_msg(msg=None):
|
||||
print('received msg: ' + str(msg))
|
||||
socketio.on_event('my event', WebSocketBase._test_on_msg_func, namespace='/')
|
||||
|
||||
@staticmethod
|
||||
@socketio.on_error()
|
||||
def handle_error(self, error):
|
||||
def handle_error(error):
|
||||
logger.error(error)
|
||||
print(error)
|
||||
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
from backend.cron import recorder_checker
|
||||
from backend.websocket.base import socketio
|
||||
|
||||
logger = logging.getLogger("lrc.websocket.handlers")
|
||||
|
||||
recorder_state_checker = recorder_checker
|
||||
|
||||
|
||||
@socketio.on('request_recorder_state_updates')
|
||||
def handle_request_recorder_state_updates_msg(recorder_id=None):
|
||||
if recorder_id is None:
|
||||
logger.warning("No recorder_id communicated, ignoring!")
|
||||
return
|
||||
logger.info("Adding recorder {} to state checker".format(recorder_id))
|
||||
recorder_state_checker.add_object_to_state_check(recorder_id)
|
||||
|
||||
|
||||
@socketio.on('force_recorder_state_update')
|
||||
def handle_force_recorder_state_update_msg(recorder_id=None):
|
||||
if recorder_id is None:
|
||||
logger.warning("No recorder_id communicated, ignoring!")
|
||||
return
|
||||
|
||||
current_states = recorder_state_checker.get_current_state()
|
||||
for key in current_states:
|
||||
state = current_states[key]
|
||||
if state.get('id', None) == recorder_id:
|
||||
logger.debug("Sending state to: {}".format(
|
||||
'recorder_state_update_{}'.format(recorder_id)))
|
||||
socketio.emit('recorder_state_update_{}'.format(recorder_id),
|
||||
json.dumps(state))
|
||||
return
|
||||
logger.warning("Can't force update, no state found for recorder id {}.".format(recorder_id))
|
||||
|
||||
|
||||
def send_state_update_to_recorders(recorder_results_dict: dict):
|
||||
if len(recorder_results_dict) <= 0:
|
||||
logger.debug("Sending state of recorders via web socket... => nothing to send!")
|
||||
return
|
||||
|
||||
logger.debug("Sending state of recorders via web socket...")
|
||||
for recorder_id in recorder_results_dict:
|
||||
print(recorder_results_dict[recorder_id])
|
||||
logger.debug("Sending state to: {}".format(
|
||||
'recorder_state_update_{}'.format(recorder_results_dict[recorder_id].get('id'))))
|
||||
socketio.emit('recorder_state_update_{}'.format(recorder_results_dict[recorder_id].get('id')),
|
||||
json.dumps(recorder_results_dict[recorder_id]))
|
||||
|
||||
Reference in New Issue
Block a user