Merge remote-tracking branch 'upstream/develop' into develop

This commit is contained in:
casperwnb
2019-04-19 12:34:35 +08:00
44 changed files with 1575 additions and 187 deletions

LICENSE (27 lines removed)

@@ -1,27 +0,0 @@
Copyright (c) 2019, Marvin Zhang
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software
must display the following acknowledgement:
This product includes software developed by the Marvin Zhang.
4. Neither the name of the Marvin Zhang nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY MARVIN ZHANG ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL MARVIN ZHANG BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


@@ -43,23 +43,22 @@ api.add_resource(NodeApi,
'/api/nodes',
'/api/nodes/<string:id>',
'/api/nodes/<string:id>/<string:action>')
api.add_resource(SpiderImportApi,
'/api/spiders/import/<string:platform>')
api.add_resource(SpiderManageApi,
'/api/spiders/manage/<string:action>')
api.add_resource(SpiderApi,
'/api/spiders',
'/api/spiders/<string:id>',
'/api/spiders/<string:id>/<string:action>')
api.add_resource(SpiderImportApi,
'/api/spiders/import/<string:platform>')
api.add_resource(SpiderManageApi,
'/api/spiders/manage/<string:action>')
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>',
'/api/tasks/<string:id>/<string:action>')
api.add_resource(DeployApi,
'/api/deploys',
'/api/deploys/<string:id>',
'/api/deploys/<string:id>/<string:action>')
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>',
'/api/tasks/<string:id>/<string:action>'
)
api.add_resource(FileApi,
'/api/files',
'/api/files/<string:action>')
@@ -89,17 +88,17 @@ def monitor_nodes_status(celery_app):
recv.capture(limit=None, timeout=None, wakeup=True)
# run scheduler as a separate process
scheduler.run()
# monitor node status
p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
p_monitor.start()
# create folder if it does not exist
if not os.path.exists(PROJECT_LOGS_FOLDER):
os.makedirs(PROJECT_LOGS_FOLDER)
if __name__ == '__main__':
# create folder if it does not exist
if not os.path.exists(PROJECT_LOGS_FOLDER):
os.makedirs(PROJECT_LOGS_FOLDER)
# run scheduler as a separate process
scheduler.run()
# monitor node status
p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
p_monitor.start()
# run app instance
app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)


@@ -1,33 +1,50 @@
# project variables
# spider source code path
PROJECT_SOURCE_FILE_FOLDER = '../spiders'
# path to the Python virtual environment
PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'
# spider deploy path
PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
# spider logs path
PROJECT_LOGS_FOLDER = '../deployfile/logs'
# temporary folder for packaging
PROJECT_TMP_FOLDER = '/tmp'
# celery variables
# Celery broker URL
BROKER_URL = 'redis://127.0.0.1:6379/0'
# Celery result backend URL
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'
# Celery MongoDB backend settings
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}
# Celery timezone
CELERY_TIMEZONE = 'Asia/Shanghai'
# whether to enable UTC
CELERY_ENABLE_UTC = True
# Celery Scheduler Redis URL
CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
# flower variables
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
# database variables
# MongoDB variables
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'
# flask variables
# Flask variables
DEBUG = True
FLASK_HOST = '127.0.0.1'
FLASK_PORT = 8000


@@ -2,17 +2,26 @@ from bson import ObjectId
from mongoengine import connect
from pymongo import MongoClient, DESCENDING
from config import MONGO_HOST, MONGO_PORT, MONGO_DB
from utils import is_object_id, jsonify
from utils import is_object_id
connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT)
class DbManager(object):
__doc__ = """
Database Manager class for handling database CRUD actions.
"""
def __init__(self):
self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
self.db = self.mongo[MONGO_DB]
def save(self, col_name: str, item, **kwargs):
def save(self, col_name: str, item: dict, **kwargs) -> None:
"""
Save the item in the specified collection
:param col_name: collection name
:param item: item object
"""
col = self.db[col_name]
# in case some fields cannot be saved in MongoDB
@@ -21,15 +30,32 @@ class DbManager(object):
col.save(item, **kwargs)
def remove(self, col_name: str, cond: dict, **kwargs):
def remove(self, col_name: str, cond: dict, **kwargs) -> None:
"""
Remove items given specified condition.
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
col.remove(cond, **kwargs)
def update(self, col_name: str, cond: dict, values: dict, **kwargs):
"""
Update items given specified condition.
:param col_name: collection name
:param cond: condition or filter
:param values: values to update
"""
col = self.db[col_name]
col.update(cond, {'$set': values}, **kwargs)
def update_one(self, col_name: str, id: str, values: dict, **kwargs):
"""
Update an item given specified _id
:param col_name: collection name
:param id: _id
:param values: values to update
"""
col = self.db[col_name]
_id = id
if is_object_id(id):
@@ -38,6 +64,11 @@ class DbManager(object):
col.find_one_and_update({'_id': _id}, {'$set': values})
def remove_one(self, col_name: str, id: str, **kwargs):
"""
Remove an item given specified _id
:param col_name: collection name
:param id: _id
"""
col = self.db[col_name]
_id = id
if is_object_id(id):
@@ -45,7 +76,16 @@ class DbManager(object):
col.remove({'_id': _id})
def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100,
**kwargs):
**kwargs) -> list:
"""
Return a list of items given specified condition, sort_key, sort_direction, skip, and limit.
:param col_name: collection name
:param cond: condition or filter
:param sort_key: key to sort
:param sort_direction: sort direction
:param skip: skip number
:param limit: limit number
"""
if sort_key is None:
sort_key = '_id'
col = self.db[col_name]
@@ -54,11 +94,21 @@ class DbManager(object):
data.append(item)
return data
def _get(self, col_name: str, cond: dict):
def _get(self, col_name: str, cond: dict) -> dict:
"""
Get an item given specified condition.
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
return col.find_one(cond)
def get(self, col_name: str, id):
def get(self, col_name: str, id: (ObjectId, str)) -> dict:
"""
Get an item given specified _id.
:param col_name: collection name
:param id: _id
"""
if type(id) == ObjectId:
_id = id
elif is_object_id(id):
@@ -67,14 +117,28 @@ class DbManager(object):
_id = id
return self._get(col_name=col_name, cond={'_id': _id})
def get_one_by_key(self, col_name: str, key, value):
def get_one_by_key(self, col_name: str, key, value) -> dict:
"""
Get an item given key/value condition.
:param col_name: collection name
:param key: key
:param value: value
"""
return self._get(col_name=col_name, cond={key: value})
def count(self, col_name: str, cond):
def count(self, col_name: str, cond) -> int:
"""
Get total count of a collection given specified condition
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
return col.count(cond)
def get_latest_version(self, spider_id, node_id):
"""
@deprecated
"""
col = self.db['deploys']
for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
.sort('version', DESCENDING):
@@ -82,13 +146,32 @@ class DbManager(object):
return None
def get_last_deploy(self, spider_id):
"""
Get latest deploy for a given spider_id
"""
col = self.db['deploys']
for item in col.find({'spider_id': ObjectId(spider_id)}) \
.sort('finish_ts', DESCENDING):
return item
return None
def get_last_task(self, spider_id):
"""
Get latest task for a given spider_id
"""
col = self.db['tasks']
for item in col.find({'spider_id': ObjectId(spider_id)}) \
.sort('create_ts', DESCENDING):
return item
return None
def aggregate(self, col_name: str, pipelines, **kwargs):
"""
Perform MongoDB col.aggregate action to aggregate stats given collection name and pipelines.
Reference: https://docs.mongodb.com/manual/reference/command/aggregate/
:param col_name: collection name
:param pipelines: pipelines
"""
col = self.db[col_name]
return col.aggregate(pipelines, **kwargs)
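
For orientation, a minimal usage sketch of the DbManager API above; the collection and field values are illustrative, not part of the commit:

from db.manager import db_manager

# upsert an item into a collection
db_manager.save(col_name='spiders', item={'name': 'demo', 'cmd': 'scrapy crawl demo'})

# ten most recent tasks, newest first (sort direction defaults to DESCENDING)
tasks = db_manager.list(col_name='tasks', cond={}, sort_key='create_ts', limit=10)

# count and aggregate delegate to pymongo
total = db_manager.count('tasks', {})
by_status = db_manager.aggregate('tasks', [{'$group': {'_id': '$status', 'count': {'$sum': 1}}}])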


@@ -12,6 +12,9 @@ DEFAULT_ARGS = [
class BaseApi(Resource):
"""
Base class for API. All API classes should inherit this class.
"""
col_name = 'tmp'
parser = reqparse.RequestParser()
arguments = []
@@ -20,14 +23,23 @@ class BaseApi(Resource):
super(BaseApi).__init__()
self.parser.add_argument('page_num', type=int)
self.parser.add_argument('page_size', type=int)
self.parser.add_argument('filter', type=dict)
self.parser.add_argument('filter', type=str)
for arg, type in self.arguments:
self.parser.add_argument(arg, type=type)
def get(self, id=None, action=None):
import pdb
pdb.set_trace()
def get(self, id: str = None, action: str = None) -> (dict, tuple):
"""
GET method for retrieving item information.
If id is specified and action is not, return the object of the given id;
If id and action are both specified, execute the given action on the item of the given id;
If neither id nor action is specified, return the list of items given the page_size, page_num and filter
:param id:
:param action:
:return:
"""
# import pdb
# pdb.set_trace()
args = self.parser.parse_args()
# action by id
@@ -73,28 +85,40 @@ class BaseApi(Resource):
# TODO: getting status for node
return jsonify({
return {
'status': 'ok',
'total_count': total_count,
'page_num': page,
'page_size': page_size,
'items': items
})
'items': jsonify(items)
}
# get item by id
else:
return jsonify(db_manager.get(col_name=self.col_name, id=id))
def put(self):
def put(self) -> (dict, tuple):
"""
PUT method for creating a new item.
:return:
"""
args = self.parser.parse_args()
item = {}
for k in args.keys():
if k not in DEFAULT_ARGS:
item[k] = args.get(k)
item = db_manager.save(col_name=self.col_name, item=item)
self.after_update()
return item
def update(self, id=None):
def update(self, id: str = None) -> (dict, tuple):
"""
Helper function for update action given the id.
:param id:
:return:
"""
args = self.parser.parse_args()
item = db_manager.get(col_name=self.col_name, id=id)
if item is None:
@@ -106,7 +130,8 @@ class BaseApi(Resource):
values = {}
for k in args.keys():
if k not in DEFAULT_ARGS:
values[k] = args.get(k)
if args.get(k) is not None:
values[k] = args.get(k)
item = db_manager.update_one(col_name=self.col_name, id=id, values=values)
# execute after_update hook
@@ -114,10 +139,18 @@ class BaseApi(Resource):
return item
def post(self, id=None, action=None):
def post(self, id: str = None, action: str = None):
"""
POST method of the given id for performing an action.
:param id:
:param action:
:return:
"""
# perform update action if action is not specified
if action is None:
return self.update(id)
# if action is not defined in the attributes, return 400 error
if not hasattr(self, action):
return {
'status': 'ok',
@@ -125,10 +158,27 @@ class BaseApi(Resource):
'error': 'action "%s" invalid' % action
}, 400
# perform specified action of given id
return getattr(self, action)(id)
def delete(self, id=None):
def delete(self, id: str = None) -> (dict, tuple):
"""
DELETE method of given id for deleting an item.
:param id:
:return:
"""
# perform delete action
db_manager.remove_one(col_name=self.col_name, id=id)
return {
'status': 'ok',
'message': 'deleted successfully',
}
def after_update(self, id=None):
def after_update(self, id: str = None):
"""
This is the after update hook once the update method is performed.
To be overridden.
:param id:
:return:
"""
pass
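
The resource classes below all follow this pattern; as a sketch, a hypothetical subclass (WidgetApi is not in the commit) only has to declare its collection and arguments:

from routes.base import BaseApi

class WidgetApi(BaseApi):
    # MongoDB collection backing this resource
    col_name = 'widgets'
    # request arguments parsed in addition to page_num / page_size / filter
    arguments = (
        ('name', str),
        ('color', str),
    )

    def after_update(self, id=None):
        # optional hook, e.g. refresh a scheduler or cache
        pass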


@@ -11,7 +11,12 @@ class DeployApi(BaseApi):
('node_id', str),
)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None) -> (dict, tuple):
"""
GET method of DeployAPI.
:param id: deploy_id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):


@@ -15,6 +15,10 @@ class FileApi(Resource):
self.parser.add_argument('path', type=str)
def get(self, action=None):
"""
GET method of FileAPI.
:param action: action
"""
args = self.parser.parse_args()
path = args.get('path')


@@ -15,7 +15,12 @@ class NodeApi(BaseApi):
('port', str),
)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None) -> (dict, tuple):
"""
GET method of NodeAPI.
:param id: item id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -43,10 +48,11 @@ class NodeApi(BaseApi):
'items': jsonify(nodes)
}
def get_spiders(self, id=None):
items = db_manager.list('spiders')
def get_deploys(self, id):
def get_deploys(self, id: str) -> (dict, tuple):
"""
Get a list of latest deploys of given node_id
:param id: node_id
"""
items = db_manager.list('deploys', {'node_id': id}, limit=10, sort_key='finish_ts')
deploys = []
for item in items:
@@ -60,6 +66,10 @@ class NodeApi(BaseApi):
}
def get_tasks(self, id):
"""
Get a list of latest tasks of given node_id
:param id: node_id
"""
items = db_manager.list('tasks', {'node_id': id}, limit=10, sort_key='create_ts')
for item in items:
spider_id = item['spider_id']


@@ -5,6 +5,7 @@ import requests
from constants.task import TaskStatus
from db.manager import db_manager
from routes.base import BaseApi
from tasks.scheduler import scheduler
from utils import jsonify
from utils.spider import get_spider_col_fields
@@ -13,6 +14,11 @@ class ScheduleApi(BaseApi):
col_name = 'schedules'
arguments = (
('name', str),
('description', str),
('cron', str),
('spider_id', str)
)
def after_update(self, id: str = None):
scheduler.update()
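
Given these arguments, creating a schedule is a plain PUT against the new resource. A hedged example (host/port come from config; all values are illustrative):

import requests

# cron format used by the scheduler below:
# [second] [minute] [hour] [day of month] [month] [day of week]
requests.put('http://127.0.0.1:8000/api/schedules', data={
    'name': 'hourly-crawl',
    'description': 'run the demo spider at the top of every hour',
    'cron': '0 0 * * * *',
    'spider_id': '5cb820...',  # illustrative 24-char ObjectId string
})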


@@ -58,9 +58,17 @@ class SpiderApi(BaseApi):
# spider schedule cron enabled
('cron_enabled', int),
# spider environment variables
('envs', str),
)
def get(self, id=None, action=None):
"""
GET method of SpiderAPI.
:param id: spider_id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -73,7 +81,14 @@ class SpiderApi(BaseApi):
# get one node
elif id is not None:
return jsonify(db_manager.get('spiders', id=id))
spider = db_manager.get('spiders', id=id)
# get deploy
last_deploy = db_manager.get_last_deploy(spider_id=spider['_id'])
if last_deploy is not None:
spider['deploy_ts'] = last_deploy['finish_ts']
return jsonify(spider)
# get a list of items
else:
@@ -100,8 +115,23 @@ class SpiderApi(BaseApi):
# existing spider
else:
# get last deploy
last_deploy = db_manager.get_last_deploy(spider_id=spider['_id'])
if last_deploy is not None:
spider['deploy_ts'] = last_deploy['finish_ts']
# get last task
last_task = db_manager.get_last_task(spider_id=spider['_id'])
if last_task is not None:
spider['task_ts'] = last_task['create_ts']
# file stats
stats = get_file_suffix_stats(dir_path)
# language
lang = get_lang_by_stats(stats)
# update spider data
db_manager.update_one('spiders', id=str(spider['_id']), values={
'lang': lang,
'suffix_stats': stats,
@@ -115,7 +145,12 @@ class SpiderApi(BaseApi):
'items': jsonify(items)
}
def crawl(self, id):
def crawl(self, id: str) -> (dict, tuple):
"""
Submit an HTTP request to start a crawl task in the node of given spider_id.
@deprecated
:param id: spider_id
"""
args = self.parser.parse_args()
node_id = args.get('node_id')
@@ -152,7 +187,12 @@ class SpiderApi(BaseApi):
'task': data.get('task')
}
def on_crawl(self, id):
def on_crawl(self, id: str) -> (dict, tuple):
"""
Start a crawl task.
:param id: spider_id
:return:
"""
job = execute_spider.delay(id)
# create a new task
@@ -172,7 +212,12 @@ class SpiderApi(BaseApi):
}
}
def deploy(self, id):
def deploy(self, id: str) -> (dict, tuple):
"""
Submit HTTP requests to deploy the given spider to all nodes.
:param id:
:return:
"""
spider = db_manager.get('spiders', id=id)
nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
@@ -198,13 +243,19 @@ class SpiderApi(BaseApi):
node_id,
), files=files)
# TODO: checkpoint for errors
return {
'code': 200,
'status': 'ok',
'message': 'deploy success'
}
def deploy_file(self, id=None):
def deploy_file(self, id: str = None) -> (dict, tuple):
"""
Receive a deploy HTTP request, unzip the uploaded file, and copy it to the destination directories.
:param id: spider_id
"""
args = parser.parse_args()
node_id = request.args.get('node_id')
f = args.file
@@ -261,7 +312,11 @@ class SpiderApi(BaseApi):
'message': 'deploy success'
}
def get_deploys(self, id):
def get_deploys(self, id: str) -> (dict, tuple):
"""
Get a list of latest deploys of given spider_id
:param id: spider_id
"""
items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
deploys = []
for item in items:
@@ -274,7 +329,11 @@ class SpiderApi(BaseApi):
'items': jsonify(deploys)
}
def get_tasks(self, id):
def get_tasks(self, id: str) -> (dict, tuple):
"""
Get a list of latest tasks of given spider_id
:param id:
"""
items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
for item in items:
spider_id = item['spider_id']
@@ -287,11 +346,23 @@ class SpiderApi(BaseApi):
'items': jsonify(items)
}
def after_update(self, id=None):
def after_update(self, id: str = None) -> None:
"""
After each spider is updated, update the cron scheduler correspondingly.
:param id: spider_id
"""
scheduler.update()
def update_envs(self, id: str):
args = self.parser.parse_args()
envs = json.loads(args.envs)
db_manager.update_one(col_name='spiders', id=id, values={'envs': envs})
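
update_envs expects envs as a JSON-encoded list of {name, value} pairs; the frontend's updateSpiderEnvs action (further down) sends exactly that. A hedged Python equivalent:

import json
import requests

spider_id = '5cb820...'  # illustrative spider _id
requests.post('http://127.0.0.1:8000/api/spiders/%s/update_envs' % spider_id,
              data={'envs': json.dumps([{'name': 'MONGO_HOST', 'value': '127.0.0.1'}])})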
class SpiderImportApi(Resource):
__doc__ = """
API for importing spiders from external resources including Github, Gitlab, and subversion (WIP)
"""
parser = reqparse.RequestParser()
arguments = [
('url', str)
@@ -302,7 +373,7 @@ class SpiderImportApi(Resource):
for arg, type in self.arguments:
self.parser.add_argument(arg, type=type)
def post(self, platform=None):
def post(self, platform: str = None) -> (dict, tuple):
if platform is None:
return {
'status': 'ok',
@@ -319,13 +390,22 @@ class SpiderImportApi(Resource):
return getattr(self, platform)()
def github(self):
def github(self) -> None:
"""
Import Github API
"""
self._git()
def gitlab(self):
def gitlab(self) -> None:
"""
Import Gitlab API
"""
self._git()
def _git(self):
"""
Helper method to perform the git import (basically a "git clone").
"""
args = self.parser.parse_args()
url = args.get('url')
if url is None:
@@ -357,7 +437,11 @@ class SpiderManageApi(Resource):
('url', str)
]
def post(self, action):
def post(self, action: str) -> (dict, tuple):
"""
POST method for SpiderManageAPI.
:param action:
"""
if not hasattr(self, action):
return {
'status': 'ok',
@@ -367,7 +451,10 @@ class SpiderManageApi(Resource):
return getattr(self, action)()
def deploy_all(self):
def deploy_all(self) -> (dict, tuple):
"""
Deploy all spiders to all nodes.
"""
# active nodes
nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})


@@ -8,7 +8,11 @@ from utils import jsonify
class StatsApi(Resource):
def get(self, action=None):
def get(self, action: str = None) -> (dict, tuple):
"""
GET method of StatsApi.
:param action: action
"""
# action
if action is not None:
if not hasattr(self, action):
@@ -23,6 +27,9 @@ class StatsApi(Resource):
return {}
def get_home_stats(self):
"""
Get stats for home page
"""
# overview stats
task_count = db_manager.count('tasks', {})
spider_count = db_manager.count('spiders', {})


@@ -1,7 +1,9 @@
import json
from datetime import datetime
import requests
from celery.worker.control import revoke
from bson import ObjectId
from tasks.celery import celery_app
from constants.task import TaskStatus
from db.manager import db_manager
@@ -12,6 +14,7 @@ from utils.log import other
class TaskApi(BaseApi):
# collection name
col_name = 'tasks'
arguments = (
@@ -19,7 +22,12 @@ class TaskApi(BaseApi):
('file_path', str)
)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None):
"""
GET method of TaskAPI.
:param id: item id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -28,13 +36,15 @@ class TaskApi(BaseApi):
'code': 400,
'error': 'action "%s" invalid' % action
}, 400
other.info(f"到这了{action},{id}")
# other.info(f"到这了{action},{id}")
return getattr(self, action)(id)
elif id is not None:
task = db_manager.get('tasks', id=id)
spider = db_manager.get('spiders', id=str(task['spider_id']))
task = db_manager.get(col_name=self.col_name, id=id)
spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
task['spider_name'] = spider['name']
if task.get('finish_ts') is not None:
task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
try:
with open(task['log_file_path']) as f:
task['log'] = f.read()
@@ -46,26 +56,51 @@ class TaskApi(BaseApi):
args = self.parser.parse_args()
page_size = args.get('page_size') or 10
page_num = args.get('page_num') or 1
tasks = db_manager.list('tasks', {}, limit=page_size, skip=page_size * (page_num - 1), sort_key='create_ts')
filter_str = args.get('filter')
filter_ = {}
if filter_str is not None:
filter_ = json.loads(filter_str)
if filter_.get('spider_id'):
filter_['spider_id'] = ObjectId(filter_['spider_id'])
tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
sort_key='create_ts')
items = []
for task in tasks:
# celery tasks
# _task = db_manager.get('tasks_celery', id=task['_id'])
_spider = db_manager.get('spiders', id=str(task['spider_id']))
# get spider
_spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
# status
if task.get('status') is None:
task['status'] = TaskStatus.UNAVAILABLE
task['spider_name'] = _spider['name']
# spider name
if _spider:
task['spider_name'] = _spider['name']
# duration
if task.get('finish_ts') is not None:
task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
items.append(task)
return {
'status': 'ok',
'total_count': db_manager.count('tasks', {}),
'total_count': db_manager.count('tasks', filter_),
'page_num': page_num,
'page_size': page_size,
'items': jsonify(items)
}
def on_get_log(self, id):
def on_get_log(self, id: (str, ObjectId)) -> (dict, tuple):
"""
Get the log of given task_id
:param id: task_id
"""
try:
task = db_manager.get('tasks', id=id)
task = db_manager.get(col_name=self.col_name, id=id)
with open(task['log_file_path']) as f:
log = f.read()
return {
@@ -79,9 +114,14 @@ class TaskApi(BaseApi):
'error': str(err)
}, 500
def get_log(self, id):
task = db_manager.get('tasks', id=id)
node = db_manager.get('nodes', id=task['node_id'])
def get_log(self, id: (str, ObjectId)) -> (dict, tuple):
"""
Submit an HTTP request to fetch log from the node of a given task.
:param id: task_id
:return:
"""
task = db_manager.get(col_name=self.col_name, id=id)
node = db_manager.get(col_name='nodes', id=task['node_id'])
r = requests.get('http://%s:%s/api/tasks/%s/on_get_log' % (
node['ip'],
node['port'],
@@ -101,7 +141,11 @@ class TaskApi(BaseApi):
'error': data['error']
}, 500
def get_results(self, id):
def get_results(self, id: str) -> (dict, tuple):
"""
Get a list of results crawled in a given task.
:param id: task_id
"""
args = self.parser.parse_args()
page_size = args.get('page_size') or 10
page_num = args.get('page_num') or 1
@@ -123,7 +167,15 @@ class TaskApi(BaseApi):
}
def stop(self, id):
revoke(id, terminate=True)
"""
Stop the task in progress.
:param id:
:return:
"""
celery_app.control.revoke(id, terminate=True)
db_manager.update_one('tasks', id=id, values={
'status': TaskStatus.REVOKED
})
return {
'id': id,
'status': 'ok',
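
Since filter travels as a JSON string (json.loads above), a client filtering the task list by spider would look roughly like this; the _id value is illustrative:

import json
import requests

resp = requests.get('http://127.0.0.1:8000/api/tasks', params={
    'page_num': 1,
    'page_size': 10,
    # the filter is sent as a JSON string and decoded server side
    'filter': json.dumps({'spider_id': '5cb820...'}),
})
print(resp.json()['total_count'])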

crawlab/swagger.yaml (new file, 353 lines)

@@ -0,0 +1,353 @@
---
swagger: '2.0'
basePath: "/api"
paths:
"/deploys":
get:
responses:
'200':
description: Success
summary: GET method of DeployAPI
operationId: get_deploy_api
tags:
- deploy
put:
responses:
'200':
description: Success
summary: PUT method for creating a new item
operationId: put_deploy_api
tags:
- deploy
"/deploys/{id}":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: GET method of DeployAPI
operationId: get_deploy_api_by_id
tags:
- deploy
post:
responses:
'200':
description: Success
summary: POST method of the given id for performing an action
operationId: post_deploy_api
tags:
- deploy
delete:
responses:
'200':
description: Success
summary: DELETE method of given id for deleting an item
operationId: delete_deploy_api
tags:
- deploy
"/files":
get:
responses:
'200':
description: Success
summary: GET method of FileAPI
operationId: get_file_api
tags:
- file
"/nodes":
get:
responses:
'200':
description: Success
summary: GET method of NodeAPI
operationId: get_node_api
tags:
- node
put:
responses:
'200':
description: Success
summary: PUT method for creating a new item
operationId: put_node_api
tags:
- node
"/nodes/{id}":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: GET method of NodeAPI
operationId: get_node_api_by_id
tags:
- node
post:
responses:
'200':
description: Success
summary: POST method of the given id for performing an action
operationId: post_node_api
tags:
- node
delete:
responses:
'200':
description: Success
summary: DELETE method of the given id
operationId: delete_node_api
tags:
- node
"/nodes/{id}/get_deploys":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: Get a list of latest deploys of given node_id
tags:
- node
"/nodes/{id}/get_tasks":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: Get a list of latest tasks of given node_id
tags:
- node
"/spiders":
get:
responses:
'200':
description: Success
summary: GET method of SpiderAPI
operationId: get_spider_api
tags:
- spider
put:
responses:
'200':
description: Success
summary: PUT method for creating a new item
operationId: put_spider_api
tags:
- spider
"/spiders/import/{platform}":
parameters:
- name: platform
in: path
required: true
type: string
post:
responses:
'200':
description: Success
operationId: post_spider_import_api
tags:
- spider
"/spiders/manage/deploy_all":
post:
responses:
'200':
description: Success
summary: Deploy all spiders to all nodes.
tags:
- spider
"/spiders/{id}":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: GET method of SpiderAPI
operationId: get_spider_api_by_id
tags:
- spider
post:
responses:
'200':
description: Success
summary: POST method of the given id for performing an action
operationId: post_spider_api
tags:
- spider
delete:
responses:
'200':
description: Success
summary: DELETE method of given id for deleting an item
operationId: delete_spider_api
tags:
- spider
"/spiders/{id}/get_tasks":
parameters:
- name: id
in: path
required: true
type: string
description: spider_id
get:
responses:
'200':
description: Success
summary: Get a list of latest tasks of given spider_id
tags:
- spider
"/spiders/{id}/get_deploys":
parameters:
- name: id
in: path
required: true
type: string
description: spider_id
get:
responses:
'200':
description: Success
summary: Get a list of latest deploys of given spider_id
tags:
- spider
"/spiders/{id}/on_crawl":
parameters:
- name: id
in: path
required: true
type: string
description: spider_id
post:
responses:
'200':
description: Success
summary: Start a crawl task.
tags:
- spider
"/spiders/{id}/deploy":
parameters:
- name: id
in: path
required: true
type: string
description: spider_id
post:
responses:
'200':
description: Success
summary: Deploy the spider to all nodes.
tags:
- spider
"/stats/get_home_stats":
get:
responses:
'200':
description: Success
summary: Get stats for home page
operationId: get_stats_api
tags:
- stats
"/tasks":
get:
responses:
'200':
description: Success
summary: GET method of TaskAPI
operationId: get_task_api
tags:
- task
put:
responses:
'200':
description: Success
summary: PUT method for creating a new item
operationId: put_task_api
tags:
- task
"/tasks/{id}":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: GET method of TaskAPI
operationId: get_task_api_by_id
tags:
- task
post:
responses:
'200':
description: Success
summary: POST method of the given id for performing an action
operationId: post_task_api
tags:
- task
delete:
responses:
'200':
description: Success
summary: DELETE method of given id for deleting an item
operationId: delete_task_api
tags:
- task
"/tasks/{id}/get_log":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: Submit an HTTP request to fetch log from the node of a given task.
operationId: get_task_api_get_log
tags:
- task
"/tasks/{id}/on_get_log":
parameters:
- name: id
in: path
required: true
type: string
get:
responses:
'200':
description: Success
summary: Get the log of given task_id
operationId: get_task_api_on_get_log
tags:
- task
info:
title: Crawlab API
version: '1.0'
produces:
- application/json
consumes:
- application/json
responses:
ParseError:
description: When a mask can't be parsed
MaskError:
description: When any error occurs on mask
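
A quick sanity check that the new spec parses; this assumes PyYAML is available, which the commit itself does not state:

import yaml

with open('crawlab/swagger.yaml') as f:
    spec = yaml.safe_load(f)
print(spec['swagger'], '-', len(spec['paths']), 'paths')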


@@ -10,35 +10,33 @@ from db.manager import db_manager
class Scheduler(object):
mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
task_col = 'apscheduler_jobs'
# scheduler jobstore
jobstores = {
'mongo': MongoDBJobStore(database=MONGO_DB,
collection='apscheduler_jobs',
collection=task_col,
client=mongo)
}
# scheduler instance
scheduler = BackgroundScheduler(jobstores=jobstores)
def execute_spider(self, id: str):
r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
FLASK_HOST,
FLASK_PORT,
id
))
def restart(self):
self.scheduler.shutdown()
self.scheduler.start()
def update(self):
# remove all existing periodic jobs
self.scheduler.remove_all_jobs()
self.mongo[MONGO_DB][self.task_col].remove()
# add new periodic jobs from database
spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
for spider in spiders:
cron = spider.get('cron')
periodical_tasks = db_manager.list('schedules', {})
for task in periodical_tasks:
cron = task.get('cron')
cron_arr = cron.split(' ')
second = cron_arr[0]
minute = cron_arr[1]
@@ -46,7 +44,7 @@ class Scheduler(object):
day = cron_arr[3]
month = cron_arr[4]
day_of_week = cron_arr[5]
self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),),
jobstore='mongo',
day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute,
second=second)
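
The cron string is split positionally into six APScheduler fields; a worked example of the mapping:

# '0 */5 * * * *' fires at second 0 of every 5th minute
cron = '0 */5 * * * *'
second, minute, hour, day, month, day_of_week = cron.split(' ')
# equivalent to the add_job call above:
# scheduler.add_job(func=..., trigger='cron', second='0', minute='*/5',
#                   hour='*', day='*', month='*', day_of_week='*')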


@@ -2,7 +2,7 @@ import os
from datetime import datetime
from bson import ObjectId
from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER,PYTHON_ENV_PATH
from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, PYTHON_ENV_PATH
from constants.task import TaskStatus
from db.manager import db_manager
from .celery import celery_app
@@ -12,12 +12,17 @@ from utils.log import other as logger
@celery_app.task(bind=True)
def execute_spider(self, id: str):
"""
Execute spider task.
:param self:
:param id: spider_id
"""
task_id = self.request.id
hostname = self.request.hostname
spider = db_manager.get('spiders', id=id)
command = spider.get('cmd')
if command.startswith("env"):
command = PYTHON_ENV_PATH + command.replace("env","")
command = PYTHON_ENV_PATH + command.replace("env", "")
current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))
@@ -47,11 +52,22 @@ def execute_spider(self, id: str):
'status': TaskStatus.STARTED
})
# start the process and pass params as env variables
# pass params as env variables
env = os.environ.copy()
# custom environment variables
if spider.get('envs'):
for _env in spider.get('envs'):
env[_env['name']] = _env['value']
# task id environment variable
env['CRAWLAB_TASK_ID'] = task_id
# collection environment variable
if spider.get('col'):
env['CRAWLAB_COLLECTION'] = spider.get('col')
# start process
p = subprocess.Popen(command.split(' '),
stdout=stdout.fileno(),
stderr=stderr.fileno(),
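
On the spider side these are ordinary process environment variables. A minimal sketch of a script consuming them (CRAWLAB_TASK_ID and CRAWLAB_COLLECTION are set above; the Mongo variables mirror the demo spiders further down; everything else is illustrative):

import os
from pymongo import MongoClient

task_id = os.environ.get('CRAWLAB_TASK_ID')
col_name = os.environ.get('CRAWLAB_COLLECTION', 'results')

client = MongoClient(host=os.environ.get('MONGO_HOST', '127.0.0.1'),
                     port=int(os.environ.get('MONGO_PORT', '27017')))
col = client[os.environ.get('MONGO_DB', 'crawlab_test')][col_name]
# tag each scraped result with the task that produced it
col.insert_one({'task_id': task_id, 'url': 'https://example.com'})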


@@ -5,11 +5,20 @@ from datetime import datetime
from bson import json_util
def is_object_id(id):
def is_object_id(id: str) -> bool:
"""
Determine if the id is a valid ObjectId string
:param id: ObjectId string
"""
return re.search('^[a-zA-Z0-9]{24}$', id) is not None
def jsonify(obj):
def jsonify(obj: (dict, list)) -> (dict, list):
"""
Convert dict/list to a valid json object.
:param obj: object to be converted
:return: dict/list
"""
dump_str = json_util.dumps(obj)
converted_obj = json.loads(dump_str)
if type(converted_obj) == dict:
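
jsonify exists because ObjectId and datetime are not natively JSON-serializable; a quick illustration of what it returns:

from datetime import datetime
from bson import ObjectId
from utils import jsonify

doc = {'_id': ObjectId(), 'create_ts': datetime.now()}
# json_util encodes BSON types, e.g. {'_id': {'$oid': '...'}, 'create_ts': {'$date': ...}}
print(jsonify(doc))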


@@ -1,8 +1,13 @@
import os, zipfile
from utils.log import other
# pack a directory into an uncompressed zip file
def zip_file(source_dir, output_filename):
"""
Pack a directory into a zip file without compression
:param source_dir: source directory
:param output_filename: output file name
"""
zipf = zipfile.ZipFile(output_filename, 'w')
pre_len = len(os.path.dirname(source_dir))
for parent, dirnames, filenames in os.walk(source_dir):
@@ -14,6 +19,11 @@ def zip_file(source_dir, output_filename):
def unzip_file(zip_src, dst_dir):
"""
Unzip file
:param zip_src: source zip file
:param dst_dir: destination directory
"""
r = zipfile.is_zipfile(zip_src)
if r:
fz = zipfile.ZipFile(zip_src, 'r')
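
A usage sketch of the two helpers above; the import path is a guess (the hunk does not show which utils module hosts them) and the paths are illustrative:

from utils.file import zip_file, unzip_file  # hypothetical import path

# pack a spider directory into an (uncompressed) zip, then unpack it at the deploy target
zip_file(source_dir='../spiders/demo', output_filename='/tmp/demo.zip')
unzip_file(zip_src='/tmp/demo.zip', dst_dir='../deployfile/demo')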


@@ -14,7 +14,12 @@ SUFFIX_LANG_MAPPING = {
}
def get_file_suffix(file_name: str):
def get_file_suffix(file_name: str) -> (str, None):
"""
Get suffix of a file
:param file_name:
:return:
"""
file_name = file_name.lower()
m = suffix_regex.search(file_name)
if m is not None:
@@ -23,7 +28,11 @@ def get_file_suffix(file_name: str):
return None
def get_file_list(path):
def get_file_list(path: str) -> list:
"""
Get a list of files of given directory path
:param path: directory path
"""
for root, dirs, file_names in os.walk(path):
# print(root)  # current directory path
# print(dirs)  # all subdirectories under the current path
@@ -35,6 +44,10 @@ def get_file_list(path):
def get_file_suffix_stats(path) -> dict:
"""
Get suffix stats of files under the given directory
:param path: directory path
"""
stats = defaultdict(int)
for file_path in get_file_list(path):
suffix = get_file_suffix(file_path)
@@ -44,6 +57,10 @@ def get_file_suffix_stats(path) -> dict:
def get_file_content(path) -> dict:
"""
Get file content
:param path: file path
"""
with open(path) as f:
suffix = get_file_suffix(path)
lang = SUFFIX_LANG_MAPPING.get(suffix)


@@ -3,8 +3,10 @@ import os
from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE, SpiderType
from db.manager import db_manager
def get_lang_by_stats(stats: dict) -> LangType:
"""
Get the programming language given the suffix stats
:param stats: stats is generated by utils.file.get_file_suffix_stats
:return:
"""
@@ -20,14 +22,21 @@ def get_lang_by_stats(stats: dict) -> LangType:
pass
def get_spider_type(path: str) -> SpiderType:
"""
Get spider type
:param path: spider directory path
"""
for file_name in os.listdir(path):
if file_name == 'scrapy.cfg':
return SpiderType.SCRAPY
def get_spider_col_fields(col_name):
def get_spider_col_fields(col_name: str) -> list:
"""
Get spider collection fields
:param col_name: collection name
"""
items = db_manager.list(col_name, {}, limit=100, sort_key='_id')
fields = set()
for item in items:


@@ -0,0 +1,75 @@
<template>
<div class="environment-list">
<el-row>
<div class="button-group">
<el-button type="primary" @click="addEnv" icon="el-icon-plus">{{$t('Add Environment Variables')}}</el-button>
<el-button type="success" @click="save">{{$t('Save')}}</el-button>
</div>
</el-row>
<el-row>
<el-table :data="spiderForm.envs">
<el-table-column :label="$t('Variable')">
<template slot-scope="scope">
<el-input v-model="scope.row.name" :placeholder="$t('Variable')"></el-input>
</template>
</el-table-column>
<el-table-column :label="$t('Value')">
<template slot-scope="scope">
<el-input v-model="scope.row.value" :placeholder="$t('Value')"></el-input>
</template>
</el-table-column>
<el-table-column :label="$t('Action')">
<template slot-scope="scope">
<el-button size="mini" icon="el-icon-delete" type="danger" @click="deleteEnv(scope.$index)"></el-button>
</template>
</el-table-column>
</el-table>
</el-row>
</div>
</template>
<script>
import {
mapState
} from 'vuex'
export default {
name: 'EnvironmentList',
computed: {
...mapState('spider', [
'spiderForm'
])
},
methods: {
addEnv () {
if (!this.spiderForm.envs) {
this.$set(this.spiderForm, 'envs', [])
}
this.spiderForm.envs.push({
name: '',
value: ''
})
console.log(this.spiderForm)
},
deleteEnv (index) {
this.spiderForm.envs.splice(index, 1)
},
save () {
this.$store.dispatch('spider/updateSpiderEnvs')
.then(() => {
this.$message.success(this.$t('Spider info has been saved successfully'))
})
.catch(error => {
this.$message.error(error)
})
}
}
}
</script>
<style scoped>
.button-group {
width: 100%;
text-align: right;
}
</style>


@@ -38,30 +38,30 @@
<el-option value="go" label="Go"></el-option>
</el-select>
</el-form-item>
<el-form-item :label="$t('Schedule Enabled')">
<el-switch v-model="spiderForm.cron_enabled" :disabled="isView">
</el-switch>
</el-form-item>
<el-form-item :label="$t('Schedule Cron')" v-if="spiderForm.cron_enabled"
prop="cron"
:rules="cronRules"
:inline-message="true">
<template slot="label">
<el-tooltip :content="$t('Cron Format: [second] [minute] [hour] [day of month] [month] [day of week]')"
placement="top">
<span>
{{$t('Schedule Cron')}}
<i class="fa fa-exclamation-circle"></i>
</span>
</el-tooltip>
</template>
<el-input v-model="spiderForm.cron" :placeholder="$t('Schedule Cron')"
:disabled="isView"></el-input>
</el-form-item>
<!--<el-form-item :label="$t('Schedule Enabled')">-->
<!--<el-switch v-model="spiderForm.cron_enabled" :disabled="isView">-->
<!--</el-switch>-->
<!--</el-form-item>-->
<!--<el-form-item :label="$t('Schedule Cron')" v-if="spiderForm.cron_enabled"-->
<!--prop="cron"-->
<!--:rules="cronRules"-->
<!--:inline-message="true">-->
<!--<template slot="label">-->
<!--<el-tooltip :content="$t('Cron Format: [second] [minute] [hour] [day of month] [month] [day of week]')"-->
<!--placement="top">-->
<!--<span>-->
<!--{{$t('Schedule Cron')}}-->
<!--<i class="fa fa-exclamation-circle"></i>-->
<!--</span>-->
<!--</el-tooltip>-->
<!--</template>-->
<!--<el-input v-model="spiderForm.cron" :placeholder="$t('Schedule Cron')"-->
<!--:disabled="isView"></el-input>-->
<!--</el-form-item>-->
</el-form>
</el-row>
<el-row class="button-container" v-if="!isView">
<el-button type="danger" @click="onRun">{{$t('Run')}}</el-button>
<el-button v-if="isShowRun" type="danger" @click="onRun">{{$t('Run')}}</el-button>
<el-button type="primary" @click="onDeploy">{{$t('Deploy')}}</el-button>
<el-button type="success" @click="onSave">{{$t('Save')}}</el-button>
</el-row>
@@ -109,7 +109,16 @@ export default {
computed: {
...mapState('spider', [
'spiderForm'
])
]),
isShowRun () {
if (!this.spiderForm.deploy_ts) {
return false
}
if (!this.spiderForm.cmd) {
return false
}
return true
}
},
methods: {
onRun () {
@@ -131,6 +140,11 @@ export default {
},
onDeploy () {
const row = this.spiderForm
// save spider
this.$store.dispatch('spider/editSpider', row._id)
// validate fields
this.$refs['spiderForm'].validate(res => {
if (res) {
this.$confirm(this.$t('Are you sure to deploy this spider?'), this.$t('Notification'), {


@@ -32,13 +32,13 @@
</el-form-item>
<el-form-item :label="$t('Error Message')" v-if="taskForm.status === 'FAILURE'">
<div class="error-message">
{{taskForm.result}}
{{taskForm.log}}
</div>
</el-form-item>
</el-form>
</el-row>
<el-row class="button-container">
<el-button v-if="isRunning" type="danger" @click="onStop">Stop</el-button>
<el-button v-if="isRunning" type="danger" @click="onStop">{{$t('Stop')}}</el-button>
<!--<el-button type="danger" @click="onRestart">Restart</el-button>-->
</el-row>
</div>


@@ -8,6 +8,7 @@ export default {
'Task': '任务',
'Tasks': '任务',
'Task Detail': '任务详情',
'Schedules': '定时任务',
'Deploys': '部署',
// tabs
@@ -16,6 +17,7 @@ export default {
'Deployed Spiders': '已部署爬虫',
'Log': '日志',
'Results': '结果',
'Environment': '环境',
// select options
Spider: '爬虫',
@@ -30,6 +32,7 @@ export default {
SUCCESS: '成功',
FAILURE: '错误',
UNAVAILABLE: '未知',
REVOKED: '已取消',
// actions
Run: '运行',
@@ -45,6 +48,7 @@ export default {
Edit: '编辑',
Remove: '删除',
Confirm: '确认',
Stop: '停止',
// home page
'Total Tasks': '总任务数',
@@ -79,6 +83,9 @@ export default {
'Language': '语言',
'Schedule Enabled': '是否开启定时任务',
'Schedule Cron': '定时任务',
'Variable': '变量',
'Value': '值',
'Add Environment Variables': '添加环境变量',
// spider list
'Name': '名称',


@@ -140,7 +140,7 @@ export const constantRouterMap = [
title: 'Schedules',
icon: 'fa fa-calendar'
},
hidden: true,
hidden: false,
children: [
{
path: '',


@@ -9,6 +9,7 @@ import spider from './modules/spider'
import deploy from './modules/deploy'
import task from './modules/task'
import file from './modules/file'
import schedule from './modules/schedule'
import lang from './modules/lang'
import getters from './getters'
@@ -25,6 +26,7 @@ const store = new Vuex.Store({
deploy,
task,
file,
schedule,
lang
},
getters


@@ -0,0 +1,43 @@
import request from '../../api/request'
const state = {
scheduleList: [],
scheduleForm: {}
}
const getters = {}
const mutations = {
SET_SCHEDULE_LIST (state, value) {
state.scheduleList = value
},
SET_SCHEDULE_FORM (state, value) {
state.scheduleForm = value
}
}
const actions = {
getScheduleList ({ state, commit }) {
request.get('/schedules')
.then(response => {
commit('SET_SCHEDULE_LIST', response.data.items)
})
},
addSchedule ({ state }) {
request.put('/schedules', state.scheduleForm)
},
editSchedule ({ state }, id) {
request.post(`/schedules/${id}`, state.scheduleForm)
},
removeSchedule ({ state }, id) {
request.delete(`/schedules/${id}`)
}
}
export default {
namespaced: true,
state,
getters,
mutations,
actions
}


@@ -5,7 +5,7 @@ const state = {
spiderList: [],
// active spider data
spiderForm: { _id: {} },
spiderForm: {},
// node to deploy/run
activeNode: {},
@@ -77,6 +77,11 @@ const actions = {
dispatch('getSpiderList')
})
},
updateSpiderEnvs ({ state }) {
return request.post(`/spiders/${state.spiderForm._id}/update_envs`, {
envs: JSON.stringify(state.spiderForm.envs)
})
},
getSpiderData ({ state, commit }, id) {
return request.get(`/spiders/${id}`)
.then(response => {
@@ -90,6 +95,10 @@ const actions = {
.then(response => {
console.log(response.data)
})
.then(response => {
dispatch('getSpiderData', id)
dispatch('getSpiderList')
})
},
crawlSpider ({ state, dispatch }, id) {
return request.post(`/spiders/${id}/on_crawl`)


@@ -9,6 +9,11 @@ const state = {
taskResultsData: [],
taskResultsColumns: [],
taskResultsTotalCount: 0,
// filter
filter: {
node_id: '',
spider_id: ''
},
// pagination
pageNum: 0,
pageSize: 10,
@@ -68,7 +73,11 @@ const actions = {
getTaskList ({ state, commit }) {
return request.get('/tasks', {
page_num: state.pageNum,
page_size: state.pageSize
page_size: state.pageSize,
filter: {
node_id: state.filter.node_id || undefined,
spider_id: state.filter.spider_id || undefined
}
})
.then(response => {
commit('SET_TASK_LIST', response.data.items)


@@ -50,9 +50,9 @@
<el-tooltip :content="$t('View')" placement="top">
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Edit')" placement="top">
<el-button type="warning" icon="el-icon-edit" size="mini" @click="onView(scope.row)"></el-button>
</el-tooltip>
<!--<el-tooltip :content="$t('Edit')" placement="top">-->
<!--<el-button type="warning" icon="el-icon-edit" size="mini" @click="onView(scope.row)"></el-button>-->
<!--</el-tooltip>-->
<el-tooltip :content="$t('Remove')" placement="top">
<el-button type="danger" icon="el-icon-delete" size="mini" @click="onRemove(scope.row)"></el-button>
</el-tooltip>


@@ -1,15 +1,200 @@
<template>
<div class="app-container">
Schedule List
<!--add popup-->
<el-dialog
:title="$t(dialogTitle)"
:visible.sync="dialogVisible"
width="60%"
:before-close="onDialogClose">
<el-form label-width="180px"
:model="scheduleForm"
:inline-message="true"
ref="scheduleForm"
label-position="right">
<el-form-item :label="$t('Schedule Name')" prop="name" required>
<el-input v-model="scheduleForm.name" :placeholder="$t('Schedule Name')"></el-input>
</el-form-item>
<el-form-item :label="$t('Spider')" prop="spider_id" required>
<el-select v-model="scheduleForm.spider_id" filterable>
<el-option v-for="op in spiderList" :key="op._id" :value="op._id" :label="op.name"></el-option>
</el-select>
</el-form-item>
<el-form-item :label="$t('Cron')" prop="cron" :rules="cronRules" required>
<template slot="label">
<el-tooltip :content="$t('Cron Format: [second] [minute] [hour] [day of month] [month] [day of week]')"
placement="top">
<span>
{{$t('Cron')}}
<i class="fa fa-exclamation-circle"></i>
</span>
</el-tooltip>
</template>
<el-input v-model="scheduleForm.cron" :placeholder="$t('Cron')"></el-input>
</el-form-item>
<el-form-item :label="$t('Schedule Description')" prop="description">
<el-input v-model="scheduleForm.description" type="textarea"
:placeholder="$t('Schedule Description')"></el-input>
</el-form-item>
</el-form>
<span slot="footer" class="dialog-footer">
<el-button @click="onCancel">{{$t('Cancel')}}</el-button>
<el-button type="primary" @click="onAddSubmit">{{$t('Submit')}}</el-button>
</span>
</el-dialog>
<!--filter-->
<div class="filter">
<div class="right">
<el-button type="primary"
icon="el-icon-plus"
class="refresh"
@click="onAdd">
{{$t('Add Schedule')}}
</el-button>
</div>
</div>
<!--table list-->
<el-table :data="filteredTableData"
class="table"
:header-cell-style="{background:'rgb(48, 65, 86)',color:'white'}"
border>
<template v-for="col in columns">
<el-table-column :key="col.name"
:property="col.name"
:label="$t(col.label)"
:sortable="col.sortable"
align="center"
:width="col.width">
</el-table-column>
</template>
<el-table-column :label="$t('Action')" align="left" width="250">
<template slot-scope="scope">
<el-tooltip :content="$t('Edit')" placement="top">
<el-button type="warning" icon="el-icon-edit" size="mini" @click="onEdit(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Remove')" placement="top">
<el-button type="danger" icon="el-icon-delete" size="mini" @click="onRemove(scope.row)"></el-button>
</el-tooltip>
<el-tooltip v-if="isShowRun(scope.row)" :content="$t('Run')" placement="top">
<el-button type="success" icon="fa fa-bug" size="mini" @click="onCrawl(scope.row)"></el-button>
</el-tooltip>
</template>
</el-table-column>
</el-table>
</div>
</template>
<script>
import {
mapState
} from 'vuex'
export default {
name: 'ScheduleList'
name: 'ScheduleList',
data () {
const cronValidator = (rule, value, callback) => {
let patArr = []
for (let i = 0; i < 6; i++) {
patArr.push('[/*,0-9]+')
}
const pat = '^' + patArr.join(' ') + '$'
if (!value) {
callback(new Error('cron cannot be empty'))
} else if (!value.match(pat)) {
callback(new Error('cron format is invalid'))
}
callback()
}
return {
columns: [
{ name: 'name', label: 'Name', width: '220' },
{ name: 'cron', label: 'Cron', width: '220' },
{ name: 'description', label: 'Description', width: 'auto' }
],
isEdit: false,
dialogTitle: '',
dialogVisible: false,
cronRules: [
{ validator: cronValidator, trigger: 'blur' }
]
}
},
computed: {
...mapState('schedule', [
'scheduleList',
'scheduleForm'
]),
...mapState('spider', [
'spiderList'
]),
filteredTableData () {
return this.scheduleList
}
},
methods: {
onDialogClose () {
this.dialogVisible = false
},
onCancel () {
this.dialogVisible = false
},
onAdd () {
this.isEdit = false
this.dialogVisible = true
this.$store.commit('schedule/SET_SCHEDULE_FORM', {})
},
onAddSubmit () {
this.$refs.scheduleForm.validate(res => {
if (res) {
let action
if (this.isEdit) {
action = 'editSchedule'
} else {
action = 'addSchedule'
}
this.$store.dispatch('schedule/' + action, this.scheduleForm._id)
.then(() => {
this.dialogVisible = false
setTimeout(() => {
this.$store.dispatch('schedule/getScheduleList')
}, 100)
})
}
})
},
isShowRun () {
},
onEdit (row) {
this.$store.commit('schedule/SET_SCHEDULE_FORM', row)
this.dialogVisible = true
this.isEdit = true
},
onRemove (row) {
this.$store.dispatch('schedule/removeSchedule', row._id)
.then(() => {
setTimeout(() => {
this.$store.dispatch('schedule/getScheduleList')
this.$message.success(`Schedule "${row.name}" has been removed`)
}, 100)
})
},
onRun () {
}
},
created () {
this.$store.dispatch('schedule/getScheduleList')
this.$store.dispatch('spider/getSpiderList')
}
}
</script>
<style scoped>
.filter .right {
text-align: right;
}
.table {
margin-top: 10px;
}
</style>


@@ -16,6 +16,9 @@
<el-tab-pane :label="$t('Files')" name="files">
<file-list/>
</el-tab-pane>
<el-tab-pane :label="$t('Environment')" name="environment">
<environment-list/>
</el-tab-pane>
</el-tabs>
</div>
</template>
@@ -26,10 +29,12 @@ import {
} from 'vuex'
import FileList from '../../components/FileList/FileList'
import SpiderOverview from '../../components/Overview/SpiderOverview'
import EnvironmentList from '../../components/Environment/EnvironmentList'
export default {
name: 'NodeDetail',
components: {
EnvironmentList,
FileList,
SpiderOverview
},


@@ -93,21 +93,21 @@
:width="col.width">
</el-table-column>
</template>
<el-table-column :label="$t('Action')" align="center" width="250">
<el-table-column :label="$t('Action')" align="left" width="250">
<template slot-scope="scope">
<el-tooltip :content="$t('View')" placement="top">
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Edit')" placement="top">
<el-button type="warning" icon="el-icon-edit" size="mini" @click="onView(scope.row)"></el-button>
</el-tooltip>
<!--<el-tooltip :content="$t('Edit')" placement="top">-->
<!--<el-button type="warning" icon="el-icon-edit" size="mini" @click="onView(scope.row)"></el-button>-->
<!--</el-tooltip>-->
<el-tooltip :content="$t('Remove')" placement="top">
<el-button type="danger" icon="el-icon-delete" size="mini" @click="onRemove(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Deploy')" placement="top">
<el-button type="primary" icon="fa fa-cloud" size="mini" @click="onDeploy(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Run')" placement="top">
<el-tooltip v-if="isShowRun(scope.row)" :content="$t('Run')" placement="top">
<el-button type="success" icon="fa fa-bug" size="mini" @click="onCrawl(scope.row)"></el-button>
</el-tooltip>
</template>
@@ -151,7 +151,7 @@ export default {
{ name: 'name', label: 'Name', width: 'auto' },
{ name: 'type', label: 'Spider Type', width: '160', sortable: true },
{ name: 'lang', label: 'Language', width: '160', sortable: true },
{ name: 'last_run_ts', label: 'Last Run', width: '120' }
{ name: 'task_ts', label: 'Last Run', width: '160' }
],
spiderFormRules: {
name: [{ required: true, message: 'Required Field', trigger: 'change' }]
@@ -301,6 +301,15 @@ export default {
this.$message.success(this.$t('Deployed all spiders successfully'))
})
})
},
isShowRun (row) {
if (!row.deploy_ts) {
return false
}
if (!row.cmd) {
return false
}
return true
}
},
created () {


@@ -2,20 +2,22 @@
<div class="app-container">
<!--filter-->
<div class="filter">
<el-input prefix-icon="el-icon-search"
:placeholder="$t('Search')"
class="filter-search"
v-model="filter.keyword"
@change="onSearch">
</el-input>
<div class="right">
<div class="left">
<el-select class="filter-select" v-model="filter.node_id" :placeholder="$t('Node')" filterable clearable>
<el-option v-for="op in nodeList" :key="op._id" :value="op._id" :label="op.name"></el-option>
</el-select>
<el-select class="filter-select" v-model="filter.spider_id" :placeholder="$t('Spider')" filterable clearable>
<el-option v-for="op in spiderList" :key="op._id" :value="op._id" :label="op.name"></el-option>
</el-select>
<el-button type="success"
icon="el-icon-refresh"
icon="el-icon-search"
class="refresh"
@click="onRefresh">
{{$t('Refresh')}}
{{$t('Search')}}
</el-button>
</div>
<!--<div class="right">-->
<!--</div>-->
</div>
<!--table list-->
@@ -71,6 +73,9 @@
<el-tooltip :content="$t('View')" placement="top">
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>
</el-tooltip>
<el-tooltip :content="$t('Remove')" placement="top">
<el-button type="danger" icon="el-icon-delete" size="mini" @click="onRemove(scope.row)"></el-button>
</el-tooltip>
</template>
</el-table-column>
</el-table>
@@ -99,26 +104,31 @@ export default {
return {
isEditMode: false,
dialogVisible: false,
filter: {
keyword: ''
},
// tableData,
columns: [
{ name: 'create_ts', label: 'Create Time', width: '150' },
{ name: 'start_ts', label: 'Start Time', width: '150' },
{ name: 'finish_ts', label: 'Finish Time', width: '150' },
{ name: 'duration', label: 'Duration (sec)', width: '80' },
{ name: 'spider_name', label: 'Spider', width: '160' },
{ name: 'node_id', label: 'Node', width: '160' },
{ name: 'status', label: 'Status', width: '160', sortable: true }
{ name: 'status', label: 'Status', width: '80' }
]
}
},
computed: {
...mapState('task', [
'filter',
'taskList',
'taskListTotalCount',
'taskForm'
]),
...mapState('spider', [
'spiderList'
]),
...mapState('node', [
'nodeList'
]),
pageNum: {
get () {
return this.$store.state.task.pageNum
@@ -197,6 +207,8 @@ export default {
},
created () {
this.$store.dispatch('task/getTaskList')
this.$store.dispatch('spider/getSpiderList')
this.$store.dispatch('node/getNodeList')
}
}
</script>
@@ -212,6 +224,13 @@ export default {
display: flex;
justify-content: space-between;
.left {
.filter-select {
width: 180px;
margin-right: 10px;
}
}
.filter-search {
width: 240px;
}


@@ -1,6 +1,10 @@
const puppeteer = require('puppeteer');
const MongoClient = require('mongodb').MongoClient;
const MONGO_HOST = process.env.MONGO_HOST;
const MONGO_PORT = process.env.MONGO_PORT;
const MONGO_DB = process.env.MONGO_DB;
(async () => {
// browser
const browser = await (puppeteer.launch({
@@ -53,8 +57,8 @@ const MongoClient = require('mongodb').MongoClient;
});
// open database connection
const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
let db = await client.db('crawlab_test');
const client = await MongoClient.connect(`mongodb://${MONGO_HOST}:${MONGO_PORT}`);
let db = await client.db(MONGO_DB);
const colName = process.env.CRAWLAB_COLLECTION || 'results_juejin';
const taskId = process.env.CRAWLAB_TASK_ID;
const col = db.collection(colName);


@@ -8,9 +8,9 @@ import os
from pymongo import MongoClient
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'
MONGO_HOST = os.environ['MONGO_HOST']
MONGO_PORT = int(os.environ['MONGO_PORT'])
MONGO_DB = os.environ['MONGO_DB']
class JuejinPipeline(object):
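
The pipeline body itself is outside this hunk; purely for context, a rough, illustrative sketch of how such a pipeline typically consumes these settings (not the committed code):

import os
from pymongo import MongoClient

MONGO_HOST = os.environ['MONGO_HOST']
MONGO_PORT = int(os.environ['MONGO_PORT'])
MONGO_DB = os.environ['MONGO_DB']

class JuejinPipeline(object):
    mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    db = mongo[MONGO_DB]
    col_name = os.environ.get('CRAWLAB_COLLECTION', 'results_juejin')

    def process_item(self, item, spider):
        # tag each item with the Crawlab task id and persist it
        item['task_id'] = os.environ.get('CRAWLAB_TASK_ID')
        self.db[self.col_name].insert_one(dict(item))
        return item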


@@ -52,8 +52,10 @@ const MongoClient = require('mongodb').MongoClient;
});
// open database connection
const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
let db = await client.db('crawlab_test');
console.log(process.env.MONGO_HOST);
console.log(process.env.MONGO_PORT);
const client = await MongoClient.connect(`mongodb://${process.env.MONGO_HOST}:${process.env.MONGO_PORT}`);
let db = await client.db(process.env.MONGO_DB);
const colName = process.env.CRAWLAB_COLLECTION || 'results_juejin';
const taskId = process.env.CRAWLAB_TASK_ID;
const col = db.collection(colName);


@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html
import scrapy


class RealEstateItem(scrapy.Item):
    # _id
    _id = scrapy.Field()
    # task_id
    task_id = scrapy.Field()
    # property name
    name = scrapy.Field()
    # url
    url = scrapy.Field()
    # category
    type = scrapy.Field()
    # price (unit: 10,000 CNY)
    price = scrapy.Field()
    # size
    size = scrapy.Field()
    # residential community
    region = scrapy.Field()
    # city
    city = scrapy.Field()
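
Because `scrapy.Item` rejects undeclared fields, the schema above can be sanity-checked interactively; a quick sketch with made-up values:

from realestate.items import RealEstateItem

item = RealEstateItem(name='Some Estate', type='secondhand', price='350')
item['city'] = 'Chongqing'  # declared field: accepted
# item['rooms'] = 3         # undeclared field: raises KeyError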

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Define here the models for your spider middleware
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals


class RealestateSpiderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.
        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.
        # Must return an iterable of Request, dict or Item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.
        # Should return either None or an iterable of Response, dict
        # or Item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn't have a response associated.
        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


class RealestateDownloaderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.
        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        return None

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.
        # Must either:
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.
        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
import os

from pymongo import MongoClient

# connection parameters are injected as environment variables (e.g. by Crawlab)
MONGO_HOST = os.environ['MONGO_HOST']
MONGO_PORT = int(os.environ['MONGO_PORT'])
MONGO_DB = os.environ['MONGO_DB']


class MongoPipeline(object):
    mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    db = mongo[MONGO_DB]
    col_name = os.environ.get('CRAWLAB_COLLECTION')
    col = db[col_name]

    def process_item(self, item, spider):
        # tag each document with the task that produced it
        item['task_id'] = os.environ.get('CRAWLAB_TASK_ID')
        self.col.save(item)
        return item
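
`Collection.save()` is deprecated in pymongo 3.x and removed in 4.x; an equivalent write with the modern API would look like the following sketch (assuming items may or may not carry a pre-set `_id`):

# inside process_item(); dict(item) converts the scrapy Item to a plain dict
if item.get('_id') is not None:
    self.col.replace_one({'_id': item['_id']}, dict(item), upsert=True)
else:
    self.col.insert_one(dict(item))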

View File

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# Scrapy settings for realestate project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://doc.scrapy.org/en/latest/topics/settings.html
# https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = 'realestate'

SPIDER_MODULES = ['realestate.spiders']
NEWSPIDER_MODULE = 'realestate.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
# USER_AGENT = 'realestate (+http://www.yourdomain.com)'

# Obey robots.txt rules
ROBOTSTXT_OBEY = True

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
# COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
# TELNETCONSOLE_ENABLED = False

# Override the default request headers:
# DEFAULT_REQUEST_HEADERS = {
#   'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
#   'Accept-Language': 'en',
# }

# Enable or disable spider middlewares
# See https://doc.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
#    'realestate.middlewares.RealestateSpiderMiddleware': 543,
# }

# Enable or disable downloader middlewares
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
#    'realestate.middlewares.RealestateDownloaderMiddleware': 543,
# }

# Enable or disable extensions
# See https://doc.scrapy.org/en/latest/topics/extensions.html
# EXTENSIONS = {
#    'scrapy.extensions.telnet.TelnetConsole': None,
# }

# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'realestate.pipelines.MongoPipeline': 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/autothrottle.html
# AUTOTHROTTLE_ENABLED = True
# The initial download delay
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = 'httpcache'
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
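
To crawl lianjia.com more politely, the throttling knobs shipped commented out above could be enabled; the values here are illustrative, not part of the commit:

DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 8
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 5
AUTOTHROTTLE_MAX_DELAY = 60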

View File

@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import scrapy
from realestate.items import RealEstateItem
class LianjiaSpider(scrapy.Spider):
    name = 'lianjia'
    allowed_domains = ['lianjia.com']
    # note: start_urls is unused because start_requests() is overridden below
    start_urls = ['https://cq.lianjia.com/ershoufang/']

    def start_requests(self):
        # listing pages are 1-indexed (pg1 .. pg100); range(100) would start at pg0
        for i in range(1, 101):
            url = 'https://cq.lianjia.com/ershoufang/pg%s' % i
            yield scrapy.Request(url=url)

    def parse(self, response):
        for item in response.css('.sellListContent > li'):
            yield RealEstateItem(
                name=item.css('.title > a::text').extract_first(),
                url=item.css('.title > a::attr("href")').extract_first(),
                type='secondhand',
                price=item.css('.totalPrice > span::text').extract_first(),
                region=item.css('.houseInfo > a::text').extract_first(),
                size=item.css('.houseInfo::text').extract_first().split(' | ')[2]
            )
        # pagination (left disabled; fixed to use Request and an extracted href)
        # a_next = response.css('.house-lst-page-box > a')[-1]
        # href = a_next.css('a::attr("href")').extract_first()
        # yield scrapy.Request(url='https://cq.lianjia.com' + href)
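
If the pagination were enabled, `response.follow` (Scrapy 1.4+) would resolve the relative href and spare the manual URL join; a sketch, assuming the '.house-lst-page-box' selector still matches the live page:

# at the end of parse(), after the item loop
next_href = response.css('.house-lst-page-box > a:last-child::attr(href)').extract_first()
if next_href:
    yield response.follow(next_href, callback=self.parse)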

View File

@@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = realestate.settings
[deploy]
#url = http://localhost:6800/
project = realestate