From 0480984cdb3804320366de27cb2cc6e1202c0e6f Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Thu, 21 Feb 2019 14:08:48 +0800
Subject: [PATCH] updated node/spider overviews

---
 app.py                           | 44 +++++++++++++-------
 bin/run_worker.py                |  8 +++-
 config/__init__.py               |  2 -
 config/common.py                 |  3 ++
 config/flask.py                  |  1 +
 constants/spider.py              | 26 +++++++++---
 db/manager.py                    | 16 ++++++--
 routes/__init__.py               | 14 +++----
 routes/deploys.py                |  6 ---
 routes/files.py                  |  5 ---
 routes/nodes.py                  | 49 ++++++++++++++++------
 routes/spiders.py                | 70 ++++++++++++++++++++++++--------
 routes/tasks.py                  |  8 ----
 spiders/weixin/weixin_crawler.py |  0
 tasks/celery.py                  |  5 +++
 tasks/deploy.py                  |  4 +-
 tasks/spider.py                  |  6 +--
 utils/file.py                    | 34 ++++++++++++++++
 utils/spider.py                  | 15 +++++++
 19 files changed, 230 insertions(+), 86 deletions(-)
 create mode 100644 config/common.py
 create mode 100644 spiders/weixin/weixin_crawler.py
 create mode 100644 tasks/celery.py
 create mode 100644 utils/file.py
 create mode 100644 utils/spider.py

diff --git a/app.py b/app.py
index db2dbbbd..573ea416 100644
--- a/app.py
+++ b/app.py
@@ -1,16 +1,17 @@
 from celery import Celery
 from flask import Flask
 from flask_cors import CORS
-from flask_restful import Api
+from flask_restful import Api, Resource
 
-# TODO: start the app from a config file http://www.pythondoc.com/flask/config.html
+from routes.deploys import DeployApi
+from routes.files import FileApi
+from routes.nodes import NodeApi
+from routes.spiders import SpiderApi
+from routes.tasks import TaskApi
+
+# flask app instance
 app = Flask(__name__)
-app.config['DEBUG'] = True
-
-# celery_app
-celery_app = Celery(__name__)
-celery_app.config_from_object('config.celery')
-
+app.config.from_object('config.flask')
 
 # init flask api instance
 api = Api(app)
@@ -18,12 +19,27 @@ api = Api(app)
 CORS(app, supports_credentials=True)
 
 # reference api routes
-import routes.nodes
-import routes.spiders
-import routes.deploys
-import routes.tasks
-import routes.files
+
+api.add_resource(NodeApi,
+                 '/api/nodes',
+                 '/api/nodes/<string:id>',
+                 '/api/nodes/<string:id>/<string:action>')
+api.add_resource(SpiderApi,
+                 '/api/spiders',
+                 '/api/spiders/<string:id>',
+                 '/api/spiders/<string:id>/<string:action>')
+api.add_resource(DeployApi,
+                 '/api/deploys',
+                 '/api/deploys/<string:id>',
+                 '/api/deploys/<string:id>/<string:action>')
+api.add_resource(TaskApi,
+                 '/api/tasks',
+                 '/api/tasks/<string:id>'
+                 )
+api.add_resource(FileApi,
+                 '/api/files',
+                 '/api/files/<string:action>')
 
 # start flask app
 if __name__ == '__main__':
-    app.run(host='0.0.0.0', port='5000')
+    app.run()
diff --git a/bin/run_worker.py b/bin/run_worker.py
index 38141eaf..7fa68edb 100644
--- a/bin/run_worker.py
+++ b/bin/run_worker.py
@@ -1,7 +1,13 @@
 import sys
+import os
 
 from celery import Celery
-from app import celery_app
+# make sure the working directory is in system path
+file_dir = os.path.dirname(os.path.realpath(__file__))
+root_path = os.path.abspath(os.path.join(file_dir, '..'))
+sys.path.append(root_path)
+
+from tasks.celery import celery_app
 
 # import necessary tasks
 import tasks.spider
diff --git a/config/__init__.py b/config/__init__.py
index 4a968adb..e69de29b 100644
--- a/config/__init__.py
+++ b/config/__init__.py
@@ -1,2 +0,0 @@
-PROJECT_FILE_FOLDER = '/var/crawlab'
-PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
diff --git a/config/common.py b/config/common.py
new file mode 100644
index 00000000..36c6e4a3
--- /dev/null
+++ b/config/common.py
@@ -0,0 +1,3 @@
+PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
+PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
+PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
diff --git a/config/flask.py b/config/flask.py
index 209bb279..8d9a7ab2 100644
--- a/config/flask.py
+++ b/config/flask.py
@@ -1,2 +1,3 @@
 DEBUG = True
 
+# SERVER_NAME = '0.0.0.0:5000'
diff --git a/constants/spider.py b/constants/spider.py
index 1df0e031..7595f79e 100644
--- a/constants/spider.py
+++ b/constants/spider.py
@@ -1,10 +1,24 @@
 class SpiderType:
-    SCRAPY = 1
-    PYSPIDER = 2
-    WEBMAGIC = 3
+    SCRAPY = 'scrapy'
+    PYSPIDER = 'pyspider'
+    WEBMAGIC = 'webmagic'
 
 
 class LangType:
-    PYTHON = 1
-    NODEJS = 2
-    JAVA = 3
+    PYTHON = 'python'
+    JAVASCRIPT = 'javascript'
+    JAVA = 'java'
+    GO = 'go'
+    OTHER = 'other'
+
+
+SUFFIX_IGNORE = [
+    'pyc'
+]
+
+FILE_SUFFIX_LANG_MAPPING = {
+    'py': LangType.PYTHON,
+    'js': LangType.JAVASCRIPT,
+    'java': LangType.JAVA,
+    'go': LangType.GO,
+}
diff --git a/db/manager.py b/db/manager.py
index a636d23f..115fbdfa 100644
--- a/db/manager.py
+++ b/db/manager.py
@@ -14,7 +14,11 @@ class DbManager(object):
 
     def save(self, col_name: str, item, **kwargs):
         col = self.db[col_name]
-        item.pop('stats')  # in case some fields cannot be saved in MongoDB
+
+        # in case some fields cannot be saved in MongoDB
+        if item.get('stats') is not None:
+            item.pop('stats')
+
         col.save(item, **kwargs)
 
     def remove(self, col_name: str, cond: dict, **kwargs):
@@ -43,6 +47,10 @@
             data.append(item)
         return data
 
+    def _get(self, col_name: str, cond: dict):
+        col = self.db[col_name]
+        return col.find_one(cond)
+
     def get(self, col_name: str, id):
         if type(id) == ObjectId:
             _id = id
@@ -50,8 +58,10 @@
             _id = ObjectId(id)
         else:
             _id = id
-        col = self.db[col_name]
-        return col.find_one({'_id': _id})
+        return self._get(col_name=col_name, cond={'_id': _id})
+
+    def get_one_by_key(self, col_name: str, key, value):
+        return self._get(col_name=col_name, cond={key: value})
 
     def count(self, col_name: str, cond):
         col = self.db[col_name]
diff --git a/routes/__init__.py b/routes/__init__.py
index ccebe5a0..e3b7fa36 100644
--- a/routes/__init__.py
+++ b/routes/__init__.py
@@ -1,7 +1,7 @@
-# print('routes')
-
-from routes import spiders
-from routes import tasks
-
-print('routes')
-
+# from app import api
+# from routes.deploys import DeployApi
+# from routes.files import FileApi
+# from routes.nodes import NodeApi
+# from routes.spiders import SpiderApi
+# from routes.tasks import TaskApi
+# print(api)
diff --git a/routes/deploys.py b/routes/deploys.py
index 421df7cd..25a5d4d6 100644
--- a/routes/deploys.py
+++ b/routes/deploys.py
@@ -1,4 +1,3 @@
-from app import api
 from routes.base import BaseApi
 
 
@@ -10,8 +9,3 @@ class DeployApi(BaseApi):
         ('node_id', str),
     )
 
-
-api.add_resource(DeployApi,
-                 '/api/deploys',
-                 '/api/deploys/<string:id>',
-                 '/api/deploys/<string:id>/<string:action>')
diff --git a/routes/files.py b/routes/files.py
index 61aad6bf..8c087f7e 100644
--- a/routes/files.py
+++ b/routes/files.py
@@ -2,7 +2,6 @@ import os
 
 from flask_restful import reqparse, Resource
 
-from app import api
 from utils import jsonify
 
 
@@ -38,7 +37,3 @@
             'folders': sorted(folders),
         })
 
-
-api.add_resource(FileApi,
-                 '/api/files',
-                 '/api/files/<string:action>')
diff --git a/routes/nodes.py b/routes/nodes.py
index d6d4866c..82c58de7 100644
--- a/routes/nodes.py
+++ b/routes/nodes.py
@@ -2,7 +2,6 @@ import json
 
 import requests
 
-from app import api
 from config.celery import FLOWER_API_ENDPOINT
 from constants.node import NodeType
 from db.manager import db_manager
@@ -20,19 +19,46 @@ class NodeApi(BaseApi):
         ('description', str),
     )
 
-    def get(self, id=None):
-        if id is not None:
+    def get(self, id=None, action=None):
+        # action by id
+        if action is not None:
+            if not hasattr(self, action):
+                return {
+                    'status': 'ok',
+                    'code': 400,
+                    'error': 'action "%s" invalid' % action
+                }, 400
+            return getattr(self, action)(id)
+
+        # get one node
+        elif id is not None:
             return db_manager.get('nodes', id=id)
 
+        # get a list of items
         else:
             res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
             for k, v in json.loads(res.content.decode('utf-8')).items():
                 node_name = k
-                node = v
-                node['_id'] = node_name
-                node['name'] = node_name
-                node['status'] = NodeType.ONLINE
-                db_manager.save('nodes', node)
+                node_celery = v
+                node = db_manager.get('nodes', id=node_name)
+
+                # new node
+                if node is None:
+                    node = {}
+                    for _k, _v in v.items():
+                        node[_k] = _v
+                    node['_id'] = node_name
+                    node['name'] = node_name
+                    node['status'] = NodeType.ONLINE
+                    db_manager.save('nodes', node)
+
+                # existing node
+                else:
+                    for _k, _v in v.items():
+                        node[_k] = _v
+                    node['name'] = node_name
+                    node['status'] = NodeType.ONLINE
+                    db_manager.save('nodes', node)
 
         items = db_manager.list('nodes', {})
 
@@ -41,8 +67,5 @@
             'items': items
         })
 
-
-api.add_resource(NodeApi,
-                 '/api/nodes',
-                 '/api/nodes/<string:id>',
-                 '/api/nodes/<string:id>/<string:action>')
+    def spider(self, id=None):
+        items = db_manager.list('spiders')
diff --git a/routes/spiders.py b/routes/spiders.py
index fa67bead..4eeca7fc 100644
--- a/routes/spiders.py
+++ b/routes/spiders.py
@@ -1,17 +1,15 @@
-import json
-# from celery.utils.log import get_logger
 import os
 import shutil
 
 from bson import ObjectId
-from flask_restful import reqparse, Resource
 
-from app import api
-from config import PROJECT_FILE_FOLDER
+from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER
 from db.manager import db_manager
 from routes.base import BaseApi
 from tasks.spider import execute_spider
 from utils import jsonify
+from utils.file import get_file_suffix_stats
+from utils.spider import get_lang_by_stats
 
 
 class SpiderApi(BaseApi):
@@ -21,13 +19,59 @@
         ('name', str),
         ('cmd', str),
         ('src', str),
-        ('type', int),
-        ('lang', int),
+        ('type', str),
+        ('lang', str),
     )
 
     def get(self, id=None, action=None):
-        # TODO: discover folders by path
-        pass
+        # action by id
+        if action is not None:
+            if not hasattr(self, action):
+                return {
+                    'status': 'ok',
+                    'code': 400,
+                    'error': 'action "%s" invalid' % action
+                }, 400
+            return getattr(self, action)(id)
+
+        # get one spider
+        elif id is not None:
+            return jsonify(db_manager.get('spiders', id=id))
+
+        # get a list of items
+        else:
+            dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER)
+            for _dir in dirs:
+                dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir)
+                dir_name = _dir
+                spider = db_manager.get_one_by_key('spiders', key='src', value=dir_path)
+
+                # new spider
+                if spider is None:
+                    stats = get_file_suffix_stats(dir_path)
+                    lang = get_lang_by_stats(stats)
+                    db_manager.save('spiders', {
+                        'name': dir_name,
+                        'src': dir_path,
+                        'lang': lang,
+                        'suffix_stats': stats,
+                    })
+
+                # existing spider
+                else:
+                    stats = get_file_suffix_stats(dir_path)
+                    lang = get_lang_by_stats(stats)
+                    db_manager.update_one('spiders', id=str(spider['_id']), values={
+                        'lang': lang,
+                        'suffix_stats': stats,
+                    })
+
+            items = db_manager.list('spiders', {})
+
+            return jsonify({
+                'status': 'ok',
+                'items': items
+            })
 
     def crawl(self, id):
         job = execute_spider.delay(id)
@@ -56,7 +100,7 @@
 
         # make source / destination
         src = spider.get('src')
-        dst = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
+        dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
 
         # copy files
         try:
@@ -80,9 +124,3 @@
             'version': version,
             'node_id': None  # TODO: deploy to corresponding node
         })
-
-
-api.add_resource(SpiderApi,
-                 '/api/spiders',
-                 '/api/spiders/<string:id>',
-                 '/api/spiders/<string:id>/<string:action>')
diff --git a/routes/tasks.py b/routes/tasks.py
index 244694f5..a7bac35d 100644
--- a/routes/tasks.py
+++ b/routes/tasks.py
@@ -1,4 +1,3 @@
-from app import api
 from db.manager import db_manager
 from routes.base import BaseApi
 from utils import jsonify
@@ -25,10 +24,3 @@
             'status': 'ok',
             'items': items
         })
-
-
-# add api to resources
-api.add_resource(TaskApi,
-                 '/api/tasks',
-                 '/api/tasks/<string:id>'
-                 )
diff --git a/spiders/weixin/weixin_crawler.py b/spiders/weixin/weixin_crawler.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tasks/celery.py b/tasks/celery.py
new file mode 100644
index 00000000..e5c7c93d
--- /dev/null
+++ b/tasks/celery.py
@@ -0,0 +1,5 @@
+from celery import Celery
+
+# celery app instance
+celery_app = Celery(__name__)
+celery_app.config_from_object('config.celery')
diff --git a/tasks/deploy.py b/tasks/deploy.py
index 3ac0c0ec..7bb9bbce 100644
--- a/tasks/deploy.py
+++ b/tasks/deploy.py
@@ -5,9 +5,9 @@ from datetime import datetime
 import requests
 from celery.utils.log import get_logger
 
-from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
+from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
 from db.manager import db_manager
-from app import celery_app
+from .celery import celery_app
 import subprocess
 
 logger = get_logger(__name__)
diff --git a/tasks/spider.py b/tasks/spider.py
index ecbf697d..070ff5db 100644
--- a/tasks/spider.py
+++ b/tasks/spider.py
@@ -6,9 +6,9 @@ import requests
 from bson import ObjectId
 from celery.utils.log import get_logger
 
-from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
+from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
 from db.manager import db_manager
-from app import celery_app
+from .celery import celery_app
 import subprocess
 
 logger = get_logger(__name__)
@@ -21,7 +21,7 @@ def execute_spider(self, id: str):
     spider = db_manager.get('spiders', id=id)
     latest_version = db_manager.get_latest_version(spider_id=id)
     command = spider.get('cmd')
-    current_working_directory = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
+    current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
 
     # log info
     logger.info('spider_id: %s' % id)
diff --git a/utils/file.py b/utils/file.py
new file mode 100644
index 00000000..7c629f33
--- /dev/null
+++ b/utils/file.py
@@ -0,0 +1,34 @@
+import os
+import re
+from collections import defaultdict
+
+SUFFIX_PATTERN = r'\.(\w{,10})$'
+suffix_regex = re.compile(SUFFIX_PATTERN, re.IGNORECASE)
+
+
+def get_file_suffix(file_name: str):
+    file_name = file_name.lower()
+    m = suffix_regex.search(file_name)
+    if m is not None:
+        return m.groups()[0]
+    else:
+        return file_name
+
+
+def get_file_list(path):
+    for root, dirs, file_names in os.walk(path):
+        # print(root)  # current directory path
+        # print(dirs)  # all sub-directories under the current path
+        # print(file_names)  # all non-directory files under the current path
+
+        for file_name in file_names:
+            file_path = os.path.join(root, file_name)
+            yield file_path
+
+
+def get_file_suffix_stats(path) -> dict:
+    stats = defaultdict(int)
+    for file_path in get_file_list(path):
+        suffix = get_file_suffix(file_path)
+        stats[suffix] += 1
+    return stats
diff --git a/utils/spider.py b/utils/spider.py
new file mode 100644
index 00000000..d17a1d3a
--- /dev/null
+++ b/utils/spider.py
@@ -0,0 +1,15 @@
+from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE
+
+
+def get_lang_by_stats(stats: dict) -> LangType:
+    """
+    :param stats: suffix stats generated by utils.file.get_file_suffix_stats
+    :return:
+    """
+    data = stats.items()
+    data = sorted(data, key=lambda item: item[1])
+    data = list(filter(lambda item: item[0] not in SUFFIX_IGNORE, data))
+    top_suffix = data[-1][0]
+    if FILE_SUFFIX_LANG_MAPPING.get(top_suffix) is not None:
+        return FILE_SUFFIX_LANG_MAPPING.get(top_suffix)
+    return LangType.OTHER
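
The suffix-statistics and language-detection helpers added in utils/file.py and utils/spider.py above can be exercised on their own; a minimal usage sketch follows (the spider folder path is an illustrative assumption, and the folder is assumed to contain at least one recognisable source file):

    from utils.file import get_file_suffix_stats
    from utils.spider import get_lang_by_stats

    # count files per suffix under one spider project folder,
    # then map the most frequent suffix to a language constant
    stats = get_file_suffix_stats('/path/to/spiders/example_spider')  # e.g. {'py': 12, 'cfg': 1}
    lang = get_lang_by_stats(stats)                                   # -> 'python'
    print(stats, lang)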