diff --git a/crawlab/app.py b/crawlab/app.py
index 84e9507a..b25a889e 100644
--- a/crawlab/app.py
+++ b/crawlab/app.py
@@ -7,8 +7,8 @@ import click
 from celery import Celery
 from flask import Flask
 from flask_cors import CORS
-from flask_restful import Api
-# from flask_restplus import Api
+# from flask_restful import Api
+from flask_restplus import Api
 from utils.log import other
 from constants.node import NodeStatus
 from db.manager import db_manager
@@ -43,23 +43,22 @@ api.add_resource(NodeApi,
                  '/api/nodes',
                  '/api/nodes/<string:id>',
                  '/api/nodes/<string:id>/<string:action>')
-api.add_resource(SpiderImportApi,
-                 '/api/spiders/import/<string:platform>')
-api.add_resource(SpiderManageApi,
-                 '/api/spiders/manage/<string:action>')
 api.add_resource(SpiderApi,
                  '/api/spiders',
                  '/api/spiders/<string:id>',
                  '/api/spiders/<string:id>/<string:action>')
+api.add_resource(SpiderImportApi,
+                 '/api/spiders/import/<string:platform>')
+api.add_resource(SpiderManageApi,
+                 '/api/spiders/manage/<string:action>')
+api.add_resource(TaskApi,
+                 '/api/tasks',
+                 '/api/tasks/<string:id>',
+                 '/api/tasks/<string:id>/<string:action>')
 api.add_resource(DeployApi,
                  '/api/deploys',
                  '/api/deploys/<string:id>',
                  '/api/deploys/<string:id>/<string:action>')
-api.add_resource(TaskApi,
-                 '/api/tasks',
-                 '/api/tasks/<string:id>',
-                 '/api/tasks/<string:id>/<string:action>'
-                 )
 api.add_resource(FileApi,
                  '/api/files',
                  '/api/files/<string:action>')
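
For orientation, these registrations are what route URL segments into the resource method signatures used throughout routes/. A sketch of the dispatch (the on_crawl and github actions referenced here are defined in the spiders diff below):

# GET  /api/spiders                      -> SpiderApi.get(id=None, action=None)
# GET  /api/spiders/<string:id>          -> SpiderApi.get(id=id)
# POST /api/spiders/<string:id>/on_crawl -> SpiderApi.post(id=id, action='on_crawl')
# POST /api/spiders/import/github        -> SpiderImportApi.post(platform='github')
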
diff --git a/crawlab/db/manager.py b/crawlab/db/manager.py
index 115d7f8d..902992cf 100644
--- a/crawlab/db/manager.py
+++ b/crawlab/db/manager.py
@@ -2,17 +2,26 @@ from bson import ObjectId
 from mongoengine import connect
 from pymongo import MongoClient, DESCENDING
 from config import MONGO_HOST, MONGO_PORT, MONGO_DB
-from utils import is_object_id, jsonify
+from utils import is_object_id
 
 connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT)
 
 
 class DbManager(object):
+    __doc__ = """
+    Database Manager class for handling database CRUD actions.
+    """
+
     def __init__(self):
         self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
         self.db = self.mongo[MONGO_DB]
 
-    def save(self, col_name: str, item, **kwargs):
+    def save(self, col_name: str, item: dict, **kwargs) -> None:
+        """
+        Save the item in the specified collection.
+        :param col_name: collection name
+        :param item: item object
+        """
         col = self.db[col_name]
 
         # in case some fields cannot be saved in MongoDB
@@ -21,15 +30,32 @@ class DbManager(object):
 
         col.save(item, **kwargs)
 
-    def remove(self, col_name: str, cond: dict, **kwargs):
+    def remove(self, col_name: str, cond: dict, **kwargs) -> None:
+        """
+        Remove items given specified condition.
+        :param col_name: collection name
+        :param cond: condition or filter
+        """
         col = self.db[col_name]
         col.remove(cond, **kwargs)
 
     def update(self, col_name: str, cond: dict, values: dict, **kwargs):
+        """
+        Update items given specified condition.
+        :param col_name: collection name
+        :param cond: condition or filter
+        :param values: values to update
+        """
         col = self.db[col_name]
         col.update(cond, {'$set': values}, **kwargs)
 
     def update_one(self, col_name: str, id: str, values: dict, **kwargs):
+        """
+        Update an item given specified _id.
+        :param col_name: collection name
+        :param id: _id
+        :param values: values to update
+        """
         col = self.db[col_name]
         _id = id
         if is_object_id(id):
@@ -38,6 +64,11 @@ class DbManager(object):
         col.find_one_and_update({'_id': _id}, {'$set': values})
 
     def remove_one(self, col_name: str, id: str, **kwargs):
+        """
+        Remove an item given specified _id.
+        :param col_name: collection name
+        :param id: _id
+        """
         col = self.db[col_name]
         _id = id
         if is_object_id(id):
@@ -45,7 +76,16 @@ class DbManager(object):
         col.remove({'_id': _id})
 
     def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100,
-             **kwargs):
+             **kwargs) -> list:
+        """
+        Return a list of items given specified condition, sort_key, sort_direction, skip, and limit.
+        :param col_name: collection name
+        :param cond: condition or filter
+        :param sort_key: key to sort
+        :param sort_direction: sort direction
+        :param skip: skip number
+        :param limit: limit number
+        """
         if sort_key is None:
             sort_key = '_id'
         col = self.db[col_name]
@@ -54,11 +94,21 @@ class DbManager(object):
             data.append(item)
         return data
 
-    def _get(self, col_name: str, cond: dict):
+    def _get(self, col_name: str, cond: dict) -> dict:
+        """
+        Get an item given specified condition.
+        :param col_name: collection name
+        :param cond: condition or filter
+        """
         col = self.db[col_name]
         return col.find_one(cond)
 
-    def get(self, col_name: str, id):
+    def get(self, col_name: str, id: (ObjectId, str)) -> dict:
+        """
+        Get an item given specified _id.
+        :param col_name: collection name
+        :param id: _id
+        """
         if type(id) == ObjectId:
             _id = id
         elif is_object_id(id):
@@ -67,14 +117,28 @@ class DbManager(object):
             _id = id
         return self._get(col_name=col_name, cond={'_id': _id})
 
-    def get_one_by_key(self, col_name: str, key, value):
+    def get_one_by_key(self, col_name: str, key, value) -> dict:
+        """
+        Get an item given key/value condition.
+        :param col_name: collection name
+        :param key: key
+        :param value: value
+        """
         return self._get(col_name=col_name, cond={key: value})
 
-    def count(self, col_name: str, cond):
+    def count(self, col_name: str, cond) -> int:
+        """
+        Get total count of a collection given specified condition.
+        :param col_name: collection name
+        :param cond: condition or filter
+        """
         col = self.db[col_name]
         return col.count(cond)
 
     def get_latest_version(self, spider_id, node_id):
+        """
+        @deprecated
+        """
         col = self.db['deploys']
         for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
                 .sort('version', DESCENDING):
@@ -82,6 +146,9 @@ class DbManager(object):
         return None
 
     def get_last_deploy(self, spider_id):
+        """
+        @deprecated
+        """
         col = self.db['deploys']
         for item in col.find({'spider_id': ObjectId(spider_id)}) \
                 .sort('finish_ts', DESCENDING):
@@ -89,6 +156,12 @@ class DbManager(object):
         return None
 
     def aggregate(self, col_name: str, pipelines, **kwargs):
+        """
+        Perform MongoDB col.aggregate action to aggregate stats given collection name and pipelines.
+        Reference: https://docs.mongodb.com/manual/reference/command/aggregate/
+        :param col_name: collection name
+        :param pipelines: pipelines
+        """
         col = self.db[col_name]
         return col.aggregate(pipelines, **kwargs)
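
A minimal usage sketch of the DbManager CRUD surface documented above (the collection and field names here are illustrative, not taken from this diff):

from db.manager import db_manager

# save() writes the item into the given collection
db_manager.save(col_name='spiders', item={'name': 'example', 'type': 'scrapy'})

# list() filters by cond and sorts by sort_key (descending by default)
items = db_manager.list(col_name='spiders', cond={'type': 'scrapy'}, sort_key='_id', limit=10)

# get() accepts an ObjectId or its 24-character string form
item = db_manager.get(col_name='spiders', id=str(items[0]['_id']))

# update_one() applies a $set of the given values on the matched document
db_manager.update_one(col_name='spiders', id=str(item['_id']), values={'type': 'custom'})
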
diff --git a/crawlab/routes/base.py b/crawlab/routes/base.py
index 08a1212b..b92ca9a5 100644
--- a/crawlab/routes/base.py
+++ b/crawlab/routes/base.py
@@ -1,5 +1,5 @@
-from flask_restful import reqparse, Resource
-# from flask_restplus import reqparse, Resource
+# from flask_restful import reqparse, Resource
+from flask_restplus import reqparse, Resource
 from db.manager import db_manager
 from utils import jsonify
 
@@ -12,6 +12,9 @@ DEFAULT_ARGS = [
 
 
 class BaseApi(Resource):
+    """
+    Base class for API. All API classes should inherit this class.
+    """
     col_name = 'tmp'
     parser = reqparse.RequestParser()
     arguments = []
@@ -25,7 +28,16 @@ class BaseApi(Resource):
         for arg, type in self.arguments:
             self.parser.add_argument(arg, type=type)
 
-    def get(self, id=None, action=None):
+    def get(self, id: str = None, action: str = None) -> (dict, tuple):
+        """
+        GET method for retrieving item information.
+        If id is specified and action is not, return the item of the given id;
+        if id and action are both specified, execute the given action on the item of the given id;
+        if neither id nor action is specified, return the list of items given the page_size, page_num and filter.
+        :param id:
+        :param action:
+        :return:
+        """
         args = self.parser.parse_args()
@@ -85,7 +97,11 @@ class BaseApi(Resource):
         else:
             return jsonify(db_manager.get(col_name=self.col_name, id=id))
 
-    def put(self):
+    def put(self) -> (dict, tuple):
+        """
+        PUT method for creating a new item.
+        :return:
+        """
         args = self.parser.parse_args()
         item = {}
         for k in args.keys():
@@ -94,7 +110,12 @@
         item = db_manager.save(col_name=self.col_name, item=item)
         return item
 
-    def update(self, id=None):
+    def update(self, id: str = None) -> (dict, tuple):
+        """
+        Helper function for the update action given the id.
+        :param id:
+        :return:
+        """
         args = self.parser.parse_args()
         item = db_manager.get(col_name=self.col_name, id=id)
         if item is None:
@@ -114,10 +135,18 @@
 
         return item
 
-    def post(self, id=None, action=None):
+    def post(self, id: str = None, action: str = None):
+        """
+        POST method for performing an action on the item of the given id.
+        :param id:
+        :param action:
+        :return:
+        """
+        # perform update action if action is not specified
         if action is None:
             return self.update(id)
 
+        # if action is not defined in the attributes, return 400 error
         if not hasattr(self, action):
             return {
                 'status': 'ok',
@@ -125,10 +154,27 @@
                 'error': 'action "%s" invalid' % action
             }, 400
 
+        # perform specified action of given id
        return getattr(self, action)(id)
 
-    def delete(self, id=None):
+    def delete(self, id: str = None) -> (dict, tuple):
+        """
+        DELETE method of given id for deleting an item.
+        :param id:
+        :return:
+        """
+        # perform delete action
         db_manager.remove_one(col_name=self.col_name, id=id)
+        return {
+            'status': 'ok',
+            'message': 'deleted successfully',
+        }
 
-    def after_update(self, id=None):
+    def after_update(self, id: str = None):
+        """
+        Hook invoked after the update method is performed.
+        To be overridden by subclasses.
+        :param id:
+        :return:
+        """
         pass
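
To make the BaseApi inheritance contract concrete, a hypothetical minimal subclass (the collection name, arguments, and custom action are illustrative):

from routes.base import BaseApi


class ProjectApi(BaseApi):
    # items live in the 'projects' collection
    col_name = 'projects'
    # arguments registered on the reqparse parser in BaseApi.__init__
    arguments = [
        ('name', str),
        ('description', str),
    ]

    # custom action, dispatched by BaseApi.get/post via /api/projects/<id>/archive
    def archive(self, id):
        return {'status': 'ok', 'id': id}
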
diff --git a/crawlab/routes/nodes.py b/crawlab/routes/nodes.py
index 7dbe50b5..4ab7a4ad 100644
--- a/crawlab/routes/nodes.py
+++ b/crawlab/routes/nodes.py
@@ -15,7 +15,12 @@ class NodeApi(BaseApi):
         ('port', str),
     )
 
-    def get(self, id=None, action=None):
+    def get(self, id: str = None, action: str = None) -> (dict, tuple):
+        """
+        GET method of NodeApi.
+        :param id: item id
+        :param action: action
+        """
         # action by id
         if action is not None:
             if not hasattr(self, action):
@@ -43,10 +48,11 @@ class NodeApi(BaseApi):
             'items': jsonify(nodes)
         }
 
-    def get_spiders(self, id=None):
-        items = db_manager.list('spiders')
-
-    def get_deploys(self, id):
+    def get_deploys(self, id: str) -> (dict, tuple):
+        """
+        Get a list of the latest deploys of the given node_id.
+        :param id: node_id
+        """
         items = db_manager.list('deploys', {'node_id': id}, limit=10, sort_key='finish_ts')
         deploys = []
         for item in items:
@@ -60,6 +66,10 @@ class NodeApi(BaseApi):
         }
 
     def get_tasks(self, id):
+        """
+        Get a list of the latest tasks of the given node_id.
+        :param id: node_id
+        """
         items = db_manager.list('tasks', {'node_id': id}, limit=10, sort_key='create_ts')
         for item in items:
             spider_id = item['spider_id']
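
The two node actions above are reached through BaseApi's action dispatch; the resulting endpoints (URL prefix as registered in app.py):

# GET /api/nodes/<string:id>/get_deploys -> NodeApi.get_deploys(id)  # 10 latest deploys
# GET /api/nodes/<string:id>/get_tasks   -> NodeApi.get_tasks(id)    # 10 latest tasks
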
diff --git a/crawlab/routes/spiders.py b/crawlab/routes/spiders.py
index a885a225..044210b2 100644
--- a/crawlab/routes/spiders.py
+++ b/crawlab/routes/spiders.py
@@ -61,6 +61,11 @@ class SpiderApi(BaseApi):
     )
 
     def get(self, id=None, action=None):
+        """
+        GET method of SpiderApi.
+        :param id: spider_id
+        :param action: action
+        """
         # action by id
         if action is not None:
             if not hasattr(self, action):
@@ -115,7 +120,12 @@ class SpiderApi(BaseApi):
             'items': jsonify(items)
         }
 
-    def crawl(self, id):
+    def crawl(self, id: str) -> (dict, tuple):
+        """
+        Submit an HTTP request to start a crawl task on the node of the given spider_id.
+        @deprecated
+        :param id: spider_id
+        """
         args = self.parser.parse_args()
         node_id = args.get('node_id')
 
@@ -152,7 +162,12 @@ class SpiderApi(BaseApi):
             'task': data.get('task')
         }
 
-    def on_crawl(self, id):
+    def on_crawl(self, id: str) -> (dict, tuple):
+        """
+        Start a crawl task.
+        :param id: spider_id
+        :return:
+        """
         job = execute_spider.delay(id)
 
         # create a new task
@@ -172,7 +187,12 @@ class SpiderApi(BaseApi):
             }
         }
 
-    def deploy(self, id):
+    def deploy(self, id: str) -> (dict, tuple):
+        """
+        Submit HTTP requests to deploy the given spider to all nodes.
+        :param id:
+        :return:
+        """
         spider = db_manager.get('spiders', id=id)
         nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
 
@@ -198,13 +218,19 @@ class SpiderApi(BaseApi):
                 node_id,
             ), files=files)
 
+            # TODO: checkpoint for errors
+
         return {
             'code': 200,
             'status': 'ok',
             'message': 'deploy success'
         }
 
-    def deploy_file(self, id=None):
+    def deploy_file(self, id: str = None) -> (dict, tuple):
+        """
+        Receive an HTTP deploy request, unzip the uploaded file, and copy it to the destination directory.
+        :param id: spider_id
+        """
         args = parser.parse_args()
         node_id = request.args.get('node_id')
         f = args.file
@@ -261,7 +287,11 @@ class SpiderApi(BaseApi):
             'message': 'deploy success'
         }
 
-    def get_deploys(self, id):
+    def get_deploys(self, id: str) -> (dict, tuple):
+        """
+        Get a list of the latest deploys of the given spider_id.
+        :param id: spider_id
+        """
         items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
         deploys = []
         for item in items:
@@ -274,7 +304,11 @@ class SpiderApi(BaseApi):
             'items': jsonify(deploys)
         }
 
-    def get_tasks(self, id):
+    def get_tasks(self, id: str) -> (dict, tuple):
+        """
+        Get a list of the latest tasks of the given spider_id.
+        :param id:
+        """
         items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
         for item in items:
             spider_id = item['spider_id']
@@ -287,11 +321,18 @@ class SpiderApi(BaseApi):
             'items': jsonify(items)
         }
 
-    def after_update(self, id=None):
+    def after_update(self, id: str = None) -> None:
+        """
+        After a spider is updated, update the cron scheduler accordingly.
+        :param id: spider_id
+        """
         scheduler.update()
 
 
 class SpiderImportApi(Resource):
+    __doc__ = """
+    API for importing spiders from external sources including GitHub, GitLab, and Subversion (WIP).
+    """
     parser = reqparse.RequestParser()
     arguments = [
         ('url', str)
     ]
@@ -302,7 +343,7 @@
         for arg, type in self.arguments:
             self.parser.add_argument(arg, type=type)
 
-    def post(self, platform=None):
+    def post(self, platform: str = None) -> (dict, tuple):
         if platform is None:
             return {
                 'status': 'ok',
@@ -319,13 +360,22 @@
 
         return getattr(self, platform)()
 
-    def github(self):
+    def github(self) -> None:
+        """
+        Import spiders from GitHub.
+        """
        self._git()
 
-    def gitlab(self):
+    def gitlab(self) -> None:
+        """
+        Import spiders from GitLab.
+        """
        self._git()
 
     def _git(self):
+        """
+        Helper method to perform a git import (essentially a "git clone").
+        """
         args = self.parser.parse_args()
         url = args.get('url')
         if url is None:
@@ -357,7 +407,11 @@ class SpiderManageApi(Resource):
         ('url', str)
     ]
 
-    def post(self, action):
+    def post(self, action: str) -> (dict, tuple):
+        """
+        POST method for SpiderManageApi.
+        :param action:
+        """
         if not hasattr(self, action):
             return {
                 'status': 'ok',
@@ -367,7 +421,10 @@
 
         return getattr(self, action)()
 
-    def deploy_all(self):
+    def deploy_all(self) -> (dict, tuple):
+        """
+        Deploy all spiders to all nodes.
+        """
         # active nodes
         nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
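
A quick way to exercise the on_crawl action documented above (host, port, and the spider id are illustrative; the response shape is inferred from the handlers in this diff):

import requests

# start a crawl task; dispatched as SpiderApi.post(id, action='on_crawl')
r = requests.post('http://localhost:8000/api/spiders/5d1234567890abcdef123456/on_crawl')
print(r.json())  # e.g. {'code': 200, 'status': 'ok', 'task': {...}}
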
diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py
index 769b0558..d864f859 100644
--- a/crawlab/routes/tasks.py
+++ b/crawlab/routes/tasks.py
@@ -1,6 +1,7 @@
 import json
 
 import requests
+from bson import ObjectId
 from celery.worker.control import revoke
 
 from constants.task import TaskStatus
@@ -12,6 +13,7 @@ from utils.log import other
 
 
 class TaskApi(BaseApi):
+    # collection name
     col_name = 'tasks'
 
     arguments = (
@@ -19,7 +21,12 @@
         ('file_path', str)
     )
 
-    def get(self, id=None, action=None):
+    def get(self, id: str = None, action: str = None):
+        """
+        GET method of TaskApi.
+        :param id: item id
+        :param action: action
+        """
         # action by id
         if action is not None:
             if not hasattr(self, action):
@@ -28,12 +35,12 @@
                     'code': 400,
                     'error': 'action "%s" invalid' % action
                 }, 400
-            other.info(f"到这了{action},{id}")
+            # other.info(f"got here: {action},{id}")
             return getattr(self, action)(id)
 
         elif id is not None:
-            task = db_manager.get('tasks', id=id)
-            spider = db_manager.get('spiders', id=str(task['spider_id']))
+            task = db_manager.get(col_name=self.col_name, id=id)
+            spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
             task['spider_name'] = spider['name']
             try:
                 with open(task['log_file_path']) as f:
@@ -46,11 +53,12 @@
         args = self.parser.parse_args()
         page_size = args.get('page_size') or 10
         page_num = args.get('page_num') or 1
-        tasks = db_manager.list('tasks', {}, limit=page_size, skip=page_size * (page_num - 1), sort_key='create_ts')
+        tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
+                                sort_key='create_ts')
         items = []
         for task in tasks:
             # _task = db_manager.get('tasks_celery', id=task['_id'])
-            _spider = db_manager.get('spiders', id=str(task['spider_id']))
+            _spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
             if task.get('status') is None:
                 task['status'] = TaskStatus.UNAVAILABLE
             task['spider_name'] = _spider['name']
@@ -63,9 +71,13 @@
             'items': jsonify(items)
         }
 
-    def on_get_log(self, id):
+    def on_get_log(self, id: (str, ObjectId)) -> (dict, tuple):
+        """
+        Get the log of the given task_id.
+        :param id: task_id
+        """
         try:
-            task = db_manager.get('tasks', id=id)
+            task = db_manager.get(col_name=self.col_name, id=id)
             with open(task['log_file_path']) as f:
                 log = f.read()
                 return {
@@ -79,9 +91,14 @@
                 'error': str(err)
             }, 500
 
-    def get_log(self, id):
-        task = db_manager.get('tasks', id=id)
-        node = db_manager.get('nodes', id=task['node_id'])
+    def get_log(self, id: (str, ObjectId)) -> (dict, tuple):
+        """
+        Submit an HTTP request to fetch the log from the node of the given task.
+        :param id: task_id
+        :return:
+        """
+        task = db_manager.get(col_name=self.col_name, id=id)
+        node = db_manager.get(col_name='nodes', id=task['node_id'])
         r = requests.get('http://%s:%s/api/tasks/%s/on_get_log' % (
             node['ip'],
             node['port'],
@@ -101,7 +118,11 @@
                 'error': data['error']
             }, 500
 
-    def get_results(self, id):
+    def get_results(self, id: str) -> (dict, tuple):
+        """
+        Get a list of results crawled in the given task.
+        :param id: task_id
+        """
         args = self.parser.parse_args()
         page_size = args.get('page_size') or 10
         page_num = args.get('page_num') or 1
@@ -123,6 +144,12 @@
         }
 
     def stop(self, id):
+        """
+        Stop the task in progress.
+        TODO: work in progress
+        :param id:
+        :return:
+        """
         revoke(id, terminate=True)
         return {
             'id': id,
diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py
index 31346f8e..d4249bf7 100644
--- a/crawlab/tasks/scheduler.py
+++ b/crawlab/tasks/scheduler.py
@@ -20,7 +20,6 @@
     scheduler = BackgroundScheduler(jobstores=jobstores)
 
     def execute_spider(self, id: str):
-
         r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
             FLASK_HOST,
             FLASK_PORT,
diff --git a/crawlab/utils/__init__.py b/crawlab/utils/__init__.py
index c1ce608b..edf03130 100644
--- a/crawlab/utils/__init__.py
+++ b/crawlab/utils/__init__.py
@@ -5,11 +5,20 @@ from datetime import datetime
 from bson import json_util
 
 
-def is_object_id(id):
+def is_object_id(id: str) -> bool:
+    """
+    Determine if the id is a valid ObjectId string.
+    :param id: ObjectId string
+    """
     return re.search('^[a-zA-Z0-9]{24}$', id) is not None
 
 
-def jsonify(obj):
+def jsonify(obj: (dict, list)) -> (dict, list):
+    """
+    Convert dict/list to a valid json object.
+    :param obj: object to be converted
+    :return: dict/list
+    """
     dump_str = json_util.dumps(obj)
     converted_obj = json.loads(dump_str)
     if type(converted_obj) == dict: