updated node/spider overviews

Marvin Zhang
2019-02-21 14:08:48 +08:00
parent 5eb69a4b0d
commit 913c0644c9
19 changed files with 230 additions and 86 deletions

app.py

@@ -1,16 +1,17 @@
from celery import Celery
from flask import Flask
from flask_cors import CORS
from flask_restful import Api
from flask_restful import Api, Resource
# TODO: start the app via a config file http://www.pythondoc.com/flask/config.html
from routes.deploys import DeployApi
from routes.files import FileApi
from routes.nodes import NodeApi
from routes.spiders import SpiderApi
from routes.tasks import TaskApi
# flask app instance
app = Flask(__name__)
app.config['DEBUG'] = True
# celery_app
celery_app = Celery(__name__)
celery_app.config_from_object('config.celery')
app.config.from_object('config.flask')
# init flask api instance
api = Api(app)
@@ -18,12 +19,27 @@ api = Api(app)
CORS(app, supports_credentials=True)
# reference api routes
import routes.nodes
import routes.spiders
import routes.deploys
import routes.tasks
import routes.files
api.add_resource(NodeApi,
'/api/nodes',
'/api/nodes/<string:id>',
'/api/nodes/<string:id>/<string:action>')
api.add_resource(SpiderApi,
'/api/spiders',
'/api/spiders/<string:id>',
'/api/spiders/<string:id>/<string:action>')
api.add_resource(DeployApi,
'/api/deploys',
'/api/deploys/<string:id>',
'/api/deploys/<string:id>/<string:action>')
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>'
)
api.add_resource(FileApi,
'/api/files',
'/api/files/<string:action>')
# start flask app
if __name__ == '__main__':
app.run(host='0.0.0.0', port='5000')
app.run()
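For reference, a minimal sketch of how Flask-RESTful maps the <string:id> and <string:action> URL segments onto keyword arguments of a resource's get() method, which is the pattern the routes registered above rely on; the PingApi resource and its URLs are made up for illustration and are not part of this commit:

    # standalone sketch with a hypothetical resource
    from flask import Flask
    from flask_restful import Api, Resource

    app = Flask(__name__)
    api = Api(app)

    class PingApi(Resource):
        def get(self, id=None, action=None):
            # /api/pings         -> id=None,  action=None
            # /api/pings/42      -> id='42',  action=None
            # /api/pings/42/run  -> id='42',  action='run'
            return {'id': id, 'action': action}

    api.add_resource(PingApi,
                     '/api/pings',
                     '/api/pings/<string:id>',
                     '/api/pings/<string:id>/<string:action>')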


@@ -1,7 +1,13 @@
import sys
import os
from celery import Celery
from app import celery_app
# make sure the working directory is in system path
file_dir = os.path.dirname(os.path.realpath(__file__))
root_path = os.path.abspath(os.path.join(file_dir, '..'))
sys.path.append(root_path)
from tasks.celery import celery_app
# import necessary tasks
import tasks.spider


@@ -1,2 +0,0 @@
PROJECT_FILE_FOLDER = '/var/crawlab'
PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'

config/common.py Normal file

@@ -0,0 +1,3 @@
PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'


@@ -1,2 +1,3 @@
DEBUG = True
# SERVER_NAME = '0.0.0.0:5000'


@@ -1,10 +1,24 @@
class SpiderType:
SCRAPY = 1
PYSPIDER = 2
WEBMAGIC = 3
SCRAPY = 'scrapy'
PYSPIDER = 'pyspider'
WEBMAGIC = 'webmagic'
class LangType:
PYTHON = 1
NODEJS = 2
JAVA = 3
PYTHON = 'python'
JAVASCRIPT = 'javascript'
JAVA = 'java'
GO = 'go'
OTHER = 'other'
SUFFIX_IGNORE = [
'pyc'
]
FILE_SUFFIX_LANG_MAPPING = {
'py': LangType.PYTHON,
'js': LangType.JAVASCRIPT,
'java': LangType.JAVA,
'go': LangType.GO,
}


@@ -14,7 +14,11 @@ class DbManager(object):
def save(self, col_name: str, item, **kwargs):
col = self.db[col_name]
item.pop('stats') # in case some fields cannot be saved in MongoDB
# in case some fields cannot be saved in MongoDB
if item.get('stats') is not None:
item.pop('stats')
col.save(item, **kwargs)
def remove(self, col_name: str, cond: dict, **kwargs):
@@ -43,6 +47,10 @@ class DbManager(object):
data.append(item)
return data
def _get(self, col_name: str, cond: dict):
col = self.db[col_name]
return col.find_one(cond)
def get(self, col_name: str, id):
if type(id) == ObjectId:
_id = id
@@ -50,8 +58,10 @@ class DbManager(object):
_id = ObjectId(id)
else:
_id = id
col = self.db[col_name]
return col.find_one({'_id': _id})
return self._get(col_name=col_name, cond={'_id': _id})
def get_one_by_key(self, col_name: str, key, value):
return self._get(col_name=col_name, cond={key: value})
def count(self, col_name: str, cond):
col = self.db[col_name]
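The new _get and get_one_by_key helpers are thin wrappers over pymongo's find_one; a rough standalone equivalent of get_one_by_key('spiders', key='src', value=...) is shown below, where the connection URL, database name, and query value are illustrative assumptions:

    # rough pymongo equivalent of db_manager.get_one_by_key
    from pymongo import MongoClient

    db = MongoClient('mongodb://localhost:27017')['crawlab']
    spider = db['spiders'].find_one({'src': '/some/spider/dir'})  # None when nothing matches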


@@ -1,7 +1,7 @@
# print('routes')
from routes import spiders
from routes import tasks
print('routes')
# from app import api
# from routes.deploys import DeployApi
# from routes.files import FileApi
# from routes.nodes import NodeApi
# from routes.spiders import SpiderApi
# from routes.tasks import TaskApi
# print(api)


@@ -1,4 +1,3 @@
from app import api
from routes.base import BaseApi
@@ -10,8 +9,3 @@ class DeployApi(BaseApi):
('node_id', str),
)
api.add_resource(DeployApi,
'/api/deploys',
'/api/deploys/<string:id>',
'/api/deploys/<string:id>/<string:action>')


@@ -2,7 +2,6 @@ import os
from flask_restful import reqparse, Resource
from app import api
from utils import jsonify
@@ -38,7 +37,3 @@ class FileApi(Resource):
'folders': sorted(folders),
})
api.add_resource(FileApi,
'/api/files',
'/api/files/<string:action>')


@@ -2,7 +2,6 @@ import json
import requests
from app import api
from config.celery import FLOWER_API_ENDPOINT
from constants.node import NodeType
from db.manager import db_manager
@@ -20,19 +19,46 @@ class NodeApi(BaseApi):
('description', str),
)
def get(self, id=None):
if id is not None:
def get(self, id=None, action=None):
# action by id
if action is not None:
if not hasattr(self, action):
return {
'status': 'ok',
'code': 400,
'error': 'action "%s" invalid' % action
}, 400
return getattr(self, action)(id)
# get one node
elif id is not None:
return db_manager.get('nodes', id=id)
# get a list of items
else:
res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
for k, v in json.loads(res.content.decode('utf-8')).items():
node_name = k
node = v
node['_id'] = node_name
node['name'] = node_name
node['status'] = NodeType.ONLINE
db_manager.save('nodes', node)
node_celery = v
node = db_manager.get('nodes', id=node_name)
# new node
if node is None:
node = {}
for _k, _v in v.items():
node[_k] = _v
node['_id'] = node_name
node['name'] = node_name
node['status'] = NodeType.ONLINE
db_manager.save('nodes', node)
# existing node
else:
for _k, _v in v.items():
node[_k] = _v
node['name'] = node_name
node['status'] = NodeType.ONLINE
db_manager.save('nodes', node)
items = db_manager.list('nodes', {})
@@ -41,8 +67,5 @@ class NodeApi(BaseApi):
'items': items
})
api.add_resource(NodeApi,
'/api/nodes',
'/api/nodes/<string:id>',
'/api/nodes/<string:id>/<string:action>')
def spider(self, id=None):
items = db_manager.list('spiders')
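The node listing in get() above is driven by Flower's HTTP API; a self-contained example of the workers call it makes is sketched below. The endpoint value is an assumption (Flower normally serves its REST API under /api on port 5555), and the contents of each worker dict depend on the Flower version:

    # standalone example of the Flower call used in NodeApi.get()
    import requests

    FLOWER_API_ENDPOINT = 'http://localhost:5555/api'  # assumed value
    res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
    workers = res.json()  # dict keyed by worker name, e.g. 'celery@my-host'
    for name, info in workers.items():
        print(name, list(info.keys()))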


@@ -1,17 +1,15 @@
import json
# from celery.utils.log import get_logger
import os
import shutil
from bson import ObjectId
from flask_restful import reqparse, Resource
from app import api
from config import PROJECT_FILE_FOLDER
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER
from db.manager import db_manager
from routes.base import BaseApi
from tasks.spider import execute_spider
from utils import jsonify
from utils.file import get_file_suffix_stats
from utils.spider import get_lang_by_stats
class SpiderApi(BaseApi):
@@ -21,13 +19,59 @@ class SpiderApi(BaseApi):
('name', str),
('cmd', str),
('src', str),
('type', int),
('lang', int),
('type', str),
('lang', str),
)
def get(self, id=None, action=None):
# TODO: discover folders by path
pass
# action by id
if action is not None:
if not hasattr(self, action):
return {
'status': 'ok',
'code': 400,
'error': 'action "%s" invalid' % action
}, 400
return getattr(self, action)(id)
# get one node
elif id is not None:
return jsonify(db_manager.get('spiders', id=id))
# get a list of items
else:
dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER)
for _dir in dirs:
dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir)
dir_name = _dir
spider = db_manager.get_one_by_key('spiders', key='src', value=dir_path)
# new spider
if spider is None:
stats = get_file_suffix_stats(dir_path)
lang = get_lang_by_stats(stats)
db_manager.save('spiders', {
'name': dir_name,
'src': dir_path,
'lang': lang,
'suffix_stats': stats,
})
# existing spider
else:
stats = get_file_suffix_stats(dir_path)
lang = get_lang_by_stats(stats)
db_manager.update_one('spiders', id=str(spider['_id']), values={
'lang': lang,
'suffix_stats': stats,
})
items = db_manager.list('spiders', {})
return jsonify({
'status': 'ok',
'items': items
})
def crawl(self, id):
job = execute_spider.delay(id)
@@ -56,7 +100,7 @@ class SpiderApi(BaseApi):
# make source / destination
src = spider.get('src')
dst = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
# copy files
try:
@@ -80,9 +124,3 @@ class SpiderApi(BaseApi):
'version': version,
'node_id': None # TODO: deploy to corresponding node
})
api.add_resource(SpiderApi,
'/api/spiders',
'/api/spiders/<string:id>',
'/api/spiders/<string:id>/<string:action>')


@@ -1,4 +1,3 @@
from app import api
from db.manager import db_manager
from routes.base import BaseApi
from utils import jsonify
@@ -25,10 +24,3 @@ class TaskApi(BaseApi):
'status': 'ok',
'items': items
})
# add api to resources
api.add_resource(TaskApi,
'/api/tasks',
'/api/tasks/<string:id>'
)


tasks/celery.py Normal file

@@ -0,0 +1,5 @@
from celery import Celery
# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config.celery')
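Moving the Celery instance into tasks/celery.py lets the worker-side code import it without touching the Flask app; a condensed sketch of the resulting layout is below. The task decorator shown is inferred from the bind-style signature of execute_spider later in this diff and is not spelled out here:

    # tasks/celery.py -- single shared Celery instance (as added above)
    from celery import Celery

    celery_app = Celery(__name__)
    celery_app.config_from_object('config.celery')

    # tasks/spider.py -- tasks bind to that instance instead of importing app.py
    from tasks.celery import celery_app

    @celery_app.task(bind=True)
    def execute_spider(self, id: str):
        ...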


@@ -5,9 +5,9 @@ from datetime import datetime
import requests
from celery.utils.log import get_logger
from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from db.manager import db_manager
from app import celery_app
from .celery import celery_app
import subprocess
logger = get_logger(__name__)


@@ -6,9 +6,9 @@ import requests
from bson import ObjectId
from celery.utils.log import get_logger
from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from db.manager import db_manager
from app import celery_app
from .celery import celery_app
import subprocess
logger = get_logger(__name__)
@@ -21,7 +21,7 @@ def execute_spider(self, id: str):
spider = db_manager.get('spiders', id=id)
latest_version = db_manager.get_latest_version(spider_id=id)
command = spider.get('cmd')
current_working_directory = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
# log info
logger.info('spider_id: %s' % id)

utils/file.py Normal file

@@ -0,0 +1,34 @@
import os
import re
from collections import defaultdict
SUFFIX_PATTERN = r'\.(\w{,10})$'
suffix_regex = re.compile(SUFFIX_PATTERN, re.IGNORECASE)
def get_file_suffix(file_name: str):
file_name = file_name.lower()
m = suffix_regex.search(file_name)
if m is not None:
return m.groups()[0]
else:
return file_name
def get_file_list(path):
for root, dirs, file_names in os.walk(path):
# print(root)  # current directory path
# print(dirs)  # all subdirectories under the current path
# print(file_names)  # all non-directory files under the current path
for file_name in file_names:
file_path = os.path.join(root, file_name)
yield file_path
def get_file_suffix_stats(path) -> dict:
stats = defaultdict(int)
for file_path in get_file_list(path):
suffix = get_file_suffix(file_path)
stats[suffix] += 1
return stats
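A quick usage example of the new helper; the directory path and the resulting counts are illustrative, not real project data:

    # illustrative usage of get_file_suffix_stats
    from utils.file import get_file_suffix_stats

    stats = get_file_suffix_stats('/Users/yeqing/projects/crawlab/spiders/demo')
    print(dict(stats))  # e.g. {'py': 12, 'cfg': 1, 'pyc': 3}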

utils/spider.py Normal file

@@ -0,0 +1,15 @@
from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE
def get_lang_by_stats(stats: dict) -> LangType:
"""
:param stats: stats is generated by utils.file.get_file_suffix_stats
:return:
"""
data = stats.items()
data = sorted(data, key=lambda item: item[1])
data = list(filter(lambda item: item[0] not in SUFFIX_IGNORE, data))
top_suffix = data[-1][0]
if FILE_SUFFIX_LANG_MAPPING.get(top_suffix) is not None:
return FILE_SUFFIX_LANG_MAPPING.get(top_suffix)
return LangType.OTHER
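Putting the two new helpers together, as the spider discovery loop in routes/spiders.py does; the path and counts below are illustrative:

    # end-to-end example: suffix stats -> language detection
    from utils.file import get_file_suffix_stats
    from utils.spider import get_lang_by_stats

    stats = get_file_suffix_stats('/path/to/some/spider')  # e.g. {'py': 12, 'pyc': 3}
    lang = get_lang_by_stats(stats)                         # 'python' for this example
    # note: get_lang_by_stats expects at least one suffix outside SUFFIX_IGNORE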