mirror of https://github.com/crawlab-team/crawlab.git

updated cron schedule logic
@@ -88,17 +88,17 @@ def monitor_nodes_status(celery_app):
    recv.capture(limit=None, timeout=None, wakeup=True)

-# run scheduler as a separate process
-scheduler.run()
-
-# monitor node status
-p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
-p_monitor.start()
-
-# create folder if it does not exist
-if not os.path.exists(PROJECT_LOGS_FOLDER):
-    os.makedirs(PROJECT_LOGS_FOLDER)
-
if __name__ == '__main__':
+    # create folder if it does not exist
+    if not os.path.exists(PROJECT_LOGS_FOLDER):
+        os.makedirs(PROJECT_LOGS_FOLDER)
+
+    # run scheduler as a separate process
+    scheduler.run()
+
+    # monitor node status
+    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
+    p_monitor.start()
+
    # run app instance
    app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
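Note: `monitor_nodes_status` (whose body lies mostly outside this hunk) blocks on Celery's event stream, which is why it runs in a child process. A minimal sketch of that pattern, assuming a standard Celery event receiver; the heartbeat handler below is illustrative, not part of this commit:

    from multiprocessing import Process

    def monitor_nodes_status(celery_app):
        # capture() blocks forever, so it must run off the main process
        with celery_app.connection() as conn:
            recv = celery_app.events.Receiver(conn, handlers={
                'worker-heartbeat': lambda event: print(event['hostname']),
            })
            recv.capture(limit=None, timeout=None, wakeup=True)

    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
    p_monitor.start()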
@@ -1,33 +1,50 @@
# project variables
# spider source code path
PROJECT_SOURCE_FILE_FOLDER = '../spiders'

# path to the Python virtual environment
PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'

# spider deployment path
PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'

# spider logs path
PROJECT_LOGS_FOLDER = '../deployfile/logs'

# temporary folder for packaging
PROJECT_TMP_FOLDER = '/tmp'

# celery variables
# Celery broker URL
BROKER_URL = 'redis://127.0.0.1:6379/0'

# Celery result backend URL
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'

# Celery MongoDB backend settings
CELERY_MONGODB_BACKEND_SETTINGS = {
    'database': 'crawlab_test',
    'taskmeta_collection': 'tasks_celery',
}

# Celery timezone
CELERY_TIMEZONE = 'Asia/Shanghai'

# whether to enable UTC
CELERY_ENABLE_UTC = True

# Celery scheduler Redis URL
CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'

# flower variables
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'

# database variables
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'

# flask variables
DEBUG = True
FLASK_HOST = '127.0.0.1'
FLASK_PORT = 8000
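As a quick sanity check of the database settings above, a throwaway snippet (not part of the repository):

    from pymongo import MongoClient
    from config import MONGO_HOST, MONGO_PORT, MONGO_DB

    # lists the collections the app reads and writes,
    # e.g. 'spiders', 'tasks', 'schedules', 'apscheduler_jobs'
    client = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    print(client[MONGO_DB].list_collection_names())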
@@ -5,6 +5,7 @@ import requests
from constants.task import TaskStatus
from db.manager import db_manager
from routes.base import BaseApi
+from tasks.scheduler import scheduler
from utils import jsonify
from utils.spider import get_spider_col_fields
@@ -18,3 +19,6 @@ class ScheduleApi(BaseApi):
        ('cron', str),
        ('spider_id', str)
    )
+
+    def after_update(self, id: str = None):
+        scheduler.update()
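With this hook in place, saving a schedule through the HTTP API immediately re-registers its cron job. A hedged usage sketch; the endpoint path and payload are assumed from the route's argument list, not shown verbatim in this diff:

    import requests

    # hypothetical request; BaseApi handles the update, then after_update()
    # calls scheduler.update() to rebuild every cron job from the database
    requests.post('http://127.0.0.1:8000/api/schedules/%s' % schedule_id,
                  data={'cron': '0 0 3 * * *', 'spider_id': spider_id})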
@@ -60,13 +60,26 @@ class TaskApi(BaseApi):
                                sort_key='create_ts')
        items = []
        for task in tasks:
            # celery tasks
            # _task = db_manager.get('tasks_celery', id=task['_id'])

            # get spider
            _spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))

            # status
            if task.get('status') is None:
                task['status'] = TaskStatus.UNAVAILABLE

            # spider name
            if _spider:
                task['spider_name'] = _spider['name']

            # duration
            if task.get('finish_ts') is not None:
                task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()

            items.append(task)

        return {
            'status': 'ok',
            'total_count': db_manager.count('tasks', {}),
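The new `duration` field is plain datetime arithmetic on the stored timestamps; for example:

    from datetime import datetime

    create_ts = datetime(2019, 3, 1, 12, 0, 0)
    finish_ts = datetime(2019, 3, 1, 12, 0, 42)
    # matches task['duration'] above: timedelta.total_seconds() -> 42.0
    print((finish_ts - create_ts).total_seconds())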
@@ -1,5 +1,10 @@
from celery import Celery
# from redisbeat.scheduler import RedisScheduler
from utils.redisbeat import RedisScheduler

# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config')

# RedisBeat scheduler
celery_scheduler = RedisScheduler(app=celery_app)
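The import swap above vendors `redisbeat` into `utils.redisbeat` instead of depending on the PyPI package. The scheduler class itself is still selected through Celery's standard beat setting; a minimal sketch, equivalent to the CELERY_BEAT_SCHEDULER line in the config hunk above:

    # the same thing config.py expresses declaratively; a beat process
    # started against celery_app would then use the vendored scheduler
    celery_app.conf.beat_scheduler = 'utils.redisbeat.RedisScheduler'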
@@ -2,24 +2,27 @@ import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pymongo import MongoClient
from flask import current_app

from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
from constants.spider import CronEnabled
from db.manager import db_manager
from tasks.celery import celery_scheduler


class Scheduler(object):
    mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    task_col = 'apscheduler_jobs'

    jobstores = {
        'mongo': MongoDBJobStore(database=MONGO_DB,
-                                collection='apscheduler_jobs',
+                                collection=task_col,
                                 client=mongo)
    }

    scheduler = BackgroundScheduler(jobstores=jobstores)

    # scheduler = celery_scheduler

    def execute_spider(self, id: str):
        r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
            FLASK_HOST,
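For reference, this jobstore wiring persists jobs in the `apscheduler_jobs` collection, so scheduled jobs survive process restarts. A minimal self-contained sketch under the same assumptions (local MongoDB, database name taken from the config hunk above):

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from pymongo import MongoClient

    mongo = MongoClient(host='127.0.0.1', port=27017)
    scheduler = BackgroundScheduler(jobstores={
        'mongo': MongoDBJobStore(database='crawlab_test',
                                 collection='apscheduler_jobs',
                                 client=mongo),
    })
    scheduler.start()
    # jobs added to the 'mongo' jobstore are serialized into MongoDB
    scheduler.add_job(print, trigger='interval', seconds=30,
                      args=('tick',), jobstore='mongo')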
@@ -27,21 +30,13 @@ class Scheduler(object):
            id
        ))

    def restart(self):
        self.scheduler.shutdown()
        self.scheduler.start()
        current_app.logger.info('restarted')

    def update(self):
        current_app.logger.info('updating...')

        # remove all existing periodic jobs
        self.scheduler.remove_all_jobs()
        self.mongo[MONGO_DB][self.task_col].remove()

        # add new periodic jobs from database
-        spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
-        for spider in spiders:
-            cron = spider.get('cron')
+        periodical_tasks = db_manager.list('schedules', {})
+        for task in periodical_tasks:
+            cron = task.get('cron')
            cron_arr = cron.split(' ')
            second = cron_arr[0]
            minute = cron_arr[1]
@@ -49,13 +44,11 @@ class Scheduler(object):
            day = cron_arr[3]
            month = cron_arr[4]
            day_of_week = cron_arr[5]
-            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
+            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),),
                                   jobstore='mongo',
                                   day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute,
                                   second=second)

        current_app.logger.info('updated')

    def run(self):
        self.update()
        self.scheduler.start()
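Schedules are stored as 6-field, seconds-first cron expressions, one field per APScheduler keyword; an illustrative parse (the expression value is assumed):

    # 'second minute hour day month day_of_week'
    cron = '0 30 2 * * *'   # i.e. every day at 02:30:00
    second, minute, hour, day, month, day_of_week = cron.split(' ')
    print(minute, hour)     # '30', '2' -> passed straight through as add_job() kwargs

One caveat worth noting: APScheduler's cron trigger numbers day_of_week from 0 = Monday, unlike classic crontab where 0 = Sunday, so numeric weekday fields do not translate one-to-one.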