From 6ceb00aedc31d174f0d7b9b73ed50c7549b071c6 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 17 Apr 2019 22:07:48 +0800
Subject: [PATCH] updated scheduler

---
 crawlab/routes/base.py     |  4 ++--
 crawlab/routes/tasks.py    | 10 ++++++++--
 crawlab/tasks/celery.py    |  5 -----
 crawlab/tasks/scheduler.py |  6 +++---
 4 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/crawlab/routes/base.py b/crawlab/routes/base.py
index 8a3d6709..1578b3f8 100644
--- a/crawlab/routes/base.py
+++ b/crawlab/routes/base.py
@@ -23,7 +23,7 @@ class BaseApi(Resource):
         super(BaseApi).__init__()
         self.parser.add_argument('page_num', type=int)
         self.parser.add_argument('page_size', type=int)
-        self.parser.add_argument('filter', type=dict)
+        self.parser.add_argument('filter', type=str)
 
         for arg, type in self.arguments:
             self.parser.add_argument(arg, type=type)
@@ -109,7 +109,7 @@ class BaseApi(Resource):
             item[k] = args.get(k)
         item = db_manager.save(col_name=self.col_name, item=item)
 
-        self.after_update(item._id)
+        self.after_update()
 
         return item
 
diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py
index ebe5a33b..86e75dab 100644
--- a/crawlab/routes/tasks.py
+++ b/crawlab/routes/tasks.py
@@ -56,7 +56,13 @@ class TaskApi(BaseApi):
         args = self.parser.parse_args()
         page_size = args.get('page_size') or 10
         page_num = args.get('page_num') or 1
-        tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
+        filter_str = args.get('filter')
+        filter_ = {}
+        if filter_str is not None:
+            filter_ = json.loads(filter_str)
+            if filter_.get('spider_id'):
+                filter_['spider_id'] = ObjectId(filter_['spider_id'])
+        tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
                                 sort_key='create_ts')
         items = []
         for task in tasks:
@@ -82,7 +88,7 @@ class TaskApi(BaseApi):
 
         return {
             'status': 'ok',
-            'total_count': db_manager.count('tasks', {}),
+            'total_count': db_manager.count('tasks', filter_),
             'page_num': page_num,
             'page_size': page_size,
             'items': jsonify(items)
diff --git a/crawlab/tasks/celery.py b/crawlab/tasks/celery.py
index 8cb7501b..3def1b29 100644
--- a/crawlab/tasks/celery.py
+++ b/crawlab/tasks/celery.py
@@ -1,10 +1,5 @@
 from celery import Celery
-# from redisbeat.scheduler import RedisScheduler
-from utils.redisbeat import RedisScheduler
 
 # celery app instance
 celery_app = Celery(__name__)
 celery_app.config_from_object('config')
-
-# RedisBeat scheduler
-celery_scheduler = RedisScheduler(app=celery_app)
diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py
index b03639ea..bf29607f 100644
--- a/crawlab/tasks/scheduler.py
+++ b/crawlab/tasks/scheduler.py
@@ -6,23 +6,22 @@
 from pymongo import MongoClient
 
 from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
 from constants.spider import CronEnabled
 from db.manager import db_manager
-from tasks.celery import celery_scheduler
 
 
 class Scheduler(object):
     mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
     task_col = 'apscheduler_jobs'
 
+    # scheduler jobstore
     jobstores = {
         'mongo': MongoDBJobStore(database=MONGO_DB, collection=task_col, client=mongo)
     }
 
+    # scheduler instance
     scheduler = BackgroundScheduler(jobstores=jobstores)
-    # scheduler = celery_scheduler
-
 
     def execute_spider(self, id: str):
         r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
             FLASK_HOST,
@@ -32,6 +31,7 @@ class Scheduler(object):
 
     def update(self):
         # remove all existing periodic jobs
+        self.scheduler.remove_all_jobs()
         self.mongo[MONGO_DB][self.task_col].remove()
 
         periodical_tasks = db_manager.list('schedules', {})