Mirror of https://github.com/crawlab-team/crawlab.git
updated scheduler
@@ -23,7 +23,7 @@ class BaseApi(Resource):
        super(BaseApi).__init__()
        self.parser.add_argument('page_num', type=int)
        self.parser.add_argument('page_size', type=int)
-        self.parser.add_argument('filter', type=dict)
+        self.parser.add_argument('filter', type=str)

        for arg, type in self.arguments:
            self.parser.add_argument(arg, type=type)
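
Note: the filter argument is re-declared as type=str because a filter passed in the query string reaches reqparse as raw JSON text rather than a parsed dict, and is decoded explicitly where it is used. A minimal standalone sketch of that pattern (illustration only, not code from this commit; the endpoint in the comment is assumed):

import json
from flask_restful import reqparse

parser = reqparse.RequestParser()
parser.add_argument('filter', type=str)

# inside a request context, e.g. GET /api/tasks?filter={"spider_id": "..."}
args = parser.parse_args()
filter_ = json.loads(args['filter']) if args.get('filter') else {}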

@@ -109,7 +109,7 @@ class BaseApi(Resource):
            item[k] = args.get(k)
        item = db_manager.save(col_name=self.col_name, item=item)

-        self.after_update(item._id)
+        self.after_update()

        return item
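
after_update() is now called without the saved item's id, so subclasses that hook into it should accept the call with no arguments. A hypothetical override (the ScheduleApi class and the scheduler instance below are illustrative names, not taken from this diff):

class ScheduleApi(BaseApi):
    col_name = 'schedules'

    def after_update(self, id=None):
        # rebuild the periodic jobs whenever a schedule document is saved
        scheduler.update()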

@@ -56,7 +56,13 @@ class TaskApi(BaseApi):
        args = self.parser.parse_args()
        page_size = args.get('page_size') or 10
        page_num = args.get('page_num') or 1
-        tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
+        filter_str = args.get('filter')
+        filter_ = {}
+        if filter_str is not None:
+            filter_ = json.loads(filter_str)
+            if filter_.get('spider_id'):
+                filter_['spider_id'] = ObjectId(filter_['spider_id'])
+        tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
                                sort_key='create_ts')
        items = []
        for task in tasks:

@@ -82,7 +88,7 @@ class TaskApi(BaseApi):

        return {
            'status': 'ok',
-            'total_count': db_manager.count('tasks', {}),
+            'total_count': db_manager.count('tasks', filter_),
            'page_num': page_num,
            'page_size': page_size,
            'items': jsonify(items)
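
Because the same filter_ dict is passed to both db_manager.list and db_manager.count, a filtered request now reports a total_count that matches its item list. A hypothetical client call (host, port, path and the spider_id value are made up for illustration):

import json

import requests

resp = requests.get(
    'http://localhost:8000/api/tasks',
    params={
        'page_num': 1,
        'page_size': 10,
        'filter': json.dumps({'spider_id': '5c9c7cd1f3f1b0790a75c6bb'}),
    },
)
data = resp.json()
print(data['total_count'], data['page_num'])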

@@ -1,10 +1,5 @@
from celery import Celery
# from redisbeat.scheduler import RedisScheduler
from utils.redisbeat import RedisScheduler

# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config')

# RedisBeat scheduler
celery_scheduler = RedisScheduler(app=celery_app)
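
celery_scheduler is a redisbeat RedisScheduler bound to the Celery app, now imported from the vendored utils.redisbeat module rather than the redisbeat package. A rough sketch of registering a periodic task against it, following redisbeat's documented add() interface (the task and interval are assumptions, and the vendored copy may differ):

from datetime import timedelta

from tasks.celery import celery_app, celery_scheduler

@celery_app.task
def ping():
    return 'pong'

# add (or replace) an entry in the Redis-backed beat schedule
celery_scheduler.add(**{
    'name': 'ping-every-minute',
    'task': ping.name,
    'schedule': timedelta(minutes=1),
    'args': (),
})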

@@ -6,23 +6,22 @@ from pymongo import MongoClient
from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
from constants.spider import CronEnabled
from db.manager import db_manager
from tasks.celery import celery_scheduler


class Scheduler(object):
    mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    task_col = 'apscheduler_jobs'

    # scheduler jobstore
    jobstores = {
        'mongo': MongoDBJobStore(database=MONGO_DB,
                                 collection=task_col,
                                 client=mongo)
    }

    # scheduler instance
    scheduler = BackgroundScheduler(jobstores=jobstores)

    # scheduler = celery_scheduler

    def execute_spider(self, id: str):
        r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
            FLASK_HOST,

@@ -32,6 +31,7 @@ class Scheduler(object):

    def update(self):
        # remove all existing periodic jobs
        self.scheduler.remove_all_jobs()
        self.mongo[MONGO_DB][self.task_col].remove()

        periodical_tasks = db_manager.list('schedules', {})
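
update() clears both the in-memory jobs and the backing apscheduler_jobs collection before reloading the enabled schedules. A rough sketch of how the reload step can continue inside update() using APScheduler's cron trigger (the schedule fields 'cron', 'status' and 'spider_id' are assumptions, not shown in this hunk):

from apscheduler.triggers.cron import CronTrigger

for task in periodical_tasks:
    # skip schedules that are not enabled
    if task.get('status') != CronEnabled:
        continue
    # register the job in the MongoDB-backed jobstore defined above
    self.scheduler.add_job(
        func=self.execute_spider,
        trigger=CronTrigger.from_crontab(task['cron']),
        args=[str(task['spider_id'])],
        jobstore='mongo',
    )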