updated cron schedule logic

Marvin Zhang
2019-04-17 21:22:02 +08:00
parent 20c721a20e
commit c0a987dad1
6 changed files with 64 additions and 32 deletions

View File

@@ -88,17 +88,17 @@ def monitor_nodes_status(celery_app):
        recv.capture(limit=None, timeout=None, wakeup=True)

# run scheduler as a separate process
scheduler.run()

# monitor node status
p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
p_monitor.start()

# create folder if it does not exist
if not os.path.exists(PROJECT_LOGS_FOLDER):
    os.makedirs(PROJECT_LOGS_FOLDER)

if __name__ == '__main__':
    # create folder if it does not exist
    if not os.path.exists(PROJECT_LOGS_FOLDER):
        os.makedirs(PROJECT_LOGS_FOLDER)

    # run scheduler as a separate process
    scheduler.run()

    # monitor node status
    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
    p_monitor.start()

    # run app instance
    app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
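
Moving `scheduler.run()` and the monitor `Process` under the `__main__` guard means they only start when the module is run directly, not as a side effect of importing it (for example from a worker or a test). A minimal sketch of that pattern, with illustrative names that are not from the repository:

```python
from multiprocessing import Process


def monitor():
    """Stand-in for monitor_nodes_status(); a long-running loop in practice."""
    ...


if __name__ == '__main__':
    # Only executed when the file is run as a script, never on import.
    p = Process(target=monitor)
    p.start()
    p.join()
```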

View File

@@ -1,33 +1,50 @@
# project variables
# path to spider source code
PROJECT_SOURCE_FILE_FOLDER = '../spiders'
# path to the Python virtual environment
PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'
# spider deploy path
PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
# spider logs path
PROJECT_LOGS_FOLDER = '../deployfile/logs'
# temporary folder for packaging
PROJECT_TMP_FOLDER = '/tmp'
# celery variables
# Celery broker URL
BROKER_URL = 'redis://127.0.0.1:6379/0'
# Celery result backend URL
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'
# Celery MongoDB backend settings
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}
# Celery timezone
CELERY_TIMEZONE = 'Asia/Shanghai'
# whether to enable UTC
CELERY_ENABLE_UTC = True
# Celery Scheduler Redis URL
CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
# flower variables
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
# database variables
# MongoDB variables
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'
# flask variables
# Flask variables
DEBUG = True
FLASK_HOST = '127.0.0.1'
FLASK_PORT = 8000
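
The same config.py feeds two consumers: Celery reads the upper-case names through `config_from_object` (see tasks/celery.py further down), while the Flask and MongoDB code imports constants such as `MONGO_HOST` and `FLASK_PORT` directly. A minimal sketch of both uses:

```python
from celery import Celery

import config

celery_app = Celery(__name__)
# Celery only picks up the UPPER_CASE attributes of the config module.
celery_app.config_from_object('config')

# The rest of the application imports the same constants as plain values.
print(config.MONGO_HOST, config.MONGO_PORT, config.MONGO_DB)
```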

View File

@@ -5,6 +5,7 @@ import requests
from constants.task import TaskStatus
from db.manager import db_manager
from routes.base import BaseApi
from tasks.scheduler import scheduler
from utils import jsonify
from utils.spider import get_spider_col_fields
@@ -18,3 +19,6 @@ class ScheduleApi(BaseApi):
        ('cron', str),
        ('spider_id', str)
    )

    def after_update(self, id: str = None):
        scheduler.update()
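
The new `after_update` hook is what ties schedule edits to the cron logic: once a schedule document is saved through the API, `scheduler.update()` rebuilds all periodic jobs. A hypothetical client call showing the flow (the HTTP verb, route prefix, and ids are assumptions, not taken from this commit):

```python
import requests

schedule_id = '5cb71111deadbeefcafe0001'   # placeholder ObjectId
resp = requests.post(
    'http://127.0.0.1:8000/api/schedules/%s' % schedule_id,
    data={
        'cron': '0 0 */6 * * *',           # 6-field cron: every 6 hours
        'spider_id': '5cb72222deadbeefcafe0002',
    },
)
# After the document is saved, ScheduleApi.after_update() runs and calls
# scheduler.update(), so the new cron expression takes effect without a restart.
```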

View File

@@ -60,13 +60,26 @@ class TaskApi(BaseApi):
                                sort_key='create_ts')
        items = []
        for task in tasks:
            # celery tasks
            # _task = db_manager.get('tasks_celery', id=task['_id'])

            # get spider
            _spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))

            # status
            if task.get('status') is None:
                task['status'] = TaskStatus.UNAVAILABLE

            # spider name
            if _spider:
                task['spider_name'] = _spider['name']

            # duration
            if task.get('finish_ts') is not None:
                task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()

            items.append(task)

        return {
            'status': 'ok',
            'total_count': db_manager.count('tasks', {}),
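
The added `duration` field is simply the difference between the two stored timestamps. A self-contained illustration of the computation, assuming both fields are `datetime` objects as the subtraction above implies:

```python
from datetime import datetime, timedelta

create_ts = datetime(2019, 4, 17, 21, 0, 0)
finish_ts = create_ts + timedelta(minutes=2, seconds=30)

# Subtracting two datetimes yields a timedelta; total_seconds() is what
# gets stored in task['duration'].
duration = (finish_ts - create_ts).total_seconds()
print(duration)  # 150.0
```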

View File

@@ -1,5 +1,10 @@
from celery import Celery
# from redisbeat.scheduler import RedisScheduler
from utils.redisbeat import RedisScheduler
# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config')
# RedisBeat scheduler
celery_scheduler = RedisScheduler(app=celery_app)
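
The import now points at a vendored copy of redisbeat (`utils.redisbeat`) instead of the pip package, and `celery_scheduler` is exported for the scheduler module below. For completeness, a minimal, hypothetical task showing how workers would attach to the `celery_app` defined here (not part of this commit):

```python
from tasks.celery import celery_app


@celery_app.task(bind=True)
def ping(self):
    # Trivial task used only to illustrate registering work on celery_app;
    # the project's real crawl tasks live in their own modules.
    return 'pong'
```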

View File

@@ -2,24 +2,27 @@ import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pymongo import MongoClient
from flask import current_app

from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
from constants.spider import CronEnabled
from db.manager import db_manager
from tasks.celery import celery_scheduler


class Scheduler(object):
    mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    task_col = 'apscheduler_jobs'

    jobstores = {
        'mongo': MongoDBJobStore(database=MONGO_DB,
                                 collection='apscheduler_jobs',
                                 collection=task_col,
                                 client=mongo)
    }

    scheduler = BackgroundScheduler(jobstores=jobstores)
    # scheduler = celery_scheduler

    def execute_spider(self, id: str):
        r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
            FLASK_HOST,
@@ -27,21 +30,13 @@ class Scheduler(object):
            id
        ))

    def restart(self):
        self.scheduler.shutdown()
        self.scheduler.start()
        current_app.logger.info('restarted')

    def update(self):
        current_app.logger.info('updating...')
        # remove all existing periodic jobs
        self.scheduler.remove_all_jobs()
        self.mongo[MONGO_DB][self.task_col].remove()

        # add new periodic jobs from database
        spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
        for spider in spiders:
            cron = spider.get('cron')
        periodical_tasks = db_manager.list('schedules', {})
        for task in periodical_tasks:
            cron = task.get('cron')
            cron_arr = cron.split(' ')
            second = cron_arr[0]
            minute = cron_arr[1]
@@ -49,13 +44,11 @@ class Scheduler(object):
            day = cron_arr[3]
            month = cron_arr[4]
            day_of_week = cron_arr[5]
            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),),
                                   jobstore='mongo',
                                   day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute,
                                   second=second)
        current_app.logger.info('updated')

    def run(self):
        self.update()
        self.scheduler.start()
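
`update()` assumes each schedule stores a 6-field cron string ordered `second minute hour day month day_of_week`, which is split on spaces and passed straight to APScheduler's cron trigger. A self-contained sketch of that mapping (the spider id and the schedule itself are placeholders):

```python
import time

from apscheduler.schedulers.background import BackgroundScheduler


def execute_spider(spider_id):
    print('would trigger a crawl for spider %s' % spider_id)


cron = '*/10 * * * * *'   # every 10 seconds, in second/minute/hour/day/month/day_of_week order
second, minute, hour, day, month, day_of_week = cron.split(' ')

scheduler = BackgroundScheduler()
scheduler.add_job(func=execute_spider, trigger='cron', args=('demo-spider-id',),
                  second=second, minute=minute, hour=hour, day=day,
                  month=month, day_of_week=day_of_week)
scheduler.start()

time.sleep(30)    # let a few runs fire
scheduler.shutdown()
```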