From c0a987dad18c455a76427475d34da6ddb3f3e6bf Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 17 Apr 2019 21:22:02 +0800
Subject: [PATCH] updated cron schedule logic

---
 crawlab/app.py              | 22 +++++++++++-----------
 crawlab/config.py           | 25 +++++++++++++++++++++----
 crawlab/routes/schedules.py |  4 ++++
 crawlab/routes/tasks.py     | 13 +++++++++++++
 crawlab/tasks/celery.py     |  5 +++++
 crawlab/tasks/scheduler.py  | 27 ++++++++++-----------------
 6 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/crawlab/app.py b/crawlab/app.py
index b7a245b3..4eebc804 100644
--- a/crawlab/app.py
+++ b/crawlab/app.py
@@ -88,17 +88,17 @@ def monitor_nodes_status(celery_app):
     recv.capture(limit=None, timeout=None, wakeup=True)
 
+# run scheduler as a separate process
+scheduler.run()
+
+# monitor node status
+p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
+p_monitor.start()
+
+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)
+
 if __name__ == '__main__':
-    # create folder if it does not exist
-    if not os.path.exists(PROJECT_LOGS_FOLDER):
-        os.makedirs(PROJECT_LOGS_FOLDER)
-
-    # run scheduler as a separate process
-    scheduler.run()
-
-    # monitor node status
-    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
-    p_monitor.start()
-
     # run app instance
     app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)

diff --git a/crawlab/config.py b/crawlab/config.py
index 4ede83b7..d2d69f81 100644
--- a/crawlab/config.py
+++ b/crawlab/config.py
@@ -1,33 +1,50 @@
-# project variables
 # spider source code path
 PROJECT_SOURCE_FILE_FOLDER = '../spiders'
+
 # path to the Python virtual environment
 PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'
+
 # spider deploy path
 PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
+# spider logs path
 PROJECT_LOGS_FOLDER = '../deployfile/logs'
+
+# temporary folder for packaging
 PROJECT_TMP_FOLDER = '/tmp'
 
-# celery variables
+# Celery broker URL
 BROKER_URL = 'redis://127.0.0.1:6379/0'
+
+# Celery result backend URL
 CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'
+
+# Celery MongoDB settings
 CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
 }
+
+# Celery timezone
 CELERY_TIMEZONE = 'Asia/Shanghai'
+
+# whether to enable UTC
 CELERY_ENABLE_UTC = True
 
+# Celery Scheduler Redis URL
+CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
+CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
+CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
+
 # flower variables
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
 
-# database variables
+# MongoDB variables
 MONGO_HOST = '127.0.0.1'
 MONGO_PORT = 27017
 MONGO_DB = 'crawlab_test'
 
-# flask variables
+# Flask variables
 DEBUG = True
 FLASK_HOST = '127.0.0.1'
 FLASK_PORT = 8000

diff --git a/crawlab/routes/schedules.py b/crawlab/routes/schedules.py
index f966e2cb..532a4ec5 100644
--- a/crawlab/routes/schedules.py
+++ b/crawlab/routes/schedules.py
@@ -5,6 +5,7 @@ import requests
 from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
+from tasks.scheduler import scheduler
 from utils import jsonify
 from utils.spider import get_spider_col_fields
 
@@ -18,3 +19,6 @@ class ScheduleApi(BaseApi):
         ('cron', str),
         ('spider_id', str)
     )
+
+    def after_update(self, id: str = None):
+        scheduler.update()

diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py
index 59e8469b..ebe5a33b 100644
--- a/crawlab/routes/tasks.py
+++ b/crawlab/routes/tasks.py
@@ -60,13 +60,26 @@ class TaskApi(BaseApi):
                                 sort_key='create_ts')
         items = []
         for task in tasks:
+            # celery tasks
+            # _task = db_manager.get('tasks_celery', id=task['_id'])
+
+            # get spider
             _spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
+
+            # status
             if task.get('status') is None:
                 task['status'] = TaskStatus.UNAVAILABLE
+
+            # spider name
             if _spider:
                 task['spider_name'] = _spider['name']
+
+            # duration
+            if task.get('finish_ts') is not None:
+                task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
+
             items.append(task)
+
         return {
             'status': 'ok',
             'total_count': db_manager.count('tasks', {}),

diff --git a/crawlab/tasks/celery.py b/crawlab/tasks/celery.py
index 3def1b29..8cb7501b 100644
--- a/crawlab/tasks/celery.py
+++ b/crawlab/tasks/celery.py
@@ -1,5 +1,10 @@
 from celery import Celery
+# from redisbeat.scheduler import RedisScheduler
+from utils.redisbeat import RedisScheduler
 
 # celery app instance
 celery_app = Celery(__name__)
 celery_app.config_from_object('config')
+
+# RedisBeat scheduler
+celery_scheduler = RedisScheduler(app=celery_app)

diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py
index da6303c9..b03639ea 100644
--- a/crawlab/tasks/scheduler.py
+++ b/crawlab/tasks/scheduler.py
@@ -2,24 +2,27 @@ import requests
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.mongodb import MongoDBJobStore
 from pymongo import MongoClient
-from flask import current_app
 
 from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
 from constants.spider import CronEnabled
 from db.manager import db_manager
+from tasks.celery import celery_scheduler
 
 
 class Scheduler(object):
     mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
+    task_col = 'apscheduler_jobs'
     jobstores = {
         'mongo': MongoDBJobStore(database=MONGO_DB,
-                                 collection='apscheduler_jobs',
+                                 collection=task_col,
                                  client=mongo)
     }
     scheduler = BackgroundScheduler(jobstores=jobstores)
 
+    # scheduler = celery_scheduler
+
     def execute_spider(self, id: str):
         r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
             FLASK_HOST,
@@ -27,21 +30,13 @@ class Scheduler(object):
             FLASK_PORT,
             id
         ))
 
-    def restart(self):
-        self.scheduler.shutdown()
-        self.scheduler.start()
-        current_app.logger.info('restarted')
-
     def update(self):
-        current_app.logger.info('updating...')
-
         # remove all existing periodic jobs
-        self.scheduler.remove_all_jobs()
+        self.mongo[MONGO_DB][self.task_col].remove()
 
         # add new periodic jobs from database
-        spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
-        for spider in spiders:
-            cron = spider.get('cron')
+        periodical_tasks = db_manager.list('schedules', {})
+        for task in periodical_tasks:
+            cron = task.get('cron')
             cron_arr = cron.split(' ')
             second = cron_arr[0]
             minute = cron_arr[1]
             hour = cron_arr[2]
             day = cron_arr[3]
             month = cron_arr[4]
             day_of_week = cron_arr[5]
-            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
+            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),),
                                    jobstore='mongo', day_of_week=day_of_week, month=month, day=day, hour=hour,
                                    minute=minute, second=second)
 
-        current_app.logger.info('updated')
-
     def run(self):
         self.update()
         self.scheduler.start()
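
Note (not part of the patch): a minimal standalone sketch of how the reworked Scheduler.update() consumes a schedule record. It assumes only what the diff shows: each document in the 'schedules' collection carries a spider_id and a six-field cron string in the order second minute hour day month day_of_week. The in-memory schedules list and the 'spider-1' id are hypothetical stand-ins for db_manager.list('schedules', {}); BackgroundScheduler is the same APScheduler class the patch uses.

import time

from apscheduler.schedulers.background import BackgroundScheduler

def execute_spider(spider_id: str):
    # the patched code issues GET /api/spiders/<spider_id>/on_crawl here
    print('would trigger crawl for spider %s' % spider_id)

scheduler = BackgroundScheduler()

# hypothetical stand-in for db_manager.list('schedules', {})
schedules = [{'spider_id': 'spider-1', 'cron': '*/5 * * * * *'}]  # fire every 5 seconds

for task in schedules:
    # unpack the six cron fields in the same order the patch indexes cron_arr
    second, minute, hour, day, month, day_of_week = task['cron'].split(' ')
    scheduler.add_job(func=execute_spider, trigger='cron', args=(task['spider_id'],),
                      second=second, minute=minute, hour=hour,
                      day=day, month=month, day_of_week=day_of_week)

scheduler.start()
time.sleep(12)  # jobs run on the scheduler's daemon thread; keep the process alive to see a few firings

The leading seconds field is what distinguishes this six-field format from classic five-field crontab, and it is why update() reads cron_arr[0] as seconds before the usual minute/hour/day fields.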