updated some parts of API docs

This commit is contained in:
Marvin Zhang
2019-04-07 16:39:55 +08:00
parent e34e9a6691
commit d56fb30127
8 changed files with 279 additions and 59 deletions

View File

@@ -7,8 +7,8 @@ import click
from celery import Celery
from flask import Flask
from flask_cors import CORS
from flask_restful import Api
# from flask_restplus import Api
# from flask_restful import Api
from flask_restplus import Api
from utils.log import other
from constants.node import NodeStatus
from db.manager import db_manager
@@ -43,23 +43,22 @@ api.add_resource(NodeApi,
'/api/nodes',
'/api/nodes/<string:id>',
'/api/nodes/<string:id>/<string:action>')
api.add_resource(SpiderImportApi,
'/api/spiders/import/<string:platform>')
api.add_resource(SpiderManageApi,
'/api/spiders/manage/<string:action>')
api.add_resource(SpiderApi,
'/api/spiders',
'/api/spiders/<string:id>',
'/api/spiders/<string:id>/<string:action>')
api.add_resource(SpiderImportApi,
'/api/spiders/import/<string:platform>')
api.add_resource(SpiderManageApi,
'/api/spiders/manage/<string:action>')
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>',
'/api/tasks/<string:id>/<string:action>')
api.add_resource(DeployApi,
'/api/deploys',
'/api/deploys/<string:id>',
'/api/deploys/<string:id>/<string:action>')
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>',
'/api/tasks/<string:id>/<string:action>'
)
api.add_resource(FileApi,
'/api/files',
'/api/files/<string:action>')

View File

@@ -2,17 +2,26 @@ from bson import ObjectId
from mongoengine import connect
from pymongo import MongoClient, DESCENDING
from config import MONGO_HOST, MONGO_PORT, MONGO_DB
from utils import is_object_id, jsonify
from utils import is_object_id
connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT)
class DbManager(object):
__doc__ = """
Database Manager class for handling database CRUD actions.
"""
def __init__(self):
self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
self.db = self.mongo[MONGO_DB]
def save(self, col_name: str, item, **kwargs):
def save(self, col_name: str, item: dict, **kwargs) -> None:
"""
Save the item in the specified collection
:param col_name: collection name
:param item: item object
"""
col = self.db[col_name]
# in case some fields cannot be saved in MongoDB
@@ -21,15 +30,32 @@ class DbManager(object):
col.save(item, **kwargs)
def remove(self, col_name: str, cond: dict, **kwargs):
def remove(self, col_name: str, cond: dict, **kwargs) -> None:
"""
Remove items given specified condition.
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
col.remove(cond, **kwargs)
def update(self, col_name: str, cond: dict, values: dict, **kwargs):
"""
Update items given specified condition.
:param col_name: collection name
:param cond: condition or filter
:param values: values to update
"""
col = self.db[col_name]
col.update(cond, {'$set': values}, **kwargs)
def update_one(self, col_name: str, id: str, values: dict, **kwargs):
"""
Update an item given specified _id
:param col_name: collection name
:param id: _id
:param values: values to update
"""
col = self.db[col_name]
_id = id
if is_object_id(id):
@@ -38,6 +64,11 @@ class DbManager(object):
col.find_one_and_update({'_id': _id}, {'$set': values})
def remove_one(self, col_name: str, id: str, **kwargs):
"""
Remove an item given specified _id
:param col_name: collection name
:param id: _id
"""
col = self.db[col_name]
_id = id
if is_object_id(id):
@@ -45,7 +76,16 @@ class DbManager(object):
col.remove({'_id': _id})
def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100,
**kwargs):
**kwargs) -> list:
"""
Return a list of items given specified condition, sort_key, sort_direction, skip, and limit.
:param col_name: collection name
:param cond: condition or filter
:param sort_key: key to sort
:param sort_direction: sort direction
:param skip: skip number
:param limit: limit number
"""
if sort_key is None:
sort_key = '_i'
col = self.db[col_name]
@@ -54,11 +94,21 @@ class DbManager(object):
data.append(item)
return data
def _get(self, col_name: str, cond: dict):
def _get(self, col_name: str, cond: dict) -> dict:
"""
Get an item given specified condition.
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
return col.find_one(cond)
def get(self, col_name: str, id):
def get(self, col_name: str, id: (ObjectId, str)) -> dict:
"""
Get an item given specified _id.
:param col_name: collection name
:param id: _id
"""
if type(id) == ObjectId:
_id = id
elif is_object_id(id):
@@ -67,14 +117,28 @@ class DbManager(object):
_id = id
return self._get(col_name=col_name, cond={'_id': _id})
def get_one_by_key(self, col_name: str, key, value):
def get_one_by_key(self, col_name: str, key, value) -> dict:
"""
Get an item given key/value condition.
:param col_name: collection name
:param key: key
:param value: value
"""
return self._get(col_name=col_name, cond={key: value})
def count(self, col_name: str, cond):
def count(self, col_name: str, cond) -> int:
"""
Get total count of a collection given specified condition
:param col_name: collection name
:param cond: condition or filter
"""
col = self.db[col_name]
return col.count(cond)
def get_latest_version(self, spider_id, node_id):
"""
@deprecated
"""
col = self.db['deploys']
for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
.sort('version', DESCENDING):
@@ -82,6 +146,9 @@ class DbManager(object):
return None
def get_last_deploy(self, spider_id):
"""
@deprecated
"""
col = self.db['deploys']
for item in col.find({'spider_id': ObjectId(spider_id)}) \
.sort('finish_ts', DESCENDING):
@@ -89,6 +156,12 @@ class DbManager(object):
return None
def aggregate(self, col_name: str, pipelines, **kwargs):
"""
Perform MongoDB col.aggregate action to aggregate stats given collection name and pipelines.
Reference: https://docs.mongodb.com/manual/reference/command/aggregate/
:param col_name: collection name
:param pipelines: pipelines
"""
col = self.db[col_name]
return col.aggregate(pipelines, **kwargs)

