# Copyright (c) 2019. Tobias Kurze import logging import threading import time from flask import Flask, request from flask_socketio import SocketIO, emit from backend import app logging.basicConfig() app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' async_mode = 'threading' async_mode = None socketio = SocketIO(app, cors_allowed_origins="*", async_mode=async_mode) thread = None thread_lock = threading.Lock() clients = [] def background_thread(): """Example of how to send server generated events to clients.""" count = 0 while True: socketio.sleep(3) print("server_event ") count += 1 socketio.emit('server_event', {'data': 'Server generated event', 'count': count}, ) def ack(): print('message was received!') @socketio.on('my_event') def test_message(message: dict): print("received msg!") print(message) if isinstance(message, str): emit('my_response', 'tolle Antwort! (got only a string!)', callback=ack) else: emit('my_response', {'data': message['data']}, callback=ack) @socketio.on('my broadcast event', namespace='/') def test_message(message): emit('my response', {'data': message['data']}, broadcast=True) @socketio.on('connect') def test_connect(): print("connected") global thread with thread_lock: if thread is None: thread = socketio.start_background_task(background_thread) emit('connect', "You are connected!") @socketio.on('disconnect', namespace='/') def test_disconnect(): print('Client disconnected') @socketio.on_error() def handle_error(error): print(error) def start_in_thread(): socket_thread = threading.Thread(target=lambda appl, host, port: socketio.run(app=appl, host=host, port=port), args=(app, "localhost", 5000)) socket_thread.start() return socket_thread @app.route("/") def root(): return """ """ if __name__ == '__main__': print("running main") # socketio.init_app(app, cors_allowed_origins="*") #while len(clients) <= 0: # time.sleep(1) #while True: # time.sleep(2) # print("sending message...") # socketio.send("lala") # print("send lala") # print(clients[0]) # socketio.emit("server_event", {'msg': "tolle nachricht"}, room=clients[0]) # with app.test_request_context('/'): # socketio.emit('server_event', "You!: server_event", namespace="/") # print("send bla") #socketio.run(app, host="localhost", port=5000, debug=True) start_in_thread() while True: time.sleep(2) print("still running! :)") # socketio.run(app, debug=True) # ENDE # t.join()