# Copyright (c) 2019. Tobias Kurze
import logging
import threading
import time
from flask import Flask, request
from flask_socketio import SocketIO, emit
from backend import app
# Module-level wiring for the Socket.IO demo server.
logging.basicConfig()

# NOTE(review): this rebinds the name ``app`` that the import section pulls in
# from ``backend``; from here on ``app`` is the Flask instance created below.
app = Flask(__name__)
# NOTE(review): hard-coded secret -- presumably for local development only;
# confirm before this is deployed anywhere.
app.config['SECRET_KEY'] = 'secret!'

# ``None`` is the value handed to SocketIO. A preceding assignment of
# 'threading' was overwritten before it was ever read, so only this one is kept.
# NOTE(review): presumably ``None`` lets Flask-SocketIO choose the async
# backend itself -- confirm against the library documentation.
async_mode = None
socketio = SocketIO(app, cors_allowed_origins="*", async_mode=async_mode)

# Handle of the background task; assigned once by the 'connect' handler while
# holding ``thread_lock``.
thread = None
thread_lock = threading.Lock()

# Not used by any handler visible in this file.
clients = []
def background_thread():
    """Emit a 'server_event' with a running counter in an endless loop.

    Each iteration waits three seconds via ``socketio.sleep``, prints a marker
    to stdout, increments the counter and emits it. The function never returns.
    """
    ticks = 0
    while True:
        socketio.sleep(3)
        print("server_event ")
        ticks += 1
        payload = {'data': 'Server generated event', 'count': ticks}
        socketio.emit('server_event', payload)
def ack():
    """Print a confirmation; passed as the ``callback`` of the 'my_response' emits."""
    notice = 'message was received!'
    print(notice)
@socketio.on('my_event')
def test_message(message: dict):
    """Handle 'my_event': print the message and answer with 'my_response'.

    Although annotated as ``dict``, a plain ``str`` is accepted as well and is
    answered with a fixed text. Any other message is indexed with ``'data'``
    and that value is echoed back. ``ack`` is passed as the emit callback.
    """
    print("received msg!")
    print(message)
    if not isinstance(message, str):
        reply = {'data': message['data']}
    else:
        reply = 'tolle Antwort! (got only a string!)'
    emit('my_response', reply, callback=ack)
@socketio.on('my broadcast event', namespace='/')
def test_message(message):
    """Handle 'my broadcast event': re-emit the message's 'data' with broadcast=True.

    NOTE(review): this reuses the function name of the 'my_event' handler, so
    the module-level name ``test_message`` ends up referring to this function.
    """
    payload = {'data': message['data']}
    emit('my response', payload, broadcast=True)
@socketio.on('connect')
def test_connect():
    """Handle 'connect': start the background task once, then greet the client.

    The module-level ``thread`` is only assigned while ``thread_lock`` is held,
    so the background task is started at most once.
    """
    global thread
    print("connected")
    with thread_lock:
        task_missing = thread is None
        if task_missing:
            thread = socketio.start_background_task(background_thread)
    greeting = "You are connected!"
    emit('connect', greeting)
@socketio.on('disconnect', namespace='/')
def test_disconnect():
    """Handle 'disconnect' on the '/' namespace by printing a notice to stdout."""
    notice = 'Client disconnected'
    print(notice)
@socketio.on_error()
def handle_error(error):
    """Print the error object handed over by ``socketio.on_error()``.

    Args:
        error: The error passed in by the Socket.IO error hook; it is only
            printed and not re-raised here.
    """
    print(error)
def start_in_thread(host: str = "localhost", port: int = 5000) -> threading.Thread:
    """Run the Socket.IO server on a separate thread and return that thread.

    Args:
        host: Host passed on to ``socketio.run``. Defaults to "localhost",
            the value that used to be hard-coded.
        port: Port passed on to ``socketio.run``. Defaults to 5000, the value
            that used to be hard-coded.

    Returns:
        The already started ``threading.Thread`` executing ``socketio.run``.
    """
    # Pass the bound method directly instead of wrapping it in a lambda.
    socket_thread = threading.Thread(
        target=socketio.run,
        kwargs={"app": app, "host": host, "port": port},
    )
    socket_thread.start()
    return socket_thread
@app.route("/")
def root():
    """Serve the index route; the response body is a single newline."""
    body = "\n"
    return body
if __name__ == '__main__':
    print("running main")
    # Start the Socket.IO server for ``app`` on localhost:5000 with debug
    # enabled. The print below executes only after this call has returned.
    socketio.run(app, debug=True, host="localhost", port=5000)
    # END
    print("running?!")