updated cron tasks: fixed task run duplication issue

Marvin Zhang
2019-03-09 15:35:48 +08:00
parent cc4a0eefc3
commit 189d7da299
7 changed files with 38 additions and 71 deletions

View File

@@ -15,15 +15,13 @@ file_dir = os.path.dirname(os.path.realpath(__file__))
root_path = os.path.abspath(os.path.join(file_dir, '.'))
sys.path.append(root_path)
-from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER, BROKER_URL
-from constants.manage import ActionType
+from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
from routes.deploys import DeployApi
from routes.files import FileApi
from routes.nodes import NodeApi
from routes.spiders import SpiderApi, SpiderImportApi, SpiderManageApi
from routes.stats import StatsApi
from routes.tasks import TaskApi
-from tasks.celery import celery_app
# flask app instance
app = Flask(__name__)
@@ -72,5 +70,8 @@ if __name__ == '__main__':
if not os.path.exists(PROJECT_LOGS_FOLDER):
os.makedirs(PROJECT_LOGS_FOLDER)
+# run scheduler as a separate process
+scheduler.run()
# run app instance
app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
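
With this change the scheduler is started from the same entry point as the Flask app, right before app.run(). A minimal, self-contained sketch of that pattern with plain APScheduler (the route, job, and interval are illustrative, not Crawlab's actual code):

```python
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)
scheduler = BackgroundScheduler()


@app.route('/ping')
def ping():
    return 'pong'


def tick():
    # hypothetical periodic job standing in for a cron-triggered crawl
    print('tick')


if __name__ == '__main__':
    scheduler.add_job(tick, trigger='interval', seconds=30)
    scheduler.start()  # returns immediately; jobs run on worker threads
    app.run(host='127.0.0.1', port=8000, threaded=True)
```

Because BackgroundScheduler.start() does not block, one process can serve HTTP requests and fire cron jobs; the diff's comment still says "separate process", though with this scheduler class the jobs actually run on threads inside the same process.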

View File

@@ -29,5 +29,5 @@ MONGO_DB = 'crawlab_test'
# flask variables
DEBUG = True
-FLASK_HOST = '0.0.0.0'
-FLASK_PORT = 5000
+FLASK_HOST = '127.0.0.1'
+FLASK_PORT = 8000
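
FLASK_HOST and FLASK_PORT now do double duty: app.run() binds to them, and the scheduler (further down in this commit) builds its callback URL from them. A short illustration of why the host value matters on the connect side (the URL template comes from the scheduler diff; the spider id is a placeholder):

```python
FLASK_HOST = '127.0.0.1'  # usable both as a bind address and as a connect address
FLASK_PORT = 8000

# what the scheduler requests when a cron job fires
url = 'http://%s:%s/api/spiders/%s/on_crawl' % (FLASK_HOST, FLASK_PORT, '<spider_id>')
print(url)  # http://127.0.0.1:8000/api/spiders/<spider_id>/on_crawl

# the previous value '0.0.0.0' is a wildcard bind address, so using it as the
# target of an HTTP request is unreliable, which presumably motivated the change
```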

View File

@@ -1,60 +0,0 @@
amqp==2.4.1
apscheduler==3.5.3
aniso8601==4.1.0
asn1crypto==0.24.0
attrs==18.2.0
Automat==0.7.0
Babel==2.6.0
billiard==3.5.0.5
celery==4.2.1
certifi==2018.11.29
cffi==1.11.5
chardet==3.0.4
Click==7.0
constantly==15.1.0
cryptography==2.5
cssselect==1.0.3
dnspython==1.16.0
docopt==0.6.2
eventlet==0.24.1
Flask==1.0.2
Flask-Cors==3.0.7
Flask-RESTful==0.3.7
flower==0.9.2
greenlet==0.4.15
hyperlink==18.0.0
idna==2.8
incremental==17.5.0
itsdangerous==1.1.0
Jinja2==2.10
kombu==4.3.0
lxml==4.3.1
MarkupSafe==1.1.0
mongoengine==0.16.3
monotonic==1.5
parsel==1.5.1
pyasn1==0.4.5
pyasn1-modules==0.2.4
pycparser==2.19
PyDispatcher==2.0.5
PyHamcrest==1.9.0
pymongo==3.7.2
PyMySQL==0.9.3
pyOpenSSL==19.0.0
python-scrapyd-api==2.1.2
pytz==2018.9
queuelib==1.5.0
redis==3.1.0
requests==2.21.0
Scrapy==1.6.0
scrapy-redis==0.6.8
scrapy-splash==0.7.2
service-identity==18.1.0
six==1.12.0
tornado==5.1.1
Twisted==18.9.0
urllib3==1.24.1
vine==1.2.0
w3lib==1.20.0
Werkzeug==0.14.1
zope.interface==4.6.0

View File

@@ -105,6 +105,10 @@ class BaseApi(Resource):
if k not in DEFAULT_ARGS:
values[k] = args.get(k)
item = db_manager.update_one(col_name=self.col_name, id=id, values=values)
+# execute after_update hook
+self.after_update(id)
return item
def post(self, id=None, action=None):
@@ -122,3 +126,6 @@ class BaseApi(Resource):
def delete(self, id=None):
db_manager.remove_one(col_name=self.col_name, id=id)
+def after_update(self, id=None):
+pass
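
The new after_update method is a template-method hook: BaseApi.put() saves the document and then calls it, and subclasses override it when they need a side effect. A stripped-down, runnable sketch of the pattern (the in-memory store and resource names are illustrative stand-ins for Crawlab's MongoDB-backed classes):

```python
from flask import Flask, request
from flask_restful import Api, Resource

app = Flask(__name__)
api = Api(app)

store = {}  # stand-in for the MongoDB collection


class BaseApi(Resource):
    def put(self, id=None):
        # persist the updated fields (db_manager.update_one in the real project)
        store.setdefault(id, {}).update(request.get_json(force=True))
        # execute after_update hook so subclasses can react to the change
        self.after_update(id)
        return store[id]

    def after_update(self, id=None):
        # default hook: do nothing
        pass


class SpiderApi(BaseApi):
    def after_update(self, id=None):
        # in this commit the real override calls scheduler.update()
        print('spider %s updated, cron jobs would be reloaded here' % id)


api.add_resource(SpiderApi, '/api/spiders/<string:id>')

if __name__ == '__main__':
    app.run(port=8000)  # a PUT to /api/spiders/<id> then triggers after_update
```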

View File

@@ -16,6 +16,7 @@ from constants.node import NodeStatus
from constants.task import TaskStatus
from db.manager import db_manager
from routes.base import BaseApi
+from tasks.scheduler import scheduler
from tasks.spider import execute_spider
from utils import jsonify
from utils.deploy import zip_file, unzip_file
@@ -25,6 +26,10 @@ from utils.spider import get_lang_by_stats
parser = reqparse.RequestParser()
parser.add_argument('file', type=FileStorage, location='files')
+IGNORE_DIRS = [
+'.idea'
+]
class SpiderApi(BaseApi):
col_name = 'spiders'
@@ -75,6 +80,9 @@ class SpiderApi(BaseApi):
items = []
dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER)
for _dir in dirs:
+if _dir in IGNORE_DIRS:
+continue
dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir)
dir_name = _dir
spider = db_manager.get_one_by_key('spiders', key='src', value=dir_path)
@@ -274,6 +282,9 @@ class SpiderApi(BaseApi):
'items': items
})
+def after_update(self, id=None):
+scheduler.update()
class SpiderImportApi(Resource):
parser = reqparse.RequestParser()
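
Also in this file, the directory scan shown above now skips entries listed in IGNORE_DIRS, so editor metadata such as .idea is never registered as a spider. A small sketch of that filter, assuming a flat projects folder (the folder path is an assumption):

```python
import os

IGNORE_DIRS = ['.idea']  # bookkeeping directories that are not spider projects
PROJECT_SOURCE_FILE_FOLDER = '/opt/crawlab/spiders'  # assumed location


def list_spider_dirs(root=PROJECT_SOURCE_FILE_FOLDER):
    """Yield (name, path) for each candidate spider directory, skipping ignored names."""
    for name in os.listdir(root):
        if name in IGNORE_DIRS:
            continue
        path = os.path.join(root, name)
        if os.path.isdir(path):
            yield name, path
```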

View File

@@ -1,9 +1,9 @@
import requests
-from apscheduler.schedulers.background import BlockingScheduler
+from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pymongo import MongoClient
-from config import MONGO_DB, MONGO_HOST, MONGO_PORT
+from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
from constants.spider import CronEnabled
from db.manager import db_manager
@@ -17,17 +17,24 @@ class Scheduler(object):
client=mongo)
}
-scheduler = BlockingScheduler(jobstores=jobstores)
+scheduler = BackgroundScheduler(jobstores=jobstores)
def execute_spider(self, id: str):
-r = requests.get('http://localhost:5000/api/spiders/%s/on_crawl' % id)
+r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
+FLASK_HOST,
+FLASK_PORT,
+id
+))
def restart(self):
self.scheduler.shutdown()
self.scheduler.start()
def update(self):
# remove all existing periodic jobs
self.scheduler.remove_all_jobs()
# add new periodic jobs from database
spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
for spider in spiders:
cron = spider.get('cron')
@@ -38,7 +45,7 @@ class Scheduler(object):
day = cron_arr[3]
month = cron_arr[4]
day_of_week = cron_arr[5]
-self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
+self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),), jobstore='mongo',
day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute,
second=second)
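
Taken together, the scheduler changes appear to be the substance of the duplication fix: the scheduler becomes a BackgroundScheduler living inside the API process, each cron job is written explicitly to the MongoDB job store via jobstore='mongo', and update() rebuilds the job list from the spiders collection instead of letting jobs accumulate. A condensed, runnable sketch of that flow (the Mongo connection settings and the demo spider document are assumptions):

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pymongo import MongoClient

mongo = MongoClient(host='localhost', port=27017)  # assumed connection settings
jobstores = {
    'mongo': MongoDBJobStore(database='apscheduler', collection='jobs', client=mongo)
}
# BackgroundScheduler runs jobs on worker threads and start() returns immediately,
# unlike the BlockingScheduler it replaces.
scheduler = BackgroundScheduler(jobstores=jobstores)


def crawl(spider_id):
    # the real job performs an HTTP GET to /api/spiders/<id>/on_crawl
    print('would trigger crawl for spider', spider_id)


def update_jobs(spiders):
    """Drop every stored job, then re-add one job per cron-enabled spider."""
    scheduler.remove_all_jobs()
    for spider in spiders:
        # six-field cron: second minute hour day month day_of_week
        second, minute, hour, day, month, day_of_week = spider['cron'].split(' ')
        # pinning the job to the 'mongo' store keeps a single source of truth
        scheduler.add_job(func=crawl, trigger='cron', args=(str(spider['_id']),),
                          jobstore='mongo', second=second, minute=minute, hour=hour,
                          day=day, month=month, day_of_week=day_of_week)


if __name__ == '__main__':
    scheduler.start()
    update_jobs([{'_id': 'demo', 'cron': '0 */5 * * * *'}])  # illustrative spider doc
```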

View File

@@ -1,6 +1,7 @@
import axios from 'axios'
-const baseUrl = 'http://localhost:5000/api'
+const baseUrl = 'http://localhost:8000/api'
+// const baseUrl = 'http://139.129.230.98:8000/api'
const request = (method, path, params, data) => {
return new Promise((resolve, reject) => {