View File

@@ -1,5 +1,5 @@
from flask_restful import reqparse, Resource
# from flask_restplus import reqparse, Resource
# from flask_restful import reqparse, Resource
from flask_restplus import reqparse, Resource
from db.manager import db_manager
from utils import jsonify
@@ -12,6 +12,9 @@ DEFAULT_ARGS = [
class BaseApi(Resource):
"""
Base class for API. All API classes should inherit this class.
"""
col_name = 'tmp'
parser = reqparse.RequestParser()
arguments = []
@@ -25,7 +28,16 @@ class BaseApi(Resource):
for arg, type in self.arguments:
self.parser.add_argument(arg, type=type)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None) -> (dict, tuple):
"""
GET method for retrieving item information.
If id is specified and action is not, return the object of the given id;
If id and action are both specified, execute the given action results of the given id;
If neither id nor action is specified, return the list of items given the page_size, page_num and filter
:param id:
:param action:
:return:
"""
import pdb
pdb.set_trace()
args = self.parser.parse_args()
@@ -85,7 +97,11 @@ class BaseApi(Resource):
else:
return jsonify(db_manager.get(col_name=self.col_name, id=id))
def put(self):
def put(self) -> (dict, tuple):
"""
PUT method for creating a new item.
:return:
"""
args = self.parser.parse_args()
item = {}
for k in args.keys():
@@ -94,7 +110,12 @@ class BaseApi(Resource):
item = db_manager.save(col_name=self.col_name, item=item)
return item
def update(self, id=None):
def update(self, id: str = None) -> (dict, tuple):
"""
Helper function for update action given the id.
:param id:
:return:
"""
args = self.parser.parse_args()
item = db_manager.get(col_name=self.col_name, id=id)
if item is None:
@@ -114,10 +135,18 @@ class BaseApi(Resource):
return item
def post(self, id=None, action=None):
def post(self, id: str = None, action: str = None):
"""
POST method of the given id for performing an action.
:param id:
:param action:
:return:
"""
# perform update action if action is not specified
if action is None:
return self.update(id)
# if action is not defined in the attributes, return 400 error
if not hasattr(self, action):
return {
'status': 'ok',
@@ -125,10 +154,27 @@ class BaseApi(Resource):
'error': 'action "%s" invalid' % action
}, 400
# perform specified action of given id
return getattr(self, action)(id)
def delete(self, id=None):
def delete(self, id: str = None) -> (dict, tuple):
"""
DELETE method of given id for deleting an item.
:param id:
:return:
"""
# perform delete action
db_manager.remove_one(col_name=self.col_name, id=id)
return {
'status': 'ok',
'message': 'deleted successfully',
}
def after_update(self, id=None):
def after_update(self, id: str = None):
"""
This is the after update hook once the update method is performed.
To be overridden.
:param id:
:return:
"""
pass

View File

