updated scheduler

Marvin Zhang
2019-04-17 22:07:48 +08:00
parent aa6f414e42
commit 6ceb00aedc
4 changed files with 13 additions and 12 deletions

View File

@@ -23,7 +23,7 @@ class BaseApi(Resource):
         super(BaseApi).__init__()
         self.parser.add_argument('page_num', type=int)
         self.parser.add_argument('page_size', type=int)
-        self.parser.add_argument('filter', type=dict)
+        self.parser.add_argument('filter', type=str)
         for arg, type in self.arguments:
             self.parser.add_argument(arg, type=type)
@@ -109,7 +109,7 @@ class BaseApi(Resource):
             item[k] = args.get(k)
         item = db_manager.save(col_name=self.col_name, item=item)
-        self.after_update(item._id)
+        self.after_update()
         return item
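The switch of the `filter` argument from `dict` to `str` matters because reqparse hands the handler the raw query-string value, and a dict can't be built from that raw text; the JSON document has to be decoded explicitly after parsing. A minimal sketch of the pattern this commit moves to (the `DemoApi` resource and the `/demo` route are placeholders, not part of the project):

import json

from flask import Flask
from flask_restful import Api, Resource, reqparse

app = Flask(__name__)
api = Api(app)


class DemoApi(Resource):
    def get(self):
        parser = reqparse.RequestParser()
        parser.add_argument('filter', type=str)  # the filter arrives as a JSON string
        args = parser.parse_args()
        # decode it explicitly, the same way TaskApi does below
        filter_ = json.loads(args['filter']) if args.get('filter') else {}
        return {'filter': filter_}


api.add_resource(DemoApi, '/demo')
# GET /demo?filter={"spider_id":"..."}  ->  {"filter": {"spider_id": "..."}}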

View File

@@ -56,7 +56,13 @@ class TaskApi(BaseApi):
         args = self.parser.parse_args()
         page_size = args.get('page_size') or 10
         page_num = args.get('page_num') or 1
-        tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
+        filter_str = args.get('filter')
+        filter_ = {}
+        if filter_str is not None:
+            filter_ = json.loads(filter_str)
+            if filter_.get('spider_id'):
+                filter_['spider_id'] = ObjectId(filter_['spider_id'])
+        tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
                                 sort_key='create_ts')
         items = []
         for task in tasks:
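Taken out of the class, the filtering logic added in the hunk above amounts to the following. This is a standalone sketch rather than the project's code: it talks to pymongo directly instead of going through db_manager, and the connection settings and database name are placeholders; the `tasks` collection, `spider_id` field, and `create_ts` sort key come from the diff.

import json

from bson import ObjectId
from pymongo import MongoClient

col = MongoClient('localhost', 27017)['crawlab_db']['tasks']  # placeholder connection


def list_tasks(filter_str, page_num=1, page_size=10):
    filter_ = {}
    if filter_str is not None:
        filter_ = json.loads(filter_str)
        # spider_id is stored as an ObjectId, while the query string carries it as text
        if filter_.get('spider_id'):
            filter_['spider_id'] = ObjectId(filter_['spider_id'])
    cursor = (col.find(filter_)
                 .sort('create_ts', -1)
                 .skip(page_size * (page_num - 1))
                 .limit(page_size))
    return list(cursor)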
@@ -82,7 +88,7 @@ class TaskApi(BaseApi):
         return {
             'status': 'ok',
-            'total_count': db_manager.count('tasks', {}),
+            'total_count': db_manager.count('tasks', filter_),
             'page_num': page_num,
             'page_size': page_size,
             'items': jsonify(items)
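Because `total_count` now uses the same `filter_` as the listing query, pagination stays consistent with the filtered result set. A hypothetical client call showing how the filter travels as a JSON string; the host, port, route path, and spider id are placeholders:

import json

import requests

resp = requests.get(
    'http://localhost:8000/api/tasks',  # placeholder host/port/route
    params={
        'page_num': 1,
        'page_size': 10,
        # encoded client-side, decoded server-side with json.loads
        'filter': json.dumps({'spider_id': '5cb6f7f0e1a2b3c4d5e6f7a8'}),
    },
)
print(resp.json()['total_count'])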

View File

@@ -1,10 +1,5 @@
 from celery import Celery
-# from redisbeat.scheduler import RedisScheduler
-from utils.redisbeat import RedisScheduler
 # celery app instance
 celery_app = Celery(__name__)
 celery_app.config_from_object('config')
-# RedisBeat scheduler
-celery_scheduler = RedisScheduler(app=celery_app)
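With the RedisScheduler import and the `celery_scheduler` instance gone, this module is reduced to the Celery app itself; periodic scheduling is handled by the APScheduler-based Scheduler in the next file. Reconstructed from the kept lines, the module now reads roughly:

from celery import Celery

# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config')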

View File

@@ -6,23 +6,22 @@ from pymongo import MongoClient
 from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
 from constants.spider import CronEnabled
 from db.manager import db_manager
-from tasks.celery import celery_scheduler
 class Scheduler(object):
     mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
     task_col = 'apscheduler_jobs'
     # scheduler jobstore
     jobstores = {
         'mongo': MongoDBJobStore(database=MONGO_DB,
                                  collection=task_col,
                                  client=mongo)
     }
     # scheduler instance
     scheduler = BackgroundScheduler(jobstores=jobstores)
     # scheduler = celery_scheduler
     def execute_spider(self, id: str):
         r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
             FLASK_HOST,
@@ -32,6 +31,7 @@ class Scheduler(object):
     def update(self):
         # remove all existing periodic jobs
         self.scheduler.remove_all_jobs()
+        self.mongo[MONGO_DB][self.task_col].remove()
         periodical_tasks = db_manager.list('schedules', {})
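The added line wipes the `apscheduler_jobs` collection alongside `remove_all_jobs()`, so stale job documents in the Mongo jobstore cannot be resurrected when jobs are re-added from the `schedules` collection. A condensed sketch of that pattern follows; the connection settings are placeholders, the cron trigger is illustrative rather than the project's schedule parsing, and `delete_many({})` is used in place of the deprecated pymongo `Collection.remove()` the diff calls:

from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pymongo import MongoClient

MONGO_HOST, MONGO_PORT, MONGO_DB = 'localhost', 27017, 'crawlab_db'  # placeholders

mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
task_col = 'apscheduler_jobs'

# persist jobs in MongoDB so they survive process restarts
jobstores = {
    'mongo': MongoDBJobStore(database=MONGO_DB, collection=task_col, client=mongo)
}
scheduler = BackgroundScheduler(jobstores=jobstores)


def execute_spider(id: str):
    print('would trigger a crawl for spider %s' % id)  # stand-in for the HTTP call


def update(schedules):
    # drop every periodic job from the running scheduler and from the backing
    # jobstore collection itself -- the collection wipe is what this commit adds
    scheduler.remove_all_jobs()
    mongo[MONGO_DB][task_col].delete_many({})
    for s in schedules:
        scheduler.add_job(func=execute_spider,
                          args=(str(s['spider_id']),),
                          trigger='cron',
                          minute='0',  # illustrative trigger only
                          jobstore='mongo')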