diff --git a/crawlab/app.py b/crawlab/app.py
index 7a3f60b9..d90bc2d0 100644
--- a/crawlab/app.py
+++ b/crawlab/app.py
@@ -4,11 +4,15 @@ import sys
 from multiprocessing import Process
 
 import click
+from celery import Celery
 from flask import Flask
 from flask_cors import CORS
 from flask_restful import Api
 
+from constants.node import NodeStatus
+from db.manager import db_manager
 from routes.schedules import ScheduleApi
+from tasks.celery import celery_app
 from tasks.scheduler import scheduler
 
 file_dir = os.path.dirname(os.path.realpath(__file__))
@@ -65,6 +69,25 @@ api.add_resource(ScheduleApi,
                  '/api/schedules',
                  '/api/schedules/<string:id>')
 
+
+def monitor_nodes_status(celery_app):
+    def update_nodes_status(event):
+        node_id = event.get('hostname')
+        db_manager.update_one('nodes', id=node_id, values={
+            'status': NodeStatus.ONLINE
+        })
+
+    def update_nodes_status_online(event):
+        print(event)
+
+    with celery_app.connection() as connection:
+        recv = celery_app.events.Receiver(connection, handlers={
+            'worker-heartbeat': update_nodes_status,
+            # 'worker-online': update_nodes_status_online,
+        })
+        recv.capture(limit=None, timeout=None, wakeup=True)
+
+
 if __name__ == '__main__':
     # create folder if it does not exist
     if not os.path.exists(PROJECT_LOGS_FOLDER):
@@ -73,5 +96,9 @@ if __name__ == '__main__':
     # run scheduler as a separate process
     scheduler.run()
 
+    # monitor node status
+    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
+    p_monitor.start()
+
     # run app instance
     app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
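
For context, the node-status monitor added above relies on Celery's real-time event stream: an events Receiver subscribes to worker events over the broker connection and dispatches each event dict to a handler. Below is a minimal standalone sketch of that capture pattern; the broker URL, app name, and print call are illustrative assumptions and not part of this change (Crawlab's own Celery app comes from tasks/celery.py).

from celery import Celery

# Illustrative broker URL; the real Crawlab app is configured elsewhere.
app = Celery('heartbeat_monitor_demo', broker='redis://localhost:6379/0')


def on_heartbeat(event):
    # A 'worker-heartbeat' event is a dict that includes the worker's
    # hostname, which monitor_nodes_status() above uses as the node id
    # when marking the node ONLINE.
    print('heartbeat from', event.get('hostname'))


with app.connection() as connection:
    recv = app.events.Receiver(connection, handlers={
        'worker-heartbeat': on_heartbeat,
    })
    # capture() blocks and keeps dispatching events, which is why app.py
    # runs monitor_nodes_status in a separate multiprocessing.Process.
    recv.capture(limit=None, timeout=None, wakeup=True)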