@@ -15,7 +15,12 @@ class NodeApi(BaseApi):
('port', str),
)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None) -> (dict, tuple):
"""
GET method of NodeAPI.
:param id: item id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -43,10 +48,11 @@ class NodeApi(BaseApi):
'items': jsonify(nodes)
}
def get_spiders(self, id=None):
items = db_manager.list('spiders')
def get_deploys(self, id):
def get_deploys(self, id: str) -> (dict, tuple):
"""
Get a list of latest deploys of given node_id
:param id: node_id
"""
items = db_manager.list('deploys', {'node_id': id}, limit=10, sort_key='finish_ts')
deploys = []
for item in items:
@@ -60,6 +66,10 @@ class NodeApi(BaseApi):
}
def get_tasks(self, id):
"""
Get a list of latest tasks of given node_id
:param id: node_id
"""
items = db_manager.list('tasks', {'node_id': id}, limit=10, sort_key='create_ts')
for item in items:
spider_id = item['spider_id']

View File

@@ -61,6 +61,11 @@ class SpiderApi(BaseApi):
)
def get(self, id=None, action=None):
"""
GET method of SpiderAPI.
:param id: spider_id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -115,7 +120,12 @@ class SpiderApi(BaseApi):
'items': jsonify(items)
}
def crawl(self, id):
def crawl(self, id: str) -> (dict, tuple):
"""
Submit an HTTP request to start a crawl task in the node of given spider_id.
@deprecated
:param id: spider_id
"""
args = self.parser.parse_args()
node_id = args.get('node_id')
@@ -152,7 +162,12 @@ class SpiderApi(BaseApi):
'task': data.get('task')
}
def on_crawl(self, id):
def on_crawl(self, id: str) -> (dict, tuple):
"""
Start a crawl task.
:param id: spider_id
:return:
"""
job = execute_spider.delay(id)
# create a new task
@@ -172,7 +187,12 @@ class SpiderApi(BaseApi):
}
}
def deploy(self, id):
def deploy(self, id: str) -> (dict, tuple):
"""
Submit HTTP requests to deploy the given spider to all nodes.
:param id:
:return:
"""
spider = db_manager.get('spiders', id=id)
nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
@@ -198,13 +218,19 @@ class SpiderApi(BaseApi):
node_id,
), files=files)
# TODO: checkpoint for errors
return {
'code': 200,
'status': 'ok',
'message': 'deploy success'
}
def deploy_file(self, id=None):
def deploy_file(self, id: str = None) -> (dict, tuple):
"""
Receive HTTP request of deploys and unzip zip files and copy to the destination directories.
:param id: spider_id
"""
args = parser.parse_args()
node_id = request.args.get('node_id')
f = args.file
@@ -261,7 +287,11 @@ class SpiderApi(BaseApi):
'message': 'deploy success'
}
def get_deploys(self, id):
def get_deploys(self, id: str) -> (dict, tuple):
"""
Get a list of latest deploys of given spider_id
:param id: spider_id
"""
items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
deploys = []
for item in items:
@@ -274,7 +304,11 @@ class SpiderApi(BaseApi):
'items': jsonify(deploys)
}
def get_tasks(self, id):
def get_tasks(self, id: str) -> (dict, tuple):
"""
Get a list of latest tasks of given spider_id
:param id:
"""
items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
for item in items:
spider_id = item['spider_id']
@@ -287,11 +321,18 @@ class SpiderApi(BaseApi):
'items': jsonify(items)
}
def after_update(self, id=None):
def after_update(self, id: str = None) -> None:
"""
After each spider is updated, update the cron scheduler correspondingly.
:param id: spider_id
"""
scheduler.update()
class SpiderImportApi(Resource):
__doc__ = """
API for importing spiders from external resources including Github, Gitlab, and subversion (WIP)
"""
parser = reqparse.RequestParser()
arguments = [
('url', str)
@@ -302,7 +343,7 @@ class SpiderImportApi(Resource):
for arg, type in self.arguments:
self.parser.add_argument(arg, type=type)
def post(self, platform=None):
def post(self, platform: str = None) -> (dict, tuple):
if platform is None:
return {
'status': 'ok',
@@ -319,13 +360,22 @@ class SpiderImportApi(Resource):
return getattr(self, platform)()
def github(self):
def github(self) -> None:
"""
Import Github API
"""
self._git()
def gitlab(self):
def gitlab(self) -> None:
"""
Import Gitlab API
"""
self._git()
def _git(self):
"""
Helper method to perform github important (basically "git clone" method).
"""
args = self.parser.parse_args()
url = args.get('url')
if url is None:
@@ -357,7 +407,11 @@ class SpiderManageApi(Resource):
('url', str)
]
def post(self, action):
def post(self, action: str) -> (dict, tuple):
"""
POST method for SpiderManageAPI.
:param action:
"""
if not hasattr(self, action):
return {
'status': 'ok',
@@ -367,7 +421,10 @@ class SpiderManageApi(Resource):
return getattr(self, action)()
def deploy_all(self):
def deploy_all(self) -> (dict, tuple):
"""
Deploy all spiders to all nodes.
"""
# active nodes
nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})

View File

@@ -1,6 +1,7 @@
import json
import requests
from bson import ObjectId
from celery.worker.control import revoke
from constants.task import TaskStatus
@@ -12,6 +13,7 @@ from utils.log import other
class TaskApi(BaseApi):
# collection name
col_name = 'tasks'
arguments = (
@@ -19,7 +21,12 @@ class TaskApi(BaseApi):
('file_path', str)
)
def get(self, id=None, action=None):
def get(self, id: str = None, action: str = None):
"""
GET method of TaskAPI.
:param id: item id
:param action: action
"""
# action by id
if action is not None:
if not hasattr(self, action):
@@ -28,12 +35,12 @@ class TaskApi(BaseApi):
'code': 400,
'error': 'action "%s" invalid' % action
}, 400
other.info(f"到这了{action},{id}")
# other.info(f"到这了{action},{id}")
return getattr(self, action)(id)
elif id is not None:
task = db_manager.get('tasks', id=id)
spider = db_manager.get('spiders', id=str(task['spider_id']))
task = db_manager.get(col_name=self.col_name, id=id)
spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
task['spider_name'] = spider['name']
try:
with open(task['log_file_path']) as f:
@@ -46,11 +53,12 @@ class TaskApi(BaseApi):
args = self.parser.parse_args()
page_size = args.get('page_size') or 10
page_num = args.get('page_num') or 1
tasks = db_manager.list('tasks', {}, limit=page_size, skip=page_size * (page_num - 1), sort_key='create_ts')
tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
sort_key='create_ts')
items = []
for task in tasks:
# _task = db_manager.get('tasks_celery', id=task['_id'])
_spider = db_manager.get('spiders', id=str(task['spider_id']))
_spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
if task.get('status') is None:
task['status'] = TaskStatus.UNAVAILABLE
task['spider_name'] = _spider['name']
@@ -63,9 +71,13 @@ class TaskApi(BaseApi):
'items': jsonify(items)
}
def on_get_log(self, id):
def on_get_log(self, id: (str, ObjectId)) -> (dict, tuple):
"""
Get the log of given task_id
:param id: task_id
"""
try:
task = db_manager.get('tasks', id=id)
task = db_manager.get(col_name=self.col_name, id=id)
with open(task['log_file_path']) as f:
log = f.read()
return {
@@ -79,9 +91,14 @@ class TaskApi(BaseApi):
'error': str(err)
}, 500
def get_log(self, id):
task = db_manager.get('tasks', id=id)
node = db_manager.get('nodes', id=task['node_id'])
def get_log(self, id: (str, ObjectId)) -> (dict, tuple):
"""
Submit an HTTP request to fetch log from the node of a given task.
:param id: task_id
:return:
"""
task = db_manager.get(col_name=self.col_name, id=id)
node = db_manager.get(col_name='nodes', id=task['node_id'])
r = requests.get('http://%s:%s/api/tasks/%s/on_get_log' % (
node['ip'],
node['port'],
@@ -101,7 +118,11 @@ class TaskApi(BaseApi):
'error': data['error']
}, 500
def get_results(self, id):
def get_results(self, id: str) -> (dict, tuple):
"""
Get a list of results crawled in a given task.
:param id: task_id
"""
args = self.parser.parse_args()
page_size = args.get('page_size') or 10
page_num = args.get('page_num') or 1
@@ -123,6 +144,12 @@ class TaskApi(BaseApi):
}
def stop(self, id):
"""
Stop the task in progress.
TODO: work in progress
:param id:
:return:
"""
revoke(id, terminate=True)
return {
'id': id,

View File

@@ -20,7 +20,6 @@ class Scheduler(object):
scheduler = BackgroundScheduler(jobstores=jobstores)
def execute_spider(self, id: str):
r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
FLASK_HOST,
FLASK_PORT,

View File

@@ -5,11 +5,20 @@ from datetime import datetime
from bson import json_util
def is_object_id(id):
def is_object_id(id: str) -> bool:
"""
Determine if the id is a valid ObjectId string
:param id: ObjectId string
"""
return re.search('^[a-zA-Z0-9]{24}$', id) is not None
def jsonify(obj):
def jsonify(obj: (dict, list)) -> (dict, list):
"""
Convert dict/list to a valid json object.
:param obj: object to be converted
:return: dict/list
"""
dump_str = json_util.dumps(obj)
converted_obj = json.loads(dump_str)
if type(converted_obj) == dict: