From 321aba2c7615cd17fcc2c4f9340045c1149ab854 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 30 Jul 2019 22:25:00 +0800 Subject: [PATCH 1/2] removed python code --- CHANGELOG.md | 8 +- crawlab/.gitignore | 114 --- crawlab/__init__.py | 0 crawlab/app.py | 106 -- crawlab/config/__init__.py | 3 - crawlab/config/config.py | 52 - crawlab/constants/__init__.py | 0 crawlab/constants/file.py | 3 - crawlab/constants/lang.py | 4 - crawlab/constants/manage.py | 6 - crawlab/constants/node.py | 3 - crawlab/constants/spider.py | 44 - crawlab/constants/task.py | 8 - crawlab/db/__init__.py | 0 crawlab/db/manager.py | 189 ---- crawlab/flower.py | 20 - crawlab/requirements.txt | 17 - crawlab/routes/__init__.py | 7 - crawlab/routes/base.py | 189 ---- crawlab/routes/deploys.py | 46 - crawlab/routes/files.py | 50 - crawlab/routes/nodes.py | 81 -- crawlab/routes/schedules.py | 25 - crawlab/routes/sites.py | 90 -- crawlab/routes/spiders.py | 937 ------------------ crawlab/routes/stats.py | 235 ----- crawlab/routes/tasks.py | 256 ----- crawlab/spiders/scrapy.cfg | 11 - crawlab/spiders/spiders/__init__.py | 0 crawlab/spiders/spiders/db.py | 18 - crawlab/spiders/spiders/items.py | 25 - crawlab/spiders/spiders/middlewares.py | 103 -- crawlab/spiders/spiders/pipelines.py | 17 - crawlab/spiders/spiders/settings.py | 90 -- crawlab/spiders/spiders/spiders/__init__.py | 4 - .../spiders/spiders/spiders/config_spider.py | 123 --- crawlab/spiders/spiders/utils.py | 55 - crawlab/swagger.yaml | 353 ------- crawlab/tasks/__init__.py | 0 crawlab/tasks/celery.py | 5 - crawlab/tasks/deploy.py | 18 - crawlab/tasks/node.py | 7 - crawlab/tasks/scheduler.py | 89 -- crawlab/tasks/spider.py | 272 ----- crawlab/test/__init__.py | 0 crawlab/test/test.http | 53 - crawlab/utils/__init__.py | 34 - crawlab/utils/deploy.py | 33 - crawlab/utils/file.py | 79 -- crawlab/utils/log.py | 75 -- crawlab/utils/node.py | 50 - crawlab/utils/spider.py | 179 ---- crawlab/worker.py | 19 - 53 files changed, 7 insertions(+), 4198 deletions(-) delete mode 100644 crawlab/.gitignore delete mode 100644 crawlab/__init__.py delete mode 100644 crawlab/app.py delete mode 100644 crawlab/config/__init__.py delete mode 100644 crawlab/config/config.py delete mode 100644 crawlab/constants/__init__.py delete mode 100644 crawlab/constants/file.py delete mode 100644 crawlab/constants/lang.py delete mode 100644 crawlab/constants/manage.py delete mode 100644 crawlab/constants/node.py delete mode 100644 crawlab/constants/spider.py delete mode 100644 crawlab/constants/task.py delete mode 100644 crawlab/db/__init__.py delete mode 100644 crawlab/db/manager.py delete mode 100644 crawlab/flower.py delete mode 100644 crawlab/requirements.txt delete mode 100644 crawlab/routes/__init__.py delete mode 100644 crawlab/routes/base.py delete mode 100644 crawlab/routes/deploys.py delete mode 100644 crawlab/routes/files.py delete mode 100644 crawlab/routes/nodes.py delete mode 100644 crawlab/routes/schedules.py delete mode 100644 crawlab/routes/sites.py delete mode 100644 crawlab/routes/spiders.py delete mode 100644 crawlab/routes/stats.py delete mode 100644 crawlab/routes/tasks.py delete mode 100644 crawlab/spiders/scrapy.cfg delete mode 100644 crawlab/spiders/spiders/__init__.py delete mode 100644 crawlab/spiders/spiders/db.py delete mode 100644 crawlab/spiders/spiders/items.py delete mode 100644 crawlab/spiders/spiders/middlewares.py delete mode 100644 crawlab/spiders/spiders/pipelines.py delete mode 100644 crawlab/spiders/spiders/settings.py delete mode 100644 
crawlab/spiders/spiders/spiders/__init__.py delete mode 100644 crawlab/spiders/spiders/spiders/config_spider.py delete mode 100644 crawlab/spiders/spiders/utils.py delete mode 100644 crawlab/swagger.yaml delete mode 100644 crawlab/tasks/__init__.py delete mode 100644 crawlab/tasks/celery.py delete mode 100644 crawlab/tasks/deploy.py delete mode 100644 crawlab/tasks/node.py delete mode 100644 crawlab/tasks/scheduler.py delete mode 100644 crawlab/tasks/spider.py delete mode 100644 crawlab/test/__init__.py delete mode 100644 crawlab/test/test.http delete mode 100644 crawlab/utils/__init__.py delete mode 100644 crawlab/utils/deploy.py delete mode 100644 crawlab/utils/file.py delete mode 100644 crawlab/utils/log.py delete mode 100644 crawlab/utils/node.py delete mode 100644 crawlab/utils/spider.py delete mode 100644 crawlab/worker.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b24576ea..1f29c97a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ -# 0.2.4 (unreleased) +# 0.3.0 (2019-07-31) +### Features / Enhancement +- **Golang**: Refactored the backend from Python to Golang, greatly improving stability and performance. +- **Node Network Graph**: Visualization of node topology. +- **Node System Info**: View system information including OS, CPUs, and executables. + +# 0.2.4 (2019-07-07) ### Features / Enhancement - **Documentation**: Better and much more detailed documentation. - **Better Crontab**: Make crontab expression through crontab UI. diff --git a/crawlab/.gitignore b/crawlab/.gitignore deleted file mode 100644 index ccc81841..00000000 --- a/crawlab/.gitignore +++ /dev/null @@ -1,114 +0,0 @@ -.idea/ - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. 
-*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ - -# node_modules -node_modules/ - -# egg-info -*.egg-info - -tmp/ diff --git a/crawlab/__init__.py b/crawlab/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/app.py b/crawlab/app.py deleted file mode 100644 index 484e9234..00000000 --- a/crawlab/app.py +++ /dev/null @@ -1,106 +0,0 @@ -import os -import sys -from multiprocessing import Process - -from flask import Flask -from flask_cors import CORS -from flask_restful import Api -# from flask_restplus import Api - -file_dir = os.path.dirname(os.path.realpath(__file__)) -root_path = os.path.abspath(os.path.join(file_dir, '.')) -sys.path.append(root_path) - -from utils.log import other -from constants.node import NodeStatus -from db.manager import db_manager -from routes.schedules import ScheduleApi -from tasks.celery import celery_app -from tasks.scheduler import scheduler -from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER -from routes.sites import SiteApi -from routes.deploys import DeployApi -from routes.files import FileApi -from routes.nodes import NodeApi -from routes.spiders import SpiderApi, SpiderImportApi, SpiderManageApi -from routes.stats import StatsApi -from routes.tasks import TaskApi - -# flask app instance -app = Flask(__name__) -app.config.from_object('config') - -# init flask api instance -api = Api(app) - -# cors support -CORS(app, supports_credentials=True) - -# reference api routes -api.add_resource(NodeApi, - '/api/nodes', - '/api/nodes/', - '/api/nodes//') -api.add_resource(SpiderApi, - '/api/spiders', - '/api/spiders/', - '/api/spiders//') -api.add_resource(SpiderImportApi, - '/api/spiders/import/') -api.add_resource(SpiderManageApi, - '/api/spiders/manage/') -api.add_resource(TaskApi, - '/api/tasks', - '/api/tasks/', - '/api/tasks//') -api.add_resource(DeployApi, - '/api/deploys', - '/api/deploys/', - '/api/deploys//') -api.add_resource(FileApi, - '/api/files', - '/api/files/') -api.add_resource(StatsApi, - '/api/stats', - '/api/stats/') -api.add_resource(ScheduleApi, - '/api/schedules', - '/api/schedules/') -api.add_resource(SiteApi, - '/api/sites', - '/api/sites/', - '/api/sites/get/') - - -def monitor_nodes_status(celery_app): - def update_nodes_status(event): - node_id = event.get('hostname') - db_manager.update_one('nodes', id=node_id, values={ - 'status': NodeStatus.ONLINE - }) - - def update_nodes_status_online(event): - other.info(f"{event}") - - with celery_app.connection() as connection: - recv = celery_app.events.Receiver(connection, handlers={ - 'worker-heartbeat': update_nodes_status, - # 'worker-online': update_nodes_status_online, - }) - recv.capture(limit=None, timeout=None, wakeup=True) - - -# run scheduler as a separate 
process -scheduler.run() - -# monitor node status -p_monitor = Process(target=monitor_nodes_status, args=(celery_app,)) -p_monitor.start() - -# create folder if it does not exist -if not os.path.exists(PROJECT_LOGS_FOLDER): - os.makedirs(PROJECT_LOGS_FOLDER) - -if __name__ == '__main__': - # run app instance - app.run(host=FLASK_HOST, port=FLASK_PORT) diff --git a/crawlab/config/__init__.py b/crawlab/config/__init__.py deleted file mode 100644 index 4d2d8d10..00000000 --- a/crawlab/config/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# encoding: utf-8 - -from config.config import * diff --git a/crawlab/config/config.py b/crawlab/config/config.py deleted file mode 100644 index 749ecdba..00000000 --- a/crawlab/config/config.py +++ /dev/null @@ -1,52 +0,0 @@ -import os - -BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -# 爬虫源码路径 -PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders") - -# 爬虫部署路径 -PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab' - -# 爬虫日志路径 -PROJECT_LOGS_FOLDER = '/var/log/crawlab' - -# 打包临时文件夹 -PROJECT_TMP_FOLDER = '/tmp' - -# MongoDB 变量 -MONGO_HOST = '127.0.0.1' -MONGO_PORT = 27017 -MONGO_USERNAME = None -MONGO_PASSWORD = None -MONGO_DB = 'crawlab_test' -MONGO_AUTH_DB = 'crawlab_test' - -# Celery中间者URL -BROKER_URL = 'redis://127.0.0.1:6379/0' - -# Celery后台URL -if MONGO_USERNAME is not None: - CELERY_RESULT_BACKEND = f'mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/' -else: - CELERY_RESULT_BACKEND = f'mongodb://{MONGO_HOST}:{MONGO_PORT}/' - -# Celery MongoDB设置 -CELERY_MONGODB_BACKEND_SETTINGS = { - 'database': 'crawlab_test', - 'taskmeta_collection': 'tasks_celery', -} - -# Celery时区 -CELERY_TIMEZONE = 'Asia/Shanghai' - -# 是否启用UTC -CELERY_ENABLE_UTC = True - -# flower variables -FLOWER_API_ENDPOINT = 'http://localhost:5555/api' - -# Flask 变量 -DEBUG = False -FLASK_HOST = '0.0.0.0' -FLASK_PORT = 8000 diff --git a/crawlab/constants/__init__.py b/crawlab/constants/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/constants/file.py b/crawlab/constants/file.py deleted file mode 100644 index 7418936e..00000000 --- a/crawlab/constants/file.py +++ /dev/null @@ -1,3 +0,0 @@ -class FileType: - FILE = 1 - FOLDER = 2 diff --git a/crawlab/constants/lang.py b/crawlab/constants/lang.py deleted file mode 100644 index b0dfdb4e..00000000 --- a/crawlab/constants/lang.py +++ /dev/null @@ -1,4 +0,0 @@ -class LangType: - PYTHON = 1 - NODE = 2 - GO = 3 diff --git a/crawlab/constants/manage.py b/crawlab/constants/manage.py deleted file mode 100644 index 1c57837d..00000000 --- a/crawlab/constants/manage.py +++ /dev/null @@ -1,6 +0,0 @@ -class ActionType: - APP = 'app' - FLOWER = 'flower' - WORKER = 'worker' - SCHEDULER = 'scheduler' - RUN_ALL = 'run_all' diff --git a/crawlab/constants/node.py b/crawlab/constants/node.py deleted file mode 100644 index 4cdc3fda..00000000 --- a/crawlab/constants/node.py +++ /dev/null @@ -1,3 +0,0 @@ -class NodeStatus: - ONLINE = 'online' - OFFLINE = 'offline' diff --git a/crawlab/constants/spider.py b/crawlab/constants/spider.py deleted file mode 100644 index 97cbbdf2..00000000 --- a/crawlab/constants/spider.py +++ /dev/null @@ -1,44 +0,0 @@ -class SpiderType: - CONFIGURABLE = 'configurable' - CUSTOMIZED = 'customized' - - -class LangType: - PYTHON = 'python' - JAVASCRIPT = 'javascript' - JAVA = 'java' - GO = 'go' - OTHER = 'other' - - -class CronEnabled: - ON = 1 - OFF = 0 - - -class CrawlType: - LIST = 'list' - DETAIL = 'detail' - LIST_DETAIL = 'list-detail' - - -class 
QueryType: - CSS = 'css' - XPATH = 'xpath' - - -class ExtractType: - TEXT = 'text' - ATTRIBUTE = 'attribute' - - -SUFFIX_IGNORE = [ - 'pyc' -] - -FILE_SUFFIX_LANG_MAPPING = { - 'py': LangType.PYTHON, - 'js': LangType.JAVASCRIPT, - 'java': LangType.JAVA, - 'go': LangType.GO, -} diff --git a/crawlab/constants/task.py b/crawlab/constants/task.py deleted file mode 100644 index 07e169b3..00000000 --- a/crawlab/constants/task.py +++ /dev/null @@ -1,8 +0,0 @@ -class TaskStatus: - PENDING = 'PENDING' - STARTED = 'STARTED' - SUCCESS = 'SUCCESS' - FAILURE = 'FAILURE' - RETRY = 'RETRY' - REVOKED = 'REVOKED' - UNAVAILABLE = 'UNAVAILABLE' diff --git a/crawlab/db/__init__.py b/crawlab/db/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/db/manager.py b/crawlab/db/manager.py deleted file mode 100644 index ac157dfb..00000000 --- a/crawlab/db/manager.py +++ /dev/null @@ -1,189 +0,0 @@ -from bson import ObjectId -from pymongo import MongoClient, DESCENDING -from config import MONGO_HOST, MONGO_PORT, MONGO_DB, MONGO_USERNAME, MONGO_PASSWORD, MONGO_AUTH_DB -from utils import is_object_id - - -class DbManager(object): - __doc__ = """ - Database Manager class for handling database CRUD actions. - """ - - def __init__(self): - self.mongo = MongoClient(host=MONGO_HOST, - port=MONGO_PORT, - username=MONGO_USERNAME, - password=MONGO_PASSWORD, - authSource=MONGO_AUTH_DB or MONGO_DB, - connect=False) - self.db = self.mongo[MONGO_DB] - - def save(self, col_name: str, item: dict, **kwargs) -> None: - """ - Save the item in the specified collection - :param col_name: collection name - :param item: item object - """ - col = self.db[col_name] - - # in case some fields cannot be saved in MongoDB - if item.get('stats') is not None: - item.pop('stats') - - return col.save(item, **kwargs) - - def remove(self, col_name: str, cond: dict, **kwargs) -> None: - """ - Remove items given specified condition. - :param col_name: collection name - :param cond: condition or filter - """ - col = self.db[col_name] - col.remove(cond, **kwargs) - - def update(self, col_name: str, cond: dict, values: dict, **kwargs): - """ - Update items given specified condition. - :param col_name: collection name - :param cond: condition or filter - :param values: values to update - """ - col = self.db[col_name] - col.update(cond, {'$set': values}, **kwargs) - - def update_one(self, col_name: str, id: str, values: dict, **kwargs): - """ - Update an item given specified _id - :param col_name: collection name - :param id: _id - :param values: values to update - """ - col = self.db[col_name] - _id = id - if is_object_id(id): - _id = ObjectId(id) - # print('UPDATE: _id = "%s", values = %s' % (str(_id), jsonify(values))) - col.find_one_and_update({'_id': _id}, {'$set': values}) - - def remove_one(self, col_name: str, id: str, **kwargs): - """ - Remove an item given specified _id - :param col_name: collection name - :param id: _id - """ - col = self.db[col_name] - _id = id - if is_object_id(id): - _id = ObjectId(id) - col.remove({'_id': _id}) - - def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100, - **kwargs) -> list: - """ - Return a list of items given specified condition, sort_key, sort_direction, skip, and limit. 
- :param col_name: collection name - :param cond: condition or filter - :param sort_key: key to sort - :param sort_direction: sort direction - :param skip: skip number - :param limit: limit number - """ - if sort_key is None: - sort_key = '_i' - col = self.db[col_name] - data = [] - for item in col.find(cond).sort(sort_key, sort_direction).skip(skip).limit(limit): - data.append(item) - return data - - def _get(self, col_name: str, cond: dict) -> dict: - """ - Get an item given specified condition. - :param col_name: collection name - :param cond: condition or filter - """ - col = self.db[col_name] - return col.find_one(cond) - - def get(self, col_name: str, id: (ObjectId, str)) -> dict: - """ - Get an item given specified _id. - :param col_name: collection name - :param id: _id - """ - if type(id) == ObjectId: - _id = id - elif is_object_id(id): - _id = ObjectId(id) - else: - _id = id - return self._get(col_name=col_name, cond={'_id': _id}) - - def get_one_by_key(self, col_name: str, key, value) -> dict: - """ - Get an item given key/value condition. - :param col_name: collection name - :param key: key - :param value: value - """ - return self._get(col_name=col_name, cond={key: value}) - - def count(self, col_name: str, cond) -> int: - """ - Get total count of a collection given specified condition - :param col_name: collection name - :param cond: condition or filter - """ - col = self.db[col_name] - return col.count(cond) - - def get_latest_version(self, spider_id, node_id): - """ - @deprecated - """ - col = self.db['deploys'] - for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \ - .sort('version', DESCENDING): - return item.get('version') - return None - - def get_last_deploy(self, spider_id): - """ - Get latest deploy for a given spider_id - """ - col = self.db['deploys'] - for item in col.find({'spider_id': ObjectId(spider_id)}) \ - .sort('finish_ts', DESCENDING): - return item - return None - - def get_last_task(self, spider_id): - """ - Get latest deploy for a given spider_id - """ - col = self.db['tasks'] - for item in col.find({'spider_id': ObjectId(spider_id)}) \ - .sort('create_ts', DESCENDING): - return item - return None - - def aggregate(self, col_name: str, pipelines, **kwargs): - """ - Perform MongoDB col.aggregate action to aggregate stats given collection name and pipelines. 
- Reference: https://docs.mongodb.com/manual/reference/command/aggregate/ - :param col_name: collection name - :param pipelines: pipelines - """ - col = self.db[col_name] - return col.aggregate(pipelines, **kwargs) - - def create_index(self, col_name: str, keys: dict, **kwargs): - col = self.db[col_name] - col.create_index(keys=keys, **kwargs) - - def distinct(self, col_name: str, key: str, filter: dict): - col = self.db[col_name] - return sorted(col.distinct(key, filter)) - - -db_manager = DbManager() diff --git a/crawlab/flower.py b/crawlab/flower.py deleted file mode 100644 index 818d96f7..00000000 --- a/crawlab/flower.py +++ /dev/null @@ -1,20 +0,0 @@ -import os -import sys -import subprocess - -# make sure the working directory is in system path -FILE_DIR = os.path.dirname(os.path.realpath(__file__)) -ROOT_PATH = os.path.abspath(os.path.join(FILE_DIR, '..')) -sys.path.append(ROOT_PATH) - -from utils.log import other -from config import BROKER_URL - -if __name__ == '__main__': - p = subprocess.Popen([sys.executable, '-m', 'celery', 'flower', '-b', BROKER_URL], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=ROOT_PATH) - for line in iter(p.stdout.readline, 'b'): - if line.decode('utf-8') != '': - other.info(line.decode('utf-8')) diff --git a/crawlab/requirements.txt b/crawlab/requirements.txt deleted file mode 100644 index 282c4cc2..00000000 --- a/crawlab/requirements.txt +++ /dev/null @@ -1,17 +0,0 @@ -Flask_CSV==1.2.0 -gevent==1.4.0 -requests==2.21.0 -Scrapy==1.6.0 -pymongo==3.7.2 -APScheduler==3.6.0 -coloredlogs==10.0 -Flask_RESTful==0.3.7 -Flask==1.0.2 -lxml==4.3.3 -Flask_Cors==3.0.7 -Werkzeug==0.15.2 -eventlet -Celery -Flower -redis -gunicorn diff --git a/crawlab/routes/__init__.py b/crawlab/routes/__init__.py deleted file mode 100644 index e3b7fa36..00000000 --- a/crawlab/routes/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# from app import api -# from routes.deploys import DeployApi -# from routes.files import FileApi -# from routes.nodes import NodeApi -# from routes.spiders import SpiderApi -# from routes.tasks import TaskApi -# print(api) diff --git a/crawlab/routes/base.py b/crawlab/routes/base.py deleted file mode 100644 index 3bb2c1b0..00000000 --- a/crawlab/routes/base.py +++ /dev/null @@ -1,189 +0,0 @@ -from flask_restful import reqparse, Resource -# from flask_restplus import reqparse, Resource - -from db.manager import db_manager -from utils import jsonify - -DEFAULT_ARGS = [ - 'page_num', - 'page_size', - 'filter' -] - - -class BaseApi(Resource): - """ - Base class for API. All API classes should inherit this class. - """ - col_name = 'tmp' - parser = reqparse.RequestParser() - arguments = [] - - def __init__(self): - super(BaseApi).__init__() - self.parser.add_argument('page_num', type=int) - self.parser.add_argument('page_size', type=int) - self.parser.add_argument('filter', type=str) - - for arg, type in self.arguments: - self.parser.add_argument(arg, type=type) - - def get(self, id: str = None, action: str = None) -> (dict, tuple): - """ - GET method for retrieving item information. 
- If id is specified and action is not, return the object of the given id; - If id and action are both specified, execute the given action results of the given id; - If neither id nor action is specified, return the list of items given the page_size, page_num and filter - :param id: - :param action: - :return: - """ - # import pdb - # pdb.set_trace() - args = self.parser.parse_args() - - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - # list items - elif id is None: - # filter - cond = {} - if args.get('filter') is not None: - cond = args.filter - # cond = json.loads(args.filter) - - # page number - page = 1 - if args.get('page_num') is not None: - page = args.page - # page = int(args.page) - - # page size - page_size = 10 - if args.get('page_size') is not None: - page_size = args.page_size - # page = int(args.page_size) - - # TODO: sort functionality - - # total count - total_count = db_manager.count(col_name=self.col_name, cond=cond) - - # items - items = db_manager.list(col_name=self.col_name, - cond=cond, - skip=(page - 1) * page_size, - limit=page_size) - - # TODO: getting status for node - - return { - 'status': 'ok', - 'total_count': total_count, - 'page_num': page, - 'page_size': page_size, - 'items': jsonify(items) - } - - # get item by id - else: - return jsonify(db_manager.get(col_name=self.col_name, id=id)) - - def put(self) -> (dict, tuple): - """ - PUT method for creating a new item. - :return: - """ - args = self.parser.parse_args() - item = {} - for k in args.keys(): - if k not in DEFAULT_ARGS: - item[k] = args.get(k) - id = db_manager.save(col_name=self.col_name, item=item) - - # execute after_update hook - self.after_update(id) - - return jsonify(id) - - def update(self, id: str = None) -> (dict, tuple): - """ - Helper function for update action given the id. - :param id: - :return: - """ - args = self.parser.parse_args() - item = db_manager.get(col_name=self.col_name, id=id) - if item is None: - return { - 'status': 'ok', - 'code': 401, - 'error': 'item not exists' - }, 401 - values = {} - for k in args.keys(): - if k not in DEFAULT_ARGS: - if args.get(k) is not None: - values[k] = args.get(k) - item = db_manager.update_one(col_name=self.col_name, id=id, values=values) - - # execute after_update hook - self.after_update(id) - - return jsonify(item) - - def post(self, id: str = None, action: str = None): - """ - POST method of the given id for performing an action. - :param id: - :param action: - :return: - """ - # perform update action if action is not specified - if action is None: - return self.update(id) - - # if action is not defined in the attributes, return 400 error - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - - # perform specified action of given id - return getattr(self, action)(id) - - def delete(self, id: str = None) -> (dict, tuple): - """ - DELETE method of given id for deleting an item. - :param id: - :return: - """ - # perform delete action - db_manager.remove_one(col_name=self.col_name, id=id) - - # execute after_update hook - self.after_update(id) - - return { - 'status': 'ok', - 'message': 'deleted successfully', - } - - def after_update(self, id: str = None): - """ - This is the after update hook once the update method is performed. - To be overridden. 
- :param id: - :return: - """ - pass diff --git a/crawlab/routes/deploys.py b/crawlab/routes/deploys.py deleted file mode 100644 index 885173c1..00000000 --- a/crawlab/routes/deploys.py +++ /dev/null @@ -1,46 +0,0 @@ -from db.manager import db_manager -from routes.base import BaseApi -from utils import jsonify - - -class DeployApi(BaseApi): - col_name = 'deploys' - - arguments = ( - ('spider_id', str), - ('node_id', str), - ) - - def get(self, id: str = None, action: str = None) -> (dict, tuple): - """ - GET method of DeployAPI. - :param id: deploy_id - :param action: action - """ - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - # get one node - elif id is not None: - return jsonify(db_manager.get('deploys', id=id)) - - # get a list of items - else: - items = db_manager.list('deploys', {}) - deploys = [] - for item in items: - spider_id = item['spider_id'] - spider = db_manager.get('spiders', id=str(spider_id)) - item['spider_name'] = spider['name'] - deploys.append(item) - return { - 'status': 'ok', - 'items': jsonify(deploys) - } diff --git a/crawlab/routes/files.py b/crawlab/routes/files.py deleted file mode 100644 index ef80e20b..00000000 --- a/crawlab/routes/files.py +++ /dev/null @@ -1,50 +0,0 @@ -import os - -from flask_restful import reqparse, Resource - -from utils import jsonify -from utils.file import get_file_content - - -class FileApi(Resource): - parser = reqparse.RequestParser() - arguments = [] - - def __init__(self): - super(FileApi).__init__() - self.parser.add_argument('path', type=str) - - def get(self, action=None): - """ - GET method of FileAPI. - :param action: action - """ - args = self.parser.parse_args() - path = args.get('path') - - if action is not None: - if action == 'getDefaultPath': - return jsonify({ - 'defaultPath': os.path.abspath(os.path.join(os.path.curdir, 'spiders')) - }) - - elif action == 'get_file': - file_data = get_file_content(path) - file_data['status'] = 'ok' - return jsonify(file_data) - - else: - return {} - - folders = [] - files = [] - for _path in os.listdir(path): - if os.path.isfile(os.path.join(path, _path)): - files.append(_path) - elif os.path.isdir(os.path.join(path, _path)): - folders.append(_path) - return jsonify({ - 'status': 'ok', - 'files': sorted(files), - 'folders': sorted(folders), - }) diff --git a/crawlab/routes/nodes.py b/crawlab/routes/nodes.py deleted file mode 100644 index 4ab7a4ad..00000000 --- a/crawlab/routes/nodes.py +++ /dev/null @@ -1,81 +0,0 @@ -from constants.task import TaskStatus -from db.manager import db_manager -from routes.base import BaseApi -from utils import jsonify -from utils.node import update_nodes_status - - -class NodeApi(BaseApi): - col_name = 'nodes' - - arguments = ( - ('name', str), - ('description', str), - ('ip', str), - ('port', str), - ) - - def get(self, id: str = None, action: str = None) -> (dict, tuple): - """ - GET method of NodeAPI. 
- :param id: item id - :param action: action - """ - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - # get one node - elif id is not None: - return db_manager.get('nodes', id=id) - - # get a list of items - else: - # get a list of active nodes from flower and save to db - update_nodes_status() - - # iterate db nodes to update status - nodes = db_manager.list('nodes', {}) - - return { - 'status': 'ok', - 'items': jsonify(nodes) - } - - def get_deploys(self, id: str) -> (dict, tuple): - """ - Get a list of latest deploys of given node_id - :param id: node_id - """ - items = db_manager.list('deploys', {'node_id': id}, limit=10, sort_key='finish_ts') - deploys = [] - for item in items: - spider_id = item['spider_id'] - spider = db_manager.get('spiders', id=str(spider_id)) - item['spider_name'] = spider['name'] - deploys.append(item) - return { - 'status': 'ok', - 'items': jsonify(deploys) - } - - def get_tasks(self, id): - """ - Get a list of latest tasks of given node_id - :param id: node_id - """ - items = db_manager.list('tasks', {'node_id': id}, limit=10, sort_key='create_ts') - for item in items: - spider_id = item['spider_id'] - spider = db_manager.get('spiders', id=str(spider_id)) - item['spider_name'] = spider['name'] - return { - 'status': 'ok', - 'items': jsonify(items) - } diff --git a/crawlab/routes/schedules.py b/crawlab/routes/schedules.py deleted file mode 100644 index 01db8be1..00000000 --- a/crawlab/routes/schedules.py +++ /dev/null @@ -1,25 +0,0 @@ -import json - -import requests - -from constants.task import TaskStatus -from db.manager import db_manager -from routes.base import BaseApi -from tasks.scheduler import scheduler -from utils import jsonify -from utils.spider import get_spider_col_fields - - -class ScheduleApi(BaseApi): - col_name = 'schedules' - - arguments = ( - ('name', str), - ('description', str), - ('cron', str), - ('spider_id', str), - ('params', str) - ) - - def after_update(self, id: str = None): - scheduler.update() diff --git a/crawlab/routes/sites.py b/crawlab/routes/sites.py deleted file mode 100644 index e49dbe37..00000000 --- a/crawlab/routes/sites.py +++ /dev/null @@ -1,90 +0,0 @@ -import json - -from bson import ObjectId -from pymongo import ASCENDING - -from db.manager import db_manager -from routes.base import BaseApi -from utils import jsonify - - -class SiteApi(BaseApi): - col_name = 'sites' - - arguments = ( - ('keyword', str), - ('main_category', str), - ('category', str), - ) - - def get(self, id: str = None, action: str = None): - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - elif id is not None: - site = db_manager.get(col_name=self.col_name, id=id) - return jsonify(site) - - # list tasks - args = self.parser.parse_args() - page_size = args.get('page_size') or 10 - page_num = args.get('page_num') or 1 - filter_str = args.get('filter') - keyword = args.get('keyword') - filter_ = {} - if filter_str is not None: - filter_ = json.loads(filter_str) - if keyword is not None: - filter_['$or'] = [ - {'description': {'$regex': keyword}}, - {'name': {'$regex': keyword}}, - {'domain': {'$regex': keyword}} - ] - - items = db_manager.list( - col_name=self.col_name, - cond=filter_, - limit=page_size, - skip=page_size * (page_num - 1), - 
sort_key='rank', - sort_direction=ASCENDING - ) - - sites = [] - for site in items: - # get spider count - site['spider_count'] = db_manager.count('spiders', {'site': site['_id']}) - - sites.append(site) - - return { - 'status': 'ok', - 'total_count': db_manager.count(self.col_name, filter_), - 'page_num': page_num, - 'page_size': page_size, - 'items': jsonify(sites) - } - - def get_main_category_list(self, id): - return { - 'status': 'ok', - 'items': db_manager.distinct(col_name=self.col_name, key='main_category', filter={}) - } - - def get_category_list(self, id): - args = self.parser.parse_args() - filter_ = {} - if args.get('main_category') is not None: - filter_['main_category'] = args.get('main_category') - return { - 'status': 'ok', - 'items': db_manager.distinct(col_name=self.col_name, key='category', - filter=filter_) - } diff --git a/crawlab/routes/spiders.py b/crawlab/routes/spiders.py deleted file mode 100644 index e3d897cb..00000000 --- a/crawlab/routes/spiders.py +++ /dev/null @@ -1,937 +0,0 @@ -import json -import os -import shutil -import subprocess -from datetime import datetime -from random import random -from urllib.parse import urlparse - -import gevent -import requests -from bson import ObjectId -from flask import current_app, request -from flask_restful import reqparse, Resource -from lxml import etree -from werkzeug.datastructures import FileStorage - -from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER, PROJECT_TMP_FOLDER -from constants.node import NodeStatus -from constants.spider import SpiderType, CrawlType, QueryType, ExtractType -from constants.task import TaskStatus -from db.manager import db_manager -from routes.base import BaseApi -from tasks.scheduler import scheduler -from tasks.spider import execute_spider, execute_config_spider -from utils import jsonify -from utils.deploy import zip_file, unzip_file -from utils.file import get_file_suffix_stats, get_file_suffix -from utils.spider import get_lang_by_stats, get_last_n_run_errors_count, get_last_n_day_tasks_count, get_list_page_data, \ - get_detail_page_data, generate_urls - -parser = reqparse.RequestParser() -parser.add_argument('file', type=FileStorage, location='files') - -IGNORE_DIRS = [ - '.idea' -] - - -class SpiderApi(BaseApi): - col_name = 'spiders' - - arguments = ( - # name of spider - ('name', str), - - # execute shell command - ('cmd', str), - - # spider source folder - ('src', str), - - # spider type - ('type', str), - - # spider language - ('lang', str), - - # spider results collection - ('col', str), - - # spider schedule cron - ('cron', str), - - # spider schedule cron enabled - ('cron_enabled', int), - - # spider schedule cron enabled - ('envs', str), - - # spider site - ('site', str), - - ######################## - # Configurable Spider - ######################## - - # spider crawl fields for list page - ('fields', str), - - # spider crawl fields for detail page - ('detail_fields', str), - - # spider crawl type - ('crawl_type', str), - - # spider start url - ('start_url', str), - - # url pattern: support generation of urls with patterns - ('url_pattern', str), - - # spider item selector - ('item_selector', str), - - # spider item selector type - ('item_selector_type', str), - - # spider pagination selector - ('pagination_selector', str), - - # spider pagination selector type - ('pagination_selector_type', str), - - # whether to obey robots.txt - ('obey_robots_txt', bool), - - # item threshold to filter out non-relevant list items - ('item_threshold', int), - ) - - 
def get(self, id=None, action=None): - """ - GET method of SpiderAPI. - :param id: spider_id - :param action: action - """ - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - # get one node - elif id is not None: - spider = db_manager.get('spiders', id=id) - - # get deploy - last_deploy = db_manager.get_last_deploy(spider_id=spider['_id']) - if last_deploy is not None: - spider['deploy_ts'] = last_deploy['finish_ts'] - - return jsonify(spider) - - # get a list of items - else: - items = [] - - # get customized spiders - dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER) - for _dir in dirs: - if _dir in IGNORE_DIRS: - continue - - dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir) - dir_name = _dir - spider = db_manager.get_one_by_key('spiders', key='src', value=dir_path) - - # new spider - if spider is None: - stats = get_file_suffix_stats(dir_path) - lang = get_lang_by_stats(stats) - spider_id = db_manager.save('spiders', { - 'name': dir_name, - 'src': dir_path, - 'lang': lang, - 'suffix_stats': stats, - 'type': SpiderType.CUSTOMIZED - }) - spider = db_manager.get('spiders', id=spider_id) - - # existing spider - else: - # get last deploy - last_deploy = db_manager.get_last_deploy(spider_id=spider['_id']) - if last_deploy is not None: - spider['deploy_ts'] = last_deploy['finish_ts'] - - # file stats - stats = get_file_suffix_stats(dir_path) - - # language - lang = get_lang_by_stats(stats) - - # spider type - type_ = SpiderType.CUSTOMIZED - - # update spider data - db_manager.update_one('spiders', id=str(spider['_id']), values={ - 'lang': lang, - 'type': type_, - 'suffix_stats': stats, - }) - - # append spider - items.append(spider) - - # get configurable spiders - for spider in db_manager.list('spiders', {'type': SpiderType.CONFIGURABLE}): - # append spider - items.append(spider) - - # get other info - for i in range(len(items)): - spider = items[i] - - # get site - if spider.get('site') is not None: - site = db_manager.get('sites', spider['site']) - if site is not None: - items[i]['site_name'] = site['name'] - - # get last task - last_task = db_manager.get_last_task(spider_id=spider['_id']) - if last_task is not None: - items[i]['task_ts'] = last_task['create_ts'] - - # --------- - # stats - # --------- - # last 5-run errors - items[i]['last_5_errors'] = get_last_n_run_errors_count(spider_id=spider['_id'], n=5) - items[i]['last_7d_tasks'] = get_last_n_day_tasks_count(spider_id=spider['_id'], n=5) - - # sort spiders by _id descending - items = reversed(sorted(items, key=lambda x: x['_id'])) - - return { - 'status': 'ok', - 'items': jsonify(items) - } - - def delete(self, id: str = None) -> (dict, tuple): - """ - DELETE method of given id for deleting an spider. 
- :param id: - :return: - """ - # get spider from db - spider = db_manager.get(col_name=self.col_name, id=id) - - # delete spider folder - if spider.get('type') == SpiderType.CUSTOMIZED: - try: - shutil.rmtree(os.path.abspath(os.path.join(PROJECT_SOURCE_FILE_FOLDER, spider['src']))) - except Exception as err: - return { - 'status': 'ok', - 'error': str(err) - }, 500 - - # perform delete action - db_manager.remove_one(col_name=self.col_name, id=id) - - # remove related tasks - db_manager.remove(col_name='tasks', cond={'spider_id': spider['_id']}) - - # remove related schedules - db_manager.remove(col_name='schedules', cond={'spider_id': spider['_id']}) - - # execute after_update hook - self.after_update(id) - - return { - 'status': 'ok', - 'message': 'deleted successfully', - } - - def crawl(self, id: str) -> (dict, tuple): - """ - Submit an HTTP request to start a crawl task in the node of given spider_id. - @deprecated - :param id: spider_id - """ - args = self.parser.parse_args() - node_id = args.get('node_id') - - if node_id is None: - return { - 'code': 400, - 'status': 400, - 'error': 'node_id cannot be empty' - }, 400 - - # get node from db - node = db_manager.get('nodes', id=node_id) - - # validate ip and port - if node.get('ip') is None or node.get('port') is None: - return { - 'code': 400, - 'status': 'ok', - 'error': 'node ip and port should not be empty' - }, 400 - - # dispatch crawl task - res = requests.get('http://%s:%s/api/spiders/%s/on_crawl?node_id=%s' % ( - node.get('ip'), - node.get('port'), - id, - node_id - )) - data = json.loads(res.content.decode('utf-8')) - return { - 'code': res.status_code, - 'status': 'ok', - 'error': data.get('error'), - 'task': data.get('task') - } - - def on_crawl(self, id: str) -> (dict, tuple): - """ - Start a crawl task. - :param id: spider_id - :return: - """ - args = self.parser.parse_args() - params = args.get('params') - - spider = db_manager.get('spiders', id=ObjectId(id)) - - # determine execute function - if spider['type'] == SpiderType.CONFIGURABLE: - # configurable spider - exec_func = execute_config_spider - else: - # customized spider - exec_func = execute_spider - - # trigger an asynchronous job - job = exec_func.delay(id, params) - - # create a new task - db_manager.save('tasks', { - '_id': job.id, - 'spider_id': ObjectId(id), - 'cmd': spider.get('cmd'), - 'params': params, - 'create_ts': datetime.utcnow(), - 'status': TaskStatus.PENDING - }) - - return { - 'code': 200, - 'status': 'ok', - 'task': { - 'id': job.id, - 'status': job.status - } - } - - def deploy(self, id: str) -> (dict, tuple): - """ - Submit HTTP requests to deploy the given spider to all nodes. 
- :param id: - :return: - """ - spider = db_manager.get('spiders', id=id) - nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE}) - - for node in nodes: - node_id = node['_id'] - - output_file_name = '%s_%s.zip' % ( - datetime.now().strftime('%Y%m%d%H%M%S'), - str(random())[2:12] - ) - output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name) - - # zip source folder to zip file - zip_file(source_dir=spider['src'], - output_filename=output_file_path) - - # upload to api - files = {'file': open(output_file_path, 'rb')} - r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % ( - node.get('ip'), - node.get('port'), - id, - node_id, - ), files=files) - - # TODO: checkpoint for errors - - return { - 'code': 200, - 'status': 'ok', - 'message': 'deploy success' - } - - def deploy_file(self, id: str = None) -> (dict, tuple): - """ - Receive HTTP request of deploys and unzip zip files and copy to the destination directories. - :param id: spider_id - """ - args = parser.parse_args() - node_id = request.args.get('node_id') - f = args.file - - if get_file_suffix(f.filename) != 'zip': - return { - 'status': 'ok', - 'error': 'file type mismatch' - }, 400 - - # save zip file on temp folder - file_path = '%s/%s' % (PROJECT_TMP_FOLDER, f.filename) - with open(file_path, 'wb') as fw: - fw.write(f.stream.read()) - - # unzip zip file - dir_path = file_path.replace('.zip', '') - if os.path.exists(dir_path): - shutil.rmtree(dir_path) - unzip_file(file_path, dir_path) - - # get spider and version - spider = db_manager.get(col_name=self.col_name, id=id) - if spider is None: - return None, 400 - - # make source / destination - src = os.path.join(dir_path, os.listdir(dir_path)[0]) - dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id'))) - - # logging info - current_app.logger.info('src: %s' % src) - current_app.logger.info('dst: %s' % dst) - - # remove if the target folder exists - if os.path.exists(dst): - shutil.rmtree(dst) - - # copy from source to destination - shutil.copytree(src=src, dst=dst) - - # save to db - # TODO: task management for deployment - db_manager.save('deploys', { - 'spider_id': ObjectId(id), - 'node_id': node_id, - 'finish_ts': datetime.utcnow() - }) - - return { - 'code': 200, - 'status': 'ok', - 'message': 'deploy success' - } - - def get_deploys(self, id: str) -> (dict, tuple): - """ - Get a list of latest deploys of given spider_id - :param id: spider_id - """ - items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts') - deploys = [] - for item in items: - spider_id = item['spider_id'] - spider = db_manager.get('spiders', id=str(spider_id)) - item['spider_name'] = spider['name'] - deploys.append(item) - return { - 'status': 'ok', - 'items': jsonify(deploys) - } - - def get_tasks(self, id: str) -> (dict, tuple): - """ - Get a list of latest tasks of given spider_id - :param id: - """ - items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts') - for item in items: - spider_id = item['spider_id'] - spider = db_manager.get('spiders', id=str(spider_id)) - item['spider_name'] = spider['name'] - if item.get('status') is None: - item['status'] = TaskStatus.UNAVAILABLE - return { - 'status': 'ok', - 'items': jsonify(items) - } - - def after_update(self, id: str = None) -> None: - """ - After each spider is updated, update the cron scheduler correspondingly. 
- :param id: spider_id - """ - scheduler.update() - - def update_envs(self, id: str): - """ - Update environment variables - :param id: spider_id - """ - args = self.parser.parse_args() - envs = json.loads(args.envs) - db_manager.update_one(col_name='spiders', id=id, values={'envs': envs}) - - def update_fields(self, id: str): - """ - Update list page fields variables for configurable spiders - :param id: spider_id - """ - args = self.parser.parse_args() - fields = json.loads(args.fields) - db_manager.update_one(col_name='spiders', id=id, values={'fields': fields}) - - def update_detail_fields(self, id: str): - """ - Update detail page fields variables for configurable spiders - :param id: spider_id - """ - args = self.parser.parse_args() - detail_fields = json.loads(args.detail_fields) - db_manager.update_one(col_name='spiders', id=id, values={'detail_fields': detail_fields}) - - @staticmethod - def _get_html(spider) -> etree.Element: - if spider['type'] != SpiderType.CONFIGURABLE: - return { - 'status': 'ok', - 'error': 'type %s is invalid' % spider['type'] - }, 400 - - if spider.get('start_url') is None: - return { - 'status': 'ok', - 'error': 'start_url should not be empty' - }, 400 - - try: - r = None - for url in generate_urls(spider['start_url']): - r = requests.get(url, headers={ - 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36' - }) - break - except Exception as err: - return { - 'status': 'ok', - 'error': 'connection error' - }, 500 - - if not r: - return { - 'status': 'ok', - 'error': 'response is not returned' - }, 500 - - if r and r.status_code != 200: - return { - 'status': 'ok', - 'error': 'status code is not 200, but %s' % r.status_code - }, r.status_code - - # get html parse tree - try: - sel = etree.HTML(r.content.decode('utf-8')) - except Exception as err: - sel = etree.HTML(r.content) - - # remove unnecessary tags - unnecessary_tags = [ - 'script' - ] - for t in unnecessary_tags: - etree.strip_tags(sel, t) - - return sel - - @staticmethod - def _get_children(sel): - return [tag for tag in sel.getchildren() if type(tag) != etree._Comment] - - @staticmethod - def _get_text_child_tags(sel): - tags = [] - for tag in sel.iter(): - if type(tag) != etree._Comment and tag.text is not None and tag.text.strip() != '': - tags.append(tag) - return tags - - @staticmethod - def _get_a_child_tags(sel): - tags = [] - for tag in sel.iter(): - if tag.tag == 'a': - if tag.get('href') is not None and not tag.get('href').startswith('#') and not tag.get( - 'href').startswith('javascript'): - tags.append(tag) - - return tags - - @staticmethod - def _get_next_page_tag(sel): - next_page_text_list = [ - '下一页', - '下页', - 'next page', - 'next', - '>' - ] - for tag in sel.iter(): - if tag.text is not None and tag.text.lower().strip() in next_page_text_list: - return tag - return None - - def preview_crawl(self, id: str): - spider = db_manager.get(col_name='spiders', id=id) - - # get html parse tree - sel = self._get_html(spider) - - # when error happens, return - if type(sel) == type(tuple): - return sel - - # parse fields - if spider['crawl_type'] == CrawlType.LIST: - if spider.get('item_selector') is None: - return { - 'status': 'ok', - 'error': 'item_selector should not be empty' - }, 400 - - data = get_list_page_data(spider, sel)[:10] - - return { - 'status': 'ok', - 'items': data - } - - elif spider['crawl_type'] == CrawlType.DETAIL: - # TODO: 详情页预览 - pass - - elif spider['crawl_type'] == 
CrawlType.LIST_DETAIL: - data = get_list_page_data(spider, sel)[:10] - - ev_list = [] - for idx, d in enumerate(data): - for f in spider['fields']: - if f.get('is_detail'): - url = d.get(f['name']) - if url is not None: - if not url.startswith('http') and not url.startswith('//'): - u = urlparse(spider['start_url']) - url = f'{u.scheme}://{u.netloc}{url}' - ev_list.append(gevent.spawn(get_detail_page_data, url, spider, idx, data)) - break - - gevent.joinall(ev_list) - - return { - 'status': 'ok', - 'items': data - } - - def extract_fields(self, id: str): - """ - Extract list fields from a web page - :param id: - :return: - """ - spider = db_manager.get(col_name='spiders', id=id) - - # get html parse tree - sel = self._get_html(spider) - - # when error happens, return - if type(sel) == tuple: - return sel - - list_tag_list = [] - threshold = spider.get('item_threshold') or 10 - # iterate all child nodes in a top-down direction - for tag in sel.iter(): - # get child tags - child_tags = self._get_children(tag) - - if len(child_tags) < threshold: - # if number of child tags is below threshold, skip - continue - else: - # have one or more child tags - child_tags_set = set(map(lambda x: x.tag, child_tags)) - - # if there are more than 1 tag names, skip - if len(child_tags_set) > 1: - continue - - # add as list tag - list_tag_list.append(tag) - - # find the list tag with the most child text tags - max_tag = None - max_num = 0 - for tag in list_tag_list: - _child_text_tags = self._get_text_child_tags(self._get_children(tag)[0]) - if len(_child_text_tags) > max_num: - max_tag = tag - max_num = len(_child_text_tags) - - # get list item selector - item_selector = None - item_selector_type = 'css' - if max_tag.get('id') is not None: - item_selector = f'#{max_tag.get("id")} > {self._get_children(max_tag)[0].tag}' - elif max_tag.get('class') is not None: - cls_str = '.'.join([x for x in max_tag.get("class").split(' ') if x != '']) - if len(sel.cssselect(f'.{cls_str}')) == 1: - item_selector = f'.{cls_str} > {self._get_children(max_tag)[0].tag}' - else: - item_selector = max_tag.getroottree().getpath(max_tag) - item_selector_type = 'xpath' - - # get list fields - fields = [] - if item_selector is not None: - first_tag = self._get_children(max_tag)[0] - for i, tag in enumerate(self._get_text_child_tags(first_tag)): - el_list = first_tag.cssselect(f'{tag.tag}') - if len(el_list) == 1: - fields.append({ - 'name': f'field{i + 1}', - 'type': 'css', - 'extract_type': 'text', - 'query': f'{tag.tag}', - }) - elif tag.get('class') is not None: - cls_str = '.'.join([x for x in tag.get("class").split(' ') if x != '']) - if len(tag.cssselect(f'{tag.tag}.{cls_str}')) == 1: - fields.append({ - 'name': f'field{i + 1}', - 'type': 'css', - 'extract_type': 'text', - 'query': f'{tag.tag}.{cls_str}', - }) - else: - for j, el in enumerate(el_list): - if tag == el: - fields.append({ - 'name': f'field{i + 1}', - 'type': 'css', - 'extract_type': 'text', - 'query': f'{tag.tag}:nth-of-type({j + 1})', - }) - - for i, tag in enumerate(self._get_a_child_tags(self._get_children(max_tag)[0])): - # if the tag is , extract its href - if tag.get('class') is not None: - cls_str = '.'.join([x for x in tag.get("class").split(' ') if x != '']) - fields.append({ - 'name': f'field{i + 1}_url', - 'type': 'css', - 'extract_type': 'attribute', - 'attribute': 'href', - 'query': f'{tag.tag}.{cls_str}', - }) - - # get pagination tag - pagination_selector = None - pagination_tag = self._get_next_page_tag(sel) - if pagination_tag is not None: - if 
pagination_tag.get('id') is not None: - pagination_selector = f'#{pagination_tag.get("id")}' - elif pagination_tag.get('class') is not None and len(sel.cssselect(f'.{pagination_tag.get("id")}')) == 1: - pagination_selector = f'.{pagination_tag.get("id")}' - - return { - 'status': 'ok', - 'item_selector': item_selector, - 'item_selector_type': item_selector_type, - 'pagination_selector': pagination_selector, - 'fields': fields - } - - -class SpiderImportApi(Resource): - __doc__ = """ - API for importing spiders from external resources including Github, Gitlab, and subversion (WIP) - """ - parser = reqparse.RequestParser() - arguments = [ - ('url', str) - ] - - def __init__(self): - super(SpiderImportApi).__init__() - for arg, type in self.arguments: - self.parser.add_argument(arg, type=type) - - def post(self, platform: str = None) -> (dict, tuple): - if platform is None: - return { - 'status': 'ok', - 'code': 404, - 'error': 'platform invalid' - }, 404 - - if not hasattr(self, platform): - return { - 'status': 'ok', - 'code': 400, - 'error': 'platform "%s" invalid' % platform - }, 400 - - return getattr(self, platform)() - - def github(self) -> None: - """ - Import Github API - """ - self._git() - - def gitlab(self) -> None: - """ - Import Gitlab API - """ - self._git() - - def _git(self): - """ - Helper method to perform github important (basically "git clone" method). - """ - args = self.parser.parse_args() - url = args.get('url') - if url is None: - return { - 'status': 'ok', - 'code': 400, - 'error': 'url should not be empty' - }, 400 - - try: - p = subprocess.Popen(['git', 'clone', url], cwd=PROJECT_SOURCE_FILE_FOLDER) - _stdout, _stderr = p.communicate() - except Exception as err: - return { - 'status': 'ok', - 'code': 500, - 'error': str(err) - }, 500 - - return { - 'status': 'ok', - 'message': 'success' - } - - -class SpiderManageApi(Resource): - parser = reqparse.RequestParser() - arguments = [ - ('url', str) - ] - - def post(self, action: str) -> (dict, tuple): - """ - POST method for SpiderManageAPI. - :param action: - """ - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - - return getattr(self, action)() - - def deploy_all(self) -> (dict, tuple): - """ - Deploy all spiders to all nodes. 
- """ - # active nodes - nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE}) - - # all spiders - spiders = db_manager.list('spiders', {'cmd': {'$exists': True}}) - - # iterate all nodes - for node in nodes: - node_id = node['_id'] - for spider in spiders: - spider_id = spider['_id'] - spider_src = spider['src'] - - output_file_name = '%s_%s.zip' % ( - datetime.now().strftime('%Y%m%d%H%M%S'), - str(random())[2:12] - ) - output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name) - - # zip source folder to zip file - zip_file(source_dir=spider_src, - output_filename=output_file_path) - - # upload to api - files = {'file': open(output_file_path, 'rb')} - r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % ( - node.get('ip'), - node.get('port'), - spider_id, - node_id, - ), files=files) - - return { - 'status': 'ok', - 'message': 'success' - } - - def upload(self): - f = request.files['file'] - - if get_file_suffix(f.filename) != 'zip': - return { - 'status': 'ok', - 'error': 'file type mismatch' - }, 400 - - # save zip file on temp folder - file_path = '%s/%s' % (PROJECT_TMP_FOLDER, f.filename) - with open(file_path, 'wb') as fw: - fw.write(f.stream.read()) - - # unzip zip file - dir_path = file_path.replace('.zip', '') - if os.path.exists(dir_path): - shutil.rmtree(dir_path) - unzip_file(file_path, dir_path) - - # copy to source folder - output_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, f.filename.replace('.zip', '')) - print(output_path) - if os.path.exists(output_path): - shutil.rmtree(output_path) - shutil.copytree(dir_path, output_path) - - return { - 'status': 'ok', - 'message': 'success' - } diff --git a/crawlab/routes/stats.py b/crawlab/routes/stats.py deleted file mode 100644 index fe43e7a9..00000000 --- a/crawlab/routes/stats.py +++ /dev/null @@ -1,235 +0,0 @@ -import os -from collections import defaultdict -from datetime import datetime, timedelta - -from flask_restful import reqparse, Resource - -from constants.task import TaskStatus -from db.manager import db_manager -from routes.base import BaseApi -from utils import jsonify - - -class StatsApi(BaseApi): - arguments = [ - ['spider_id', str], - ] - - def get(self, action: str = None) -> (dict, tuple): - """ - GET method of StatsApi. 
- :param action: action - """ - # action - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)() - - else: - return {} - - def get_home_stats(self): - """ - Get stats for home page - """ - # overview stats - task_count = db_manager.count('tasks', {}) - spider_count = db_manager.count('spiders', {}) - node_count = db_manager.count('nodes', {}) - deploy_count = db_manager.count('deploys', {}) - - # daily stats - cur = db_manager.aggregate('tasks', [ - { - '$project': { - 'date': { - '$dateToString': { - 'format': '%Y-%m-%d', - 'date': '$create_ts' - } - } - } - }, - { - '$group': { - '_id': '$date', - 'count': { - '$sum': 1 - } - } - }, - { - '$sort': { - '_id': 1 - } - } - ]) - date_cache = {} - for item in cur: - date_cache[item['_id']] = item['count'] - start_date = datetime.now() - timedelta(31) - end_date = datetime.now() - timedelta(1) - date = start_date - daily_tasks = [] - while date < end_date: - date = date + timedelta(1) - date_str = date.strftime('%Y-%m-%d') - daily_tasks.append({ - 'date': date_str, - 'count': date_cache.get(date_str) or 0, - }) - - return { - 'status': 'ok', - 'overview_stats': { - 'task_count': task_count, - 'spider_count': spider_count, - 'node_count': node_count, - 'deploy_count': deploy_count, - }, - 'daily_tasks': daily_tasks - } - - def get_spider_stats(self): - args = self.parser.parse_args() - spider_id = args.get('spider_id') - spider = db_manager.get('spiders', id=spider_id) - tasks = db_manager.list( - col_name='tasks', - cond={ - 'spider_id': spider['_id'], - 'create_ts': { - '$gte': datetime.now() - timedelta(30) - } - }, - limit=9999999 - ) - - # task count - task_count = len(tasks) - - # calculate task count stats - task_count_by_status = defaultdict(int) - task_count_by_node = defaultdict(int) - total_seconds = 0 - for task in tasks: - task_count_by_status[task['status']] += 1 - task_count_by_node[task.get('node_id')] += 1 - if task['status'] == TaskStatus.SUCCESS and task.get('finish_ts'): - duration = (task['finish_ts'] - task['create_ts']).total_seconds() - total_seconds += duration - - # task count by node - task_count_by_node_ = [] - for status, value in task_count_by_node.items(): - task_count_by_node_.append({ - 'name': status, - 'value': value - }) - - # task count by status - task_count_by_status_ = [] - for status, value in task_count_by_status.items(): - task_count_by_status_.append({ - 'name': status, - 'value': value - }) - - # success rate - success_rate = task_count_by_status[TaskStatus.SUCCESS] / task_count - - # average duration - avg_duration = total_seconds / task_count - - # calculate task count by date - cur = db_manager.aggregate('tasks', [ - { - '$match': { - 'spider_id': spider['_id'] - } - }, - { - '$project': { - 'date': { - '$dateToString': { - 'format': '%Y-%m-%d', - 'date': '$create_ts' - } - }, - 'duration': { - '$subtract': [ - '$finish_ts', - '$create_ts' - ] - } - } - }, - { - '$group': { - '_id': '$date', - 'count': { - '$sum': 1 - }, - 'duration': { - '$avg': '$duration' - } - } - }, - { - '$sort': { - '_id': 1 - } - } - ]) - date_cache = {} - for item in cur: - date_cache[item['_id']] = { - 'duration': (item['duration'] or 0) / 1000, - 'count': item['count'] - } - start_date = datetime.now() - timedelta(31) - end_date = datetime.now() - timedelta(1) - date = start_date - daily_tasks = [] - while date < end_date: - date = date + timedelta(1) - date_str = date.strftime('%Y-%m-%d') - d 
= date_cache.get(date_str) - row = { - 'date': date_str, - } - if d is None: - row['count'] = 0 - row['duration'] = 0 - else: - row['count'] = d['count'] - row['duration'] = d['duration'] - daily_tasks.append(row) - - # calculate total results - result_count = 0 - col_name = spider.get('col') - if col_name is not None: - for task in tasks: - result_count += db_manager.count(col_name, {'task_id': task['_id']}) - - # top tasks - # top_10_tasks = db_manager.list('tasks', {'spider_id': spider['_id']}) - - return { - 'status': 'ok', - 'overview': { - 'task_count': task_count, - 'result_count': result_count, - 'success_rate': success_rate, - 'avg_duration': avg_duration - }, - 'task_count_by_status': task_count_by_status_, - 'task_count_by_node': task_count_by_node_, - 'daily_stats': daily_tasks, - } diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py deleted file mode 100644 index 5d1ff473..00000000 --- a/crawlab/routes/tasks.py +++ /dev/null @@ -1,256 +0,0 @@ -import json -import os -import sys -from time import time - -from flask_csv import send_csv - -try: - from _signal import SIGKILL -except ImportError: - pass - -import requests -from bson import ObjectId -from tasks.celery import celery_app - -from constants.task import TaskStatus -from db.manager import db_manager -from routes.base import BaseApi -from utils import jsonify -from utils.spider import get_spider_col_fields - - -class TaskApi(BaseApi): - # collection name - col_name = 'tasks' - - arguments = ( - ('deploy_id', str), - ('file_path', str) - ) - - def get(self, id: str = None, action: str = None): - """ - GET method of TaskAPI. - :param id: item id - :param action: action - """ - # action by id - if action is not None: - if not hasattr(self, action): - return { - 'status': 'ok', - 'code': 400, - 'error': 'action "%s" invalid' % action - }, 400 - return getattr(self, action)(id) - - elif id is not None: - task = db_manager.get(col_name=self.col_name, id=id) - spider = db_manager.get(col_name='spiders', id=str(task['spider_id'])) - - # spider - task['num_results'] = 0 - if spider: - task['spider_name'] = spider['name'] - if spider.get('col'): - col = spider.get('col') - num_results = db_manager.count(col, {'task_id': task['_id']}) - task['num_results'] = num_results - - # duration - if task.get('finish_ts') is not None: - task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds() - task['avg_num_results'] = round(task['num_results'] / task['duration'], 1) - - try: - with open(task['log_file_path']) as f: - task['log'] = f.read() - except Exception as err: - task['log'] = '' - return jsonify(task) - - # list tasks - args = self.parser.parse_args() - page_size = args.get('page_size') or 10 - page_num = args.get('page_num') or 1 - filter_str = args.get('filter') - filter_ = {} - if filter_str is not None: - filter_ = json.loads(filter_str) - if filter_.get('spider_id'): - filter_['spider_id'] = ObjectId(filter_['spider_id']) - tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1), - sort_key='create_ts') - items = [] - for task in tasks: - # get spider - _spider = db_manager.get(col_name='spiders', id=str(task['spider_id'])) - - # status - if task.get('status') is None: - task['status'] = TaskStatus.UNAVAILABLE - - # spider - task['num_results'] = 0 - if _spider: - # spider name - task['spider_name'] = _spider['name'] - - # number of results - if _spider.get('col'): - col = _spider.get('col') - num_results = db_manager.count(col, {'task_id': 
task['_id']}) - task['num_results'] = num_results - - # duration - if task.get('finish_ts') is not None: - task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds() - task['avg_num_results'] = round(task['num_results'] / task['duration'], 1) - - items.append(task) - - return { - 'status': 'ok', - 'total_count': db_manager.count('tasks', filter_), - 'page_num': page_num, - 'page_size': page_size, - 'items': jsonify(items) - } - - def on_get_log(self, id: (str, ObjectId)) -> (dict, tuple): - """ - Get the log of given task_id - :param id: task_id - """ - try: - task = db_manager.get(col_name=self.col_name, id=id) - with open(task['log_file_path']) as f: - log = f.read() - return { - 'status': 'ok', - 'log': log - } - except Exception as err: - return { - 'code': 500, - 'status': 'ok', - 'error': str(err) - }, 500 - - def get_log(self, id: (str, ObjectId)) -> (dict, tuple): - """ - Submit an HTTP request to fetch log from the node of a given task. - :param id: task_id - :return: - """ - task = db_manager.get(col_name=self.col_name, id=id) - node = db_manager.get(col_name='nodes', id=task['node_id']) - r = requests.get('http://%s:%s/api/tasks/%s/on_get_log' % ( - node['ip'], - node['port'], - id - )) - if r.status_code == 200: - data = json.loads(r.content.decode('utf-8')) - return { - 'status': 'ok', - 'log': data.get('log') - } - else: - data = json.loads(r.content) - return { - 'code': 500, - 'status': 'ok', - 'error': data['error'] - }, 500 - - def get_results(self, id: str) -> (dict, tuple): - """ - Get a list of results crawled in a given task. - :param id: task_id - """ - args = self.parser.parse_args() - page_size = args.get('page_size') or 10 - page_num = args.get('page_num') or 1 - - task = db_manager.get('tasks', id=id) - spider = db_manager.get('spiders', id=task['spider_id']) - col_name = spider.get('col') - if not col_name: - return [] - fields = get_spider_col_fields(col_name) - items = db_manager.list(col_name, {'task_id': id}, skip=page_size * (page_num - 1), limit=page_size) - return { - 'status': 'ok', - 'fields': jsonify(fields), - 'total_count': db_manager.count(col_name, {'task_id': id}), - 'page_num': page_num, - 'page_size': page_size, - 'items': jsonify(items) - } - - def stop(self, id): - """ - Send stop signal to a specific node - :param id: task_id - """ - task = db_manager.get('tasks', id=id) - node = db_manager.get('nodes', id=task['node_id']) - r = requests.get('http://%s:%s/api/tasks/%s/on_stop' % ( - node['ip'], - node['port'], - id - )) - if r.status_code == 200: - return { - 'status': 'ok', - 'message': 'success' - } - else: - data = json.loads(r.content) - return { - 'code': 500, - 'status': 'ok', - 'error': data['error'] - }, 500 - - def on_stop(self, id): - """ - Stop the task in progress. 
- :param id: - :return: - """ - task = db_manager.get('tasks', id=id) - celery_app.control.revoke(id, terminate=True) - db_manager.update_one('tasks', id=id, values={ - 'status': TaskStatus.REVOKED - }) - - # kill process - if task.get('pid'): - pid = task.get('pid') - if 'win32' in sys.platform: - os.popen('taskkill /pid:' + str(pid)) - else: - # unix system - os.kill(pid, SIGKILL) - - return { - 'id': id, - 'status': 'ok', - } - - def download_results(self, id: str): - task = db_manager.get('tasks', id=id) - spider = db_manager.get('spiders', id=task['spider_id']) - col_name = spider.get('col') - if not col_name: - return send_csv([], f'results_{col_name}_{round(time())}.csv') - items = db_manager.list(col_name, {'task_id': id}, limit=999999999) - fields = get_spider_col_fields(col_name, task_id=id, limit=999999999) - return send_csv(items, - filename=f'results_{col_name}_{round(time())}.csv', - fields=fields, - encoding='utf-8') diff --git a/crawlab/spiders/scrapy.cfg b/crawlab/spiders/scrapy.cfg deleted file mode 100644 index bf9391f1..00000000 --- a/crawlab/spiders/scrapy.cfg +++ /dev/null @@ -1,11 +0,0 @@ -# Automatically created by: scrapy startproject -# -# For more information about the [deploy] section see: -# https://scrapyd.readthedocs.io/en/latest/deploy.html - -[settings] -default = spiders.settings - -[deploy] -#url = http://localhost:6800/ -project = spiders diff --git a/crawlab/spiders/spiders/__init__.py b/crawlab/spiders/spiders/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/spiders/spiders/db.py b/crawlab/spiders/spiders/db.py deleted file mode 100644 index 0521d784..00000000 --- a/crawlab/spiders/spiders/db.py +++ /dev/null @@ -1,18 +0,0 @@ -import os - -from pymongo import MongoClient - -MONGO_HOST = os.environ.get('MONGO_HOST') or 'localhost' -MONGO_PORT = int(os.environ.get('MONGO_PORT')) or 27017 -MONGO_USERNAME = os.environ.get('MONGO_USERNAME') -MONGO_PASSWORD = os.environ.get('MONGO_PASSWORD') -MONGO_DB = os.environ.get('MONGO_DB') or 'crawlab_test' -mongo = MongoClient(host=MONGO_HOST, - port=MONGO_PORT, - username=MONGO_USERNAME, - password=MONGO_PASSWORD) -db = mongo[MONGO_DB] -task_id = os.environ.get('CRAWLAB_TASK_ID') -col_name = os.environ.get('CRAWLAB_COLLECTION') -task = db['tasks'].find_one({'_id': task_id}) -spider = db['spiders'].find_one({'_id': task['spider_id']}) diff --git a/crawlab/spiders/spiders/items.py b/crawlab/spiders/spiders/items.py deleted file mode 100644 index 7163d6e3..00000000 --- a/crawlab/spiders/spiders/items.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- coding: utf-8 -*- - -# Define here the models for your scraped items -# -# See documentation in: -# https://doc.scrapy.org/en/latest/topics/items.html - -import scrapy - -from spiders.db import spider - - -class SpidersItem(scrapy.Item): - if spider['crawl_type'] == 'list': - fields = {f['name']: scrapy.Field() for f in spider['fields']} - elif spider['crawl_type'] == 'detail': - fields = {f['name']: scrapy.Field() for f in spider['detail_fields']} - elif spider['crawl_type'] == 'list-detail': - fields = {f['name']: scrapy.Field() for f in (spider['fields'] + spider['detail_fields'])} - else: - fields = {} - - # basic fields - fields['_id'] = scrapy.Field() - fields['task_id'] = scrapy.Field() diff --git a/crawlab/spiders/spiders/middlewares.py b/crawlab/spiders/spiders/middlewares.py deleted file mode 100644 index 1760fe41..00000000 --- a/crawlab/spiders/spiders/middlewares.py +++ /dev/null @@ -1,103 +0,0 @@ -# -*- coding: utf-8 -*- - -# Define 
here the models for your spider middleware -# -# See documentation in: -# https://doc.scrapy.org/en/latest/topics/spider-middleware.html - -from scrapy import signals - - -class SpidersSpiderMiddleware(object): - # Not all methods need to be defined. If a method is not defined, - # scrapy acts as if the spider middleware does not modify the - # passed objects. - - @classmethod - def from_crawler(cls, crawler): - # This method is used by Scrapy to create your spiders. - s = cls() - crawler.signals.connect(s.spider_opened, signal=signals.spider_opened) - return s - - def process_spider_input(self, response, spider): - # Called for each response that goes through the spider - # middleware and into the spider. - - # Should return None or raise an exception. - return None - - def process_spider_output(self, response, result, spider): - # Called with the results returned from the Spider, after - # it has processed the response. - - # Must return an iterable of Request, dict or Item objects. - for i in result: - yield i - - def process_spider_exception(self, response, exception, spider): - # Called when a spider or process_spider_input() method - # (from other spider middleware) raises an exception. - - # Should return either None or an iterable of Response, dict - # or Item objects. - pass - - def process_start_requests(self, start_requests, spider): - # Called with the start requests of the spider, and works - # similarly to the process_spider_output() method, except - # that it doesn’t have a response associated. - - # Must return only requests (not items). - for r in start_requests: - yield r - - def spider_opened(self, spider): - spider.logger.info('Spider opened: %s' % spider.name) - - -class SpidersDownloaderMiddleware(object): - # Not all methods need to be defined. If a method is not defined, - # scrapy acts as if the downloader middleware does not modify the - # passed objects. - - @classmethod - def from_crawler(cls, crawler): - # This method is used by Scrapy to create your spiders. - s = cls() - crawler.signals.connect(s.spider_opened, signal=signals.spider_opened) - return s - - def process_request(self, request, spider): - # Called for each request that goes through the downloader - # middleware. - - # Must either: - # - return None: continue processing this request - # - or return a Response object - # - or return a Request object - # - or raise IgnoreRequest: process_exception() methods of - # installed downloader middleware will be called - return None - - def process_response(self, request, response, spider): - # Called with the response returned from the downloader. - - # Must either; - # - return a Response object - # - return a Request object - # - or raise IgnoreRequest - return response - - def process_exception(self, request, exception, spider): - # Called when a download handler or a process_request() - # (from other downloader middleware) raises an exception. 
- - # Must either: - # - return None: continue processing this exception - # - return a Response object: stops process_exception() chain - # - return a Request object: stops process_exception() chain - pass - - def spider_opened(self, spider): - spider.logger.info('Spider opened: %s' % spider.name) diff --git a/crawlab/spiders/spiders/pipelines.py b/crawlab/spiders/spiders/pipelines.py deleted file mode 100644 index 69531067..00000000 --- a/crawlab/spiders/spiders/pipelines.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# Define your item pipelines here -# -# Don't forget to add your pipeline to the ITEM_PIPELINES setting -# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html -from spiders.db import db, col_name, task_id - - -class SpidersPipeline(object): - col = db[col_name] - - def process_item(self, item, spider): - item['task_id'] = task_id - self.col.save(item) - - return item diff --git a/crawlab/spiders/spiders/settings.py b/crawlab/spiders/spiders/settings.py deleted file mode 100644 index 83685fec..00000000 --- a/crawlab/spiders/spiders/settings.py +++ /dev/null @@ -1,90 +0,0 @@ -# -*- coding: utf-8 -*- - -# Scrapy settings for spiders project -# -# For simplicity, this file contains only settings considered important or -# commonly used. You can find more settings consulting the documentation: -# -# https://doc.scrapy.org/en/latest/topics/settings.html -# https://doc.scrapy.org/en/latest/topics/downloader-middleware.html -# https://doc.scrapy.org/en/latest/topics/spider-middleware.html -from spiders.db import spider - -BOT_NAME = 'Crawlab Spider' - -SPIDER_MODULES = ['spiders.spiders'] -NEWSPIDER_MODULE = 'spiders.spiders' - -# Crawl responsibly by identifying yourself (and your website) on the user-agent -# USER_AGENT = 'spiders (+http://www.yourdomain.com)' - -# Obey robots.txt rules -ROBOTSTXT_OBEY = spider.get('obey_robots_txt') if spider.get('obey_robots_txt') is not None else True - -# Configure maximum concurrent requests performed by Scrapy (default: 16) -# CONCURRENT_REQUESTS = 32 - -# Configure a delay for requests for the same website (default: 0) -# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay -# See also autothrottle settings and docs -# DOWNLOAD_DELAY = 3 -# The download delay setting will honor only one of: -# CONCURRENT_REQUESTS_PER_DOMAIN = 16 -# CONCURRENT_REQUESTS_PER_IP = 16 - -# Disable cookies (enabled by default) -# COOKIES_ENABLED = False - -# Disable Telnet Console (enabled by default) -# TELNETCONSOLE_ENABLED = False - -# Override the default request headers: -# DEFAULT_REQUEST_HEADERS = { -# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', -# 'Accept-Language': 'en', -# } - -# Enable or disable spider middlewares -# See https://doc.scrapy.org/en/latest/topics/spider-middleware.html -# SPIDER_MIDDLEWARES = { -# 'spiders.middlewares.SpidersSpiderMiddleware': 543, -# } - -# Enable or disable downloader middlewares -# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html -# DOWNLOADER_MIDDLEWARES = { -# 'spiders.middlewares.SpidersDownloaderMiddleware': 543, -# } - -# Enable or disable extensions -# See https://doc.scrapy.org/en/latest/topics/extensions.html -# EXTENSIONS = { -# 'scrapy.extensions.telnet.TelnetConsole': None, -# } - -# Configure item pipelines -# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html -ITEM_PIPELINES = { - 'spiders.pipelines.SpidersPipeline': 300, -} - -# Enable and configure the AutoThrottle extension (disabled by 
default) -# See https://doc.scrapy.org/en/latest/topics/autothrottle.html -# AUTOTHROTTLE_ENABLED = True -# The initial download delay -# AUTOTHROTTLE_START_DELAY = 5 -# The maximum download delay to be set in case of high latencies -# AUTOTHROTTLE_MAX_DELAY = 60 -# The average number of requests Scrapy should be sending in parallel to -# each remote server -# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0 -# Enable showing throttling stats for every response received: -# AUTOTHROTTLE_DEBUG = False - -# Enable and configure HTTP caching (disabled by default) -# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings -# HTTPCACHE_ENABLED = True -# HTTPCACHE_EXPIRATION_SECS = 0 -# HTTPCACHE_DIR = 'httpcache' -# HTTPCACHE_IGNORE_HTTP_CODES = [] -# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage' diff --git a/crawlab/spiders/spiders/spiders/__init__.py b/crawlab/spiders/spiders/spiders/__init__.py deleted file mode 100644 index ebd689ac..00000000 --- a/crawlab/spiders/spiders/spiders/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# This package will contain the spiders of your Scrapy project -# -# Please refer to the documentation for information on how to create and manage -# your spiders. diff --git a/crawlab/spiders/spiders/spiders/config_spider.py b/crawlab/spiders/spiders/spiders/config_spider.py deleted file mode 100644 index 77e65862..00000000 --- a/crawlab/spiders/spiders/spiders/config_spider.py +++ /dev/null @@ -1,123 +0,0 @@ -# -*- coding: utf-8 -*- -import os -import sys -from urllib.parse import urlparse, urljoin - -import scrapy - -from spiders.db import spider -from spiders.items import SpidersItem -from spiders.utils import generate_urls - -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))) - - -def get_detail_url(item): - for f in spider['fields']: - if f.get('is_detail'): - return item.get(f['name']) - return None - - -def get_spiders_item(sel, fields, item=None): - if item is None: - item = SpidersItem() - - for f in fields: - if f['type'] == 'xpath': - # xpath selector - if f['extract_type'] == 'text': - # text content - query = f['query'] + '/text()' - else: - # attribute - attribute = f["attribute"] - query = f['query'] + f'/@("{attribute}")' - item[f['name']] = sel.xpath(query).extract_first() - - else: - # css selector - if f['extract_type'] == 'text': - # text content - query = f['query'] + '::text' - else: - # attribute - attribute = f["attribute"] - query = f['query'] + f'::attr("{attribute}")' - item[f['name']] = sel.css(query).extract_first() - - return item - - -def get_list_items(response): - if spider['item_selector_type'] == 'xpath': - # xpath selector - items = response.xpath(spider['item_selector']) - else: - # css selector - items = response.css(spider['item_selector']) - return items - - -def get_next_url(response): - # pagination - if spider.get('pagination_selector') is not None: - if spider['pagination_selector_type'] == 'xpath': - # xpath selector - next_url = response.xpath(spider['pagination_selector'] + '/@href').extract_first() - else: - # css selector - next_url = response.css(spider['pagination_selector'] + '::attr("href")').extract_first() - - # found next url - if next_url is not None: - if not next_url.startswith('http') and not next_url.startswith('//'): - return urljoin(response.url, next_url) - else: - return next_url - return None - - -class ConfigSpiderSpider(scrapy.Spider): - name = 'config_spider' - - def start_requests(self): - for url in 
generate_urls(spider['start_url']): - yield scrapy.Request(url=url) - - def parse(self, response): - - if spider['crawl_type'] == 'list': - # list page only - items = get_list_items(response) - for _item in items: - item = get_spiders_item(sel=_item, fields=spider['fields']) - yield item - next_url = get_next_url(response) - if next_url is not None: - yield scrapy.Request(url=next_url) - - elif spider['crawl_type'] == 'detail': - # TODO: detail page only - # detail page only - pass - - elif spider['crawl_type'] == 'list-detail': - # list page + detail page - items = get_list_items(response) - for _item in items: - item = get_spiders_item(sel=_item, fields=spider['fields']) - detail_url = get_detail_url(item) - if detail_url is not None: - yield scrapy.Request(url=detail_url, - callback=self.parse_detail, - meta={ - 'item': item - }) - next_url = get_next_url(response) - if next_url is not None: - yield scrapy.Request(url=next_url) - - def parse_detail(self, response): - item = get_spiders_item(sel=response, fields=spider['detail_fields'], item=response.meta['item']) - yield item diff --git a/crawlab/spiders/spiders/utils.py b/crawlab/spiders/spiders/utils.py deleted file mode 100644 index 0fc60188..00000000 --- a/crawlab/spiders/spiders/utils.py +++ /dev/null @@ -1,55 +0,0 @@ -import itertools -import re - - -def generate_urls(base_url: str) -> str: - url = base_url - - # number range list - list_arr = [] - for i, res in enumerate(re.findall(r'{(\d+),(\d+)}', base_url)): - try: - _min = int(res[0]) - _max = int(res[1]) - except ValueError as err: - raise ValueError(f'{base_url} is not a valid URL pattern') - - # list - _list = range(_min, _max + 1) - - # key - _key = f'n{i}' - - # append list and key - list_arr.append((_list, _key)) - - # replace url placeholder with key - url = url.replace('{' + res[0] + ',' + res[1] + '}', '{' + _key + '}', 1) - - # string list - for i, res in enumerate(re.findall(r'\[([\w\-,]+)\]', base_url)): - # list - _list = res.split(',') - - # key - _key = f's{i}' - - # append list and key - list_arr.append((_list, _key)) - - # replace url placeholder with key - url = url.replace('[' + ','.join(_list) + ']', '{' + _key + '}', 1) - - # combine together - _list_arr = [] - for res in itertools.product(*map(lambda x: x[0], list_arr)): - _url = url - for _arr, _rep in zip(list_arr, res): - _list, _key = _arr - _url = _url.replace('{' + _key + '}', str(_rep), 1) - yield _url - -# -# base_url = 'http://[baidu,ali].com/page-{1,10}-[1,2,3]' -# for url in generate_urls(base_url): -# print(url) diff --git a/crawlab/swagger.yaml b/crawlab/swagger.yaml deleted file mode 100644 index 48949fc8..00000000 --- a/crawlab/swagger.yaml +++ /dev/null @@ -1,353 +0,0 @@ ---- -swagger: '2.0' -basePath: "/api" -paths: - "/deploys": - get: - responses: - '200': - description: Success - summary: GET method of DeployAPI - operationId: get_deploy_api - tags: - - deploy - put: - responses: - '200': - description: Success - summary: PUT method for creating a new item - operationId: put_deploy_api - tags: - - deploy - "/deploys/{id}": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: GET method of DeployAPI - operationId: get_deploy_api_by_id - tags: - - deploy - post: - responses: - '200': - description: Success - summary: POST method of the given id for performing an action - operationId: post_deploy_api - tags: - - deploy - delete: - responses: - '200': - description: Success - summary: DELETE method of given id 
for deleting an item - operationId: delete_deploy_api - tags: - - deploy - "/files": - get: - responses: - '200': - description: Success - summary: GET method of FileAPI - operationId: get_file_api - tags: - - file - "/nodes": - get: - responses: - '200': - description: Success - summary: GET method of NodeAPI - operationId: get_node_api - tags: - - node - put: - responses: - '200': - description: Success - summary: PUT method for creating a new item - operationId: put_node_api - tags: - - node - "/nodes/{id}": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: GET method of NodeAPI - operationId: get_node_api_by_id - tags: - - node - post: - responses: - '200': - description: Success - summary: POST method of the given id for performing an action - operationId: post_node_api - tags: - - node - delete: - responses: - '200': - description: Success - summary: DELETE method of the given id - operationId: delete_node_api - tags: - - node - "/nodes/{id}/get_deploys": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: Get a list of latest deploys of given node_id - tags: - - node - "/nodes/{id}/get_tasks": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: Get a list of latest tasks of given node_id - tags: - - node - "/spiders": - get: - responses: - '200': - description: Success - summary: GET method of SpiderAPI - operationId: get_spider_api - tags: - - spider - put: - responses: - '200': - description: Success - summary: PUT method for creating a new item - operationId: put_spider_api - tags: - - spider - "/spiders/import/{platform}": - parameters: - - name: platform - in: path - required: true - type: string - post: - responses: - '200': - description: Success - operationId: post_spider_import_api - tags: - - spider - "/spiders/manage/deploy_all": - post: - responses: - '200': - description: Success - summary: Deploy all spiders to all nodes. - tags: - - spider - "/spiders/{id}": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: GET method of SpiderAPI - operationId: get_spider_api_by_id - tags: - - spider - post: - responses: - '200': - description: Success - summary: POST method of the given id for performing an action - operationId: post_spider_api - tags: - - spider - delete: - responses: - '200': - description: Success - summary: DELETE method of given id for deleting an item - operationId: delete_spider_api - tags: - - spider - "/spiders/{id}/get_tasks": - parameters: - - name: id - in: path - required: true - type: string - description: spider_id - get: - responses: - '200': - description: Success - summary: Get a list of latest tasks of given spider_id - tags: - - spider - "/spiders/{id}/get_deploys": - parameters: - - name: id - in: path - required: true - type: string - description: spider_id - get: - responses: - '200': - description: Success - summary: Get a list of latest deploys of given spider_id - tags: - - spider - "/spiders/{id}/on_crawl": - parameters: - - name: id - in: path - required: true - type: string - description: spider_id - post: - responses: - '200': - description: Success - summary: Start a crawl task. 
- tags: - - spider - "/spiders/{id}/deploy": - parameters: - - name: id - in: path - required: true - type: string - description: spider_id - post: - responses: - '200': - description: Success - summary: Start a crawl task. - tags: - - spider - "/stats/get_home_stats": - get: - responses: - '200': - description: Success - summary: Get stats for home page - operationId: get_stats_api - tags: - - stats - "/tasks": - get: - responses: - '200': - description: Success - summary: GET method of TaskAPI - operationId: get_task_api - tags: - - task - put: - responses: - '200': - description: Success - summary: PUT method for creating a new item - operationId: put_task_api - tags: - - task - "/tasks/{id}": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: GET method of TaskAPI - operationId: get_task_api_by_id - tags: - - task - post: - responses: - '200': - description: Success - summary: POST method of the given id for performing an action - operationId: post_task_api - tags: - - task - delete: - responses: - '200': - description: Success - summary: DELETE method of given id for deleting an item - operationId: delete_task_api - tags: - - task - "/tasks/{id}/get_log": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: Submit an HTTP request to fetch log from the node of a given task. - operationId: get_task_api_get_log - tags: - - task - "/tasks/{id}/on_get_log": - parameters: - - name: id - in: path - required: true - type: string - get: - responses: - '200': - description: Success - summary: Get the log of given task_id - operationId: get_task_api_on_get_log - tags: - - task -info: - title: Crawlab API - version: '1.0' -produces: -- application/json -consumes: -- application/json -responses: - ParseError: - description: When a mask can't be parsed - MaskError: - description: When any error occurs on mask diff --git a/crawlab/tasks/__init__.py b/crawlab/tasks/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/tasks/celery.py b/crawlab/tasks/celery.py deleted file mode 100644 index 3def1b29..00000000 --- a/crawlab/tasks/celery.py +++ /dev/null @@ -1,5 +0,0 @@ -from celery import Celery - -# celery app instance -celery_app = Celery(__name__) -celery_app.config_from_object('config') diff --git a/crawlab/tasks/deploy.py b/crawlab/tasks/deploy.py deleted file mode 100644 index 935aa9b6..00000000 --- a/crawlab/tasks/deploy.py +++ /dev/null @@ -1,18 +0,0 @@ -import os -import sys -from datetime import datetime - -import requests -from celery.utils.log import get_logger - -from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER -from db.manager import db_manager -from .celery import celery_app -import subprocess - -logger = get_logger(__name__) - - -@celery_app.task -def deploy_spider(id): - pass diff --git a/crawlab/tasks/node.py b/crawlab/tasks/node.py deleted file mode 100644 index 9747cef9..00000000 --- a/crawlab/tasks/node.py +++ /dev/null @@ -1,7 +0,0 @@ -from utils import node -from .celery import celery_app - - -@celery_app.task -def update_node_status(): - node.update_nodes_status(refresh=True) diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py deleted file mode 100644 index b9fcb140..00000000 --- a/crawlab/tasks/scheduler.py +++ /dev/null @@ -1,89 +0,0 @@ -import atexit -import fcntl - -import requests -from apscheduler.schedulers.background import BackgroundScheduler -from 
apscheduler.jobstores.mongodb import MongoDBJobStore -from pymongo import MongoClient - -from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT -from db.manager import db_manager - - -class Scheduler(object): - mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT, connect=False) - task_col = 'apscheduler_jobs' - - # scheduler jobstore - jobstores = { - 'mongo': MongoDBJobStore(database=MONGO_DB, - collection=task_col, - client=mongo) - } - - # scheduler instance - scheduler = BackgroundScheduler(jobstores=jobstores) - - def execute_spider(self, id: str, params: str = None): - print(f'executing spider {id}') - print(f'params: {params}') - self.scheduler.print_jobs(jobstore='mongo') - query = {} - if params is not None: - query['params'] = params - r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % ( - FLASK_HOST, - FLASK_PORT, - id - ), query) - - def update(self): - print('updating...') - # remove all existing periodic jobs - self.scheduler.remove_all_jobs() - self.mongo[MONGO_DB][self.task_col].remove() - - periodical_tasks = db_manager.list('schedules', {}) - for task in periodical_tasks: - cron = task.get('cron') - cron_arr = cron.split(' ') - second = cron_arr[0] - minute = cron_arr[1] - hour = cron_arr[2] - day = cron_arr[3] - month = cron_arr[4] - day_of_week = cron_arr[5] - self.scheduler.add_job(func=self.execute_spider, - args=(str(task['spider_id']), task.get('params'),), - trigger='cron', - jobstore='mongo', - day_of_week=day_of_week, - month=month, - day=day, - hour=hour, - minute=minute, - second=second) - self.scheduler.print_jobs(jobstore='mongo') - print(f'state: {self.scheduler.state}') - print(f'running: {self.scheduler.running}') - - def run(self): - f = open("scheduler.lock", "wb") - try: - fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) - self.update() - self.scheduler.start() - except: - pass - - def unlock(): - fcntl.flock(f, fcntl.LOCK_UN) - f.close() - - atexit.register(unlock) - - -scheduler = Scheduler() - -if __name__ == '__main__': - scheduler.run() diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py deleted file mode 100644 index 3bda236c..00000000 --- a/crawlab/tasks/spider.py +++ /dev/null @@ -1,272 +0,0 @@ -import os -import sys -from datetime import datetime -from time import sleep -import traceback - -from bson import ObjectId -from pymongo import ASCENDING, DESCENDING - -from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, MONGO_HOST, MONGO_PORT, MONGO_DB, MONGO_USERNAME, \ - MONGO_PASSWORD -from constants.task import TaskStatus -from db.manager import db_manager -from .celery import celery_app -import subprocess -from utils.log import other as logger - -BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) - - -def get_task(id: str): - i = 0 - while i < 5: - task = db_manager.get('tasks', id=id) - if task is not None: - return task - i += 1 - sleep(1) - return None - - -@celery_app.task(bind=True) -def execute_spider(self, id: str, params: str = None): - """ - Execute spider task. 
- :param self: - :param id: task_id - """ - task_id = self.request.id - hostname = self.request.hostname - spider = db_manager.get('spiders', id=id) - command = spider.get('cmd') - - # if start with python, then use sys.executable to execute in the virtualenv - if command.startswith('python '): - command = command.replace('python ', sys.executable + ' ') - - # if start with scrapy, then use sys.executable to execute scrapy as module in the virtualenv - elif command.startswith('scrapy '): - command = command.replace('scrapy ', sys.executable + ' -m scrapy ') - - # pass params to the command - if params is not None: - command += ' ' + params - - # get task object and return if not found - task = get_task(task_id) - if task is None: - return - - # current working directory - current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id'))) - - # log info - logger.info('task_id: %s' % task_id) - logger.info('hostname: %s' % hostname) - logger.info('current_working_directory: %s' % current_working_directory) - logger.info('spider_id: %s' % id) - logger.info(command) - - # make sure the log folder exists - log_path = os.path.join(PROJECT_LOGS_FOLDER, id) - if not os.path.exists(log_path): - os.makedirs(log_path) - - # open log file streams - log_file_path = os.path.join(log_path, '%s.log' % datetime.now().strftime('%Y%m%d%H%M%S')) - stdout = open(log_file_path, 'a') - stderr = open(log_file_path, 'a') - - # update task status as started - db_manager.update_one('tasks', id=task_id, values={ - 'start_ts': datetime.utcnow(), - 'node_id': hostname, - 'hostname': hostname, - 'log_file_path': log_file_path, - 'status': TaskStatus.STARTED - }) - - # pass params as env variables - env = os.environ.copy() - - # custom environment variables - if spider.get('envs'): - for _env in spider.get('envs'): - env[_env['name']] = _env['value'] - - # task id environment variable - env['CRAWLAB_TASK_ID'] = task_id - - # collection environment variable - if spider.get('col'): - env['CRAWLAB_COLLECTION'] = spider.get('col') - - # create index to speed results data retrieval - db_manager.create_index(spider.get('col'), [('task_id', ASCENDING)]) - - # start process - cmd_arr = command.split(' ') - cmd_arr = list(filter(lambda x: x != '', cmd_arr)) - try: - p = subprocess.Popen(cmd_arr, - stdout=stdout.fileno(), - stderr=stderr.fileno(), - cwd=current_working_directory, - env=env, - bufsize=1) - - # update pid - db_manager.update_one(col_name='tasks', id=task_id, values={ - 'pid': p.pid - }) - - # get output from the process - _stdout, _stderr = p.communicate() - - # get return code - code = p.poll() - if code == 0: - status = TaskStatus.SUCCESS - else: - status = TaskStatus.FAILURE - except Exception as err: - logger.error(err) - stderr.write(str(err)) - status = TaskStatus.FAILURE - - # save task when the task is finished - finish_ts = datetime.utcnow() - db_manager.update_one('tasks', id=task_id, values={ - 'finish_ts': finish_ts, - 'duration': (finish_ts - task['create_ts']).total_seconds(), - 'status': status - }) - task = db_manager.get('tasks', id=id) - - # close log file streams - stdout.flush() - stderr.flush() - stdout.close() - stderr.close() - - return task - - -@celery_app.task(bind=True) -def execute_config_spider(self, id: str, params: str = None): - task_id = self.request.id - hostname = self.request.hostname - spider = db_manager.get('spiders', id=id) - - # get task object and return if not found - task = get_task(task_id) - if task is None: - return - - # current working 
directory - current_working_directory = os.path.join(BASE_DIR, 'spiders') - - # log info - logger.info('task_id: %s' % task_id) - logger.info('hostname: %s' % hostname) - logger.info('current_working_directory: %s' % current_working_directory) - logger.info('spider_id: %s' % id) - - # make sure the log folder exists - log_path = os.path.join(PROJECT_LOGS_FOLDER, id) - if not os.path.exists(log_path): - os.makedirs(log_path) - - # open log file streams - log_file_path = os.path.join(log_path, '%s.log' % datetime.now().strftime('%Y%m%d%H%M%S')) - stdout = open(log_file_path, 'a') - stderr = open(log_file_path, 'a') - - # update task status as started - db_manager.update_one('tasks', id=task_id, values={ - 'start_ts': datetime.utcnow(), - 'node_id': hostname, - 'hostname': hostname, - 'log_file_path': log_file_path, - 'status': TaskStatus.STARTED - }) - - # pass params as env variables - env = os.environ.copy() - - # custom environment variables - if spider.get('envs'): - for _env in spider.get('envs'): - env[_env['name']] = _env['value'] - - # task id environment variable - env['CRAWLAB_TASK_ID'] = task_id - - # collection environment variable - if spider.get('col'): - env['CRAWLAB_COLLECTION'] = spider.get('col') - - # create index to speed results data retrieval - db_manager.create_index(spider.get('col'), [('task_id', ASCENDING)]) - - # mongodb environment variables - env['MONGO_HOST'] = MONGO_HOST - env['MONGO_PORT'] = str(MONGO_PORT) - env['MONGO_DB'] = MONGO_DB - if MONGO_USERNAME is not None: - env['MONGO_USERNAME'] = MONGO_USERNAME - if MONGO_PASSWORD: - env['MONGO_PASSWORD'] = MONGO_PASSWORD - - cmd_arr = [ - sys.executable, - '-m', - 'scrapy', - 'crawl', - 'config_spider' - ] - try: - p = subprocess.Popen(cmd_arr, - stdout=stdout.fileno(), - stderr=stderr.fileno(), - cwd=current_working_directory, - env=env, - bufsize=1) - - # update pid - db_manager.update_one(col_name='tasks', id=task_id, values={ - 'pid': p.pid - }) - - # get output from the process - _stdout, _stderr = p.communicate() - - # get return code - code = p.poll() - if code == 0: - status = TaskStatus.SUCCESS - else: - status = TaskStatus.FAILURE - except Exception as err: - traceback.print_exc() - logger.error(err) - stderr.write(str(err)) - status = TaskStatus.FAILURE - - # save task when the task is finished - finish_ts = datetime.utcnow() - db_manager.update_one('tasks', id=task_id, values={ - 'finish_ts': finish_ts, - 'duration': (finish_ts - task['create_ts']).total_seconds(), - 'status': status - }) - task = db_manager.get('tasks', id=id) - - # close log file streams - stdout.flush() - stderr.flush() - stdout.close() - stderr.close() - - return task diff --git a/crawlab/test/__init__.py b/crawlab/test/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/crawlab/test/test.http b/crawlab/test/test.http deleted file mode 100644 index 3cbd210c..00000000 --- a/crawlab/test/test.http +++ /dev/null @@ -1,53 +0,0 @@ -# For a quick start check out our HTTP Requests collection (Tools|HTTP Client|Open HTTP Requests Collection). 
-# -# Following HTTP Request Live Templates are available: -# * 'gtrp' and 'gtr' create a GET request with or without query parameters; -# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; -# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); - -### Send PUT request with json body -PUT http://localhost:5000/api/spiders -Content-Type: application/json - -{ - "spider_name": "baidu spider", - "cmd": "python /Users/yeqing/projects/crawlab/spiders/baidu/baidu.py", - "src": "/Users/yeqing/projects/crawlab/spiders/baidu/baidu.py", - "spider_type": 1, - "lang_type": 1 -} - -### Send POST request with json body -POST http://localhost:5000/api/spiders/5c63a2ddb65d151bee71d76b -Content-Type: application/json - -{ - "spider_name": "baidu spider", - "cmd": "/Users/yeqing/projects/crawlab/spiders/baidu/baidu.py", - "src": "/Users/yeqing/projects/crawlab/spiders/baidu/baidu.py", - "spider_type": 1, - "lang_type": 1 -} - -### Send POST request with json body by path -POST http://localhost:5000/api/spiders/5c63a2ddb65d151bee71d76b/crawl -Content-Type: application/json - -{} - -### - -### Send GET request with json body by path -POST http://localhost:5000/api/spiders/5c63a2ddb65d151bee71d76b/crawl -Content-Type: application/json - -{} - -### -### Send GET request with json body by path -GET http://localhost:5000/api/files?path=/Users/yeqing/projects/crawlab -Content-Type: application/json - -{} - -### \ No newline at end of file diff --git a/crawlab/utils/__init__.py b/crawlab/utils/__init__.py deleted file mode 100644 index edf03130..00000000 --- a/crawlab/utils/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -import json -import re -from datetime import datetime - -from bson import json_util - - -def is_object_id(id: str) -> bool: - """ - Determine if the id is a valid ObjectId string - :param id: ObjectId string - """ - return re.search('^[a-zA-Z0-9]{24}$', id) is not None - - -def jsonify(obj: (dict, list)) -> (dict, list): - """ - Convert dict/list to a valid json object. 
- :param obj: object to be converted - :return: dict/list - """ - dump_str = json_util.dumps(obj) - converted_obj = json.loads(dump_str) - if type(converted_obj) == dict: - for k, v in converted_obj.items(): - if type(v) == dict: - if v.get('$oid') is not None: - converted_obj[k] = v['$oid'] - elif v.get('$date') is not None: - converted_obj[k] = datetime.fromtimestamp(v['$date'] / 1000).strftime('%Y-%m-%d %H:%M:%S') - elif type(converted_obj) == list: - for i, v in enumerate(converted_obj): - converted_obj[i] = jsonify(v) - return converted_obj diff --git a/crawlab/utils/deploy.py b/crawlab/utils/deploy.py deleted file mode 100644 index e04c7da7..00000000 --- a/crawlab/utils/deploy.py +++ /dev/null @@ -1,33 +0,0 @@ -import os, zipfile -from utils.log import other - - -def zip_file(source_dir, output_filename): - """ - 打包目录为zip文件(未压缩) - :param source_dir: source directory - :param output_filename: output file name - """ - zipf = zipfile.ZipFile(output_filename, 'w') - pre_len = len(os.path.dirname(source_dir)) - for parent, dirnames, filenames in os.walk(source_dir): - for filename in filenames: - pathfile = os.path.join(parent, filename) - arcname = pathfile[pre_len:].strip(os.path.sep) # 相对路径 - zipf.write(pathfile, arcname) - zipf.close() - - -def unzip_file(zip_src, dst_dir): - """ - Unzip file - :param zip_src: source zip file - :param dst_dir: destination directory - """ - r = zipfile.is_zipfile(zip_src) - if r: - fz = zipfile.ZipFile(zip_src, 'r') - for file in fz.namelist(): - fz.extract(file, dst_dir) - else: - other.info('This is not zip') diff --git a/crawlab/utils/file.py b/crawlab/utils/file.py deleted file mode 100644 index 06163d49..00000000 --- a/crawlab/utils/file.py +++ /dev/null @@ -1,79 +0,0 @@ -import os -import re -from collections import defaultdict - -SUFFIX_PATTERN = r'\.([a-zA-Z]{,6})$' -suffix_regex = re.compile(SUFFIX_PATTERN, re.IGNORECASE) - -SUFFIX_LANG_MAPPING = { - 'py': 'python', - 'js': 'javascript', - 'sh': 'shell', - 'java': 'java', - 'c': 'c', - 'go': 'go', -} - - -def get_file_suffix(file_name: str) -> (str, None): - """ - Get suffix of a file - :param file_name: - :return: - """ - file_name = file_name.lower() - m = suffix_regex.search(file_name) - if m is not None: - return m.groups()[0] - else: - return None - - -def get_file_list(path: str) -> list: - """ - Get a list of files of given directory path - :param path: directory path - """ - for root, dirs, file_names in os.walk(path): - # print(root) # 当前目录路径 - # print(dirs) # 当前路径下所有子目录 - # print(file_names) # 当前路径下所有非目录子文件 - - for file_name in file_names: - file_path = os.path.join(root, file_name) - yield file_path - - -def get_file_suffix_stats(path) -> dict: - """ - Get suffix stats of given file - :param path: file path - """ - _stats = defaultdict(int) - for file_path in get_file_list(path): - suffix = get_file_suffix(file_path) - if suffix is not None: - _stats[suffix] += 1 - - # only return suffixes with languages - stats = {} - for suffix, count in _stats.items(): - if SUFFIX_LANG_MAPPING.get(suffix) is not None: - stats[suffix] = count - - return stats - - -def get_file_content(path) -> dict: - """ - Get file content - :param path: file path - """ - with open(path) as f: - suffix = get_file_suffix(path) - lang = SUFFIX_LANG_MAPPING.get(suffix) - return { - 'lang': lang, - 'suffix': suffix, - 'content': f.read() - } diff --git a/crawlab/utils/log.py b/crawlab/utils/log.py deleted file mode 100644 index 4c852d0f..00000000 --- a/crawlab/utils/log.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- 
coding: utf-8 -*- -# @Time : 2019-01-28 15:37 -# @Author : cxa -# @File : log.py -# @Software: PyCharm -import os -import logging -import logging.config as log_conf -import datetime -import coloredlogs - -log_dir = os.path.dirname(os.path.dirname(__file__)) + '/logs' -if not os.path.exists(log_dir): - os.mkdir(log_dir) -today = datetime.datetime.now().strftime("%Y%m%d") - -log_path = os.path.join(log_dir, f'app_{today}.log') - -log_config = { - 'version': 1.0, - 'formatters': { - 'colored_console': {'()': 'coloredlogs.ColoredFormatter', - 'format': "%(asctime)s - %(name)s - %(levelname)s - %(message)s", 'datefmt': '%H:%M:%S'}, - 'detail': { - 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', - 'datefmt': "%Y-%m-%d %H:%M:%S" # 如果不加这个会显示到毫秒。 - }, - 'simple': { - 'format': '%(name)s - %(levelname)s - %(message)s', - }, - }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', # 日志打印到屏幕显示的类。 - 'level': 'INFO', - 'formatter': 'colored_console' - }, - 'file': { - 'class': 'logging.handlers.RotatingFileHandler', # 日志打印到文件的类。 - 'maxBytes': 1024 * 1024 * 1024, # 单个文件最大内存 - 'backupCount': 1, # 备份的文件个数 - 'filename': log_path, # 日志文件名 - 'level': 'INFO', # 日志等级 - 'formatter': 'detail', # 调用上面的哪个格式 - 'encoding': 'utf-8', # 编码 - }, - }, - 'loggers': { - 'crawler': { - 'handlers': ['console', 'file'], # 只打印屏幕 - 'level': 'DEBUG', # 只显示错误的log - }, - 'parser': { - 'handlers': ['file'], - 'level': 'INFO', - }, - 'other': { - 'handlers': ['console', 'file'], - 'level': 'INFO', - }, - 'storage': { - 'handlers': ['console', 'file'], - 'level': 'INFO', - } - } -} - -log_conf.dictConfig(log_config) - -crawler = logging.getLogger('crawler') -storage = logging.getLogger('storage') -other = logging.getLogger('storage') -coloredlogs.install(level='DEBUG', logger=crawler) -coloredlogs.install(level='DEBUG', logger=storage) -coloredlogs.install(level='DEBUG', logger=other) diff --git a/crawlab/utils/node.py b/crawlab/utils/node.py deleted file mode 100644 index 6e40bc2b..00000000 --- a/crawlab/utils/node.py +++ /dev/null @@ -1,50 +0,0 @@ -import json - -import requests - -from config import FLOWER_API_ENDPOINT -from constants.node import NodeStatus -from db.manager import db_manager - - -def check_nodes_status(): - """ - Update node status from Flower. 
- """ - res = requests.get('%s/workers?status=1' % FLOWER_API_ENDPOINT) - return json.loads(res.content.decode('utf-8')) - - -def update_nodes_status(refresh=False): - """ - Update all nodes status - :param refresh: - """ - online_node_ids = [] - url = '%s/workers?status=1' % FLOWER_API_ENDPOINT - if refresh: - url += '&refresh=1' - - res = requests.get(url) - if res.status_code != 200: - return online_node_ids - - for k, v in json.loads(res.content.decode('utf-8')).items(): - node_name = k - node_status = NodeStatus.ONLINE if v else NodeStatus.OFFLINE - # node_celery = v - node = db_manager.get('nodes', id=node_name) - - # new node - if node is None: - node = {'_id': node_name, 'name': node_name, 'status': node_status, 'ip': 'localhost', 'port': '8000'} - db_manager.save('nodes', node) - - # existing node - else: - node['status'] = node_status - db_manager.save('nodes', node) - - if node_status: - online_node_ids.append(node_name) - return online_node_ids diff --git a/crawlab/utils/spider.py b/crawlab/utils/spider.py deleted file mode 100644 index d8995028..00000000 --- a/crawlab/utils/spider.py +++ /dev/null @@ -1,179 +0,0 @@ -import itertools -import os -import re - -import requests -from datetime import datetime, timedelta - -from bson import ObjectId -from lxml import etree - -from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE, SpiderType, QueryType, ExtractType -from constants.task import TaskStatus -from db.manager import db_manager - - -def get_lang_by_stats(stats: dict) -> LangType: - """ - Get programming language provided suffix stats - :param stats: stats is generated by utils.file.get_file_suffix_stats - :return: - """ - try: - data = stats.items() - data = sorted(data, key=lambda item: item[1]) - data = list(filter(lambda item: item[0] not in SUFFIX_IGNORE, data)) - top_suffix = data[-1][0] - if FILE_SUFFIX_LANG_MAPPING.get(top_suffix) is not None: - return FILE_SUFFIX_LANG_MAPPING.get(top_suffix) - return LangType.OTHER - except IndexError as e: - pass - - -def get_spider_type(path: str) -> SpiderType: - """ - Get spider type - :param path: spider directory path - """ - for file_name in os.listdir(path): - if file_name == 'scrapy.cfg': - return SpiderType.SCRAPY - - -def get_spider_col_fields(col_name: str, task_id: str = None, limit: int = 100) -> list: - """ - Get spider collection fields - :param col_name: collection name - :param task_id: task_id - :param limit: limit - """ - filter_ = {} - if task_id is not None: - filter_['task_id'] = task_id - items = db_manager.list(col_name, filter_, limit=limit, sort_key='_id') - fields = set() - for item in items: - for k in item.keys(): - fields.add(k) - return list(fields) - - -def get_last_n_run_errors_count(spider_id: ObjectId, n: int) -> list: - tasks = db_manager.list(col_name='tasks', - cond={'spider_id': spider_id}, - sort_key='create_ts', - limit=n) - count = 0 - for task in tasks: - if task['status'] == TaskStatus.FAILURE: - count += 1 - return count - - -def get_last_n_day_tasks_count(spider_id: ObjectId, n: int) -> list: - return db_manager.count(col_name='tasks', - cond={ - 'spider_id': spider_id, - 'create_ts': { - '$gte': (datetime.now() - timedelta(n)) - } - }) - - -def get_list_page_data(spider, sel): - data = [] - if spider['item_selector_type'] == QueryType.XPATH: - items = sel.xpath(spider['item_selector']) - else: - items = sel.cssselect(spider['item_selector']) - for item in items: - row = {} - for f in spider['fields']: - if f['type'] == QueryType.CSS: - # css selector - res = 
item.cssselect(f['query']) - else: - # xpath - res = item.xpath(f['query']) - - if len(res) > 0: - if f['extract_type'] == ExtractType.TEXT: - row[f['name']] = res[0].text - else: - row[f['name']] = res[0].get(f['attribute']) - data.append(row) - return data - - -def get_detail_page_data(url, spider, idx, data): - r = requests.get(url) - - sel = etree.HTML(r.content) - - row = {} - for f in spider['detail_fields']: - if f['type'] == QueryType.CSS: - # css selector - res = sel.cssselect(f['query']) - else: - # xpath - res = sel.xpath(f['query']) - - if len(res) > 0: - if f['extract_type'] == ExtractType.TEXT: - row[f['name']] = res[0].text - else: - row[f['name']] = res[0].get(f['attribute']) - - # assign values - for k, v in row.items(): - data[idx][k] = v - - -def generate_urls(base_url: str) -> str: - url = base_url - - # number range list - list_arr = [] - for i, res in enumerate(re.findall(r'{(\d+),(\d+)}', base_url)): - try: - _min = int(res[0]) - _max = int(res[1]) - except ValueError as err: - raise ValueError(f'{base_url} is not a valid URL pattern') - - # list - _list = range(_min, _max + 1) - - # key - _key = f'n{i}' - - # append list and key - list_arr.append((_list, _key)) - - # replace url placeholder with key - url = url.replace('{' + res[0] + ',' + res[1] + '}', '{' + _key + '}', 1) - - # string list - for i, res in enumerate(re.findall(r'\[([\w\-,]+)\]', base_url)): - # list - _list = res.split(',') - - # key - _key = f's{i}' - - # append list and key - list_arr.append((_list, _key)) - - # replace url placeholder with key - url = url.replace('[' + ','.join(_list) + ']', '{' + _key + '}', 1) - - # combine together - _list_arr = [] - for res in itertools.product(*map(lambda x: x[0], list_arr)): - _url = url - for _arr, _rep in zip(list_arr, res): - _list, _key = _arr - _url = _url.replace('{' + _key + '}', str(_rep), 1) - yield _url diff --git a/crawlab/worker.py b/crawlab/worker.py deleted file mode 100644 index 07f36396..00000000 --- a/crawlab/worker.py +++ /dev/null @@ -1,19 +0,0 @@ -import sys -import os - -# make sure the working directory is in system path -file_dir = os.path.dirname(os.path.realpath(__file__)) -root_path = os.path.abspath(os.path.join(file_dir, '..')) -sys.path.append(root_path) - -from tasks.celery import celery_app - -# import necessary tasks -import tasks.spider -import tasks.deploy - -if __name__ == '__main__': - if 'win32' in sys.platform: - celery_app.start(argv=['tasks', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO']) - else: - celery_app.start(argv=['tasks', 'worker', '-E', '-l', 'INFO']) From 9aad0ef8d1ec6c7c124367abdad8837ec71569f8 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 30 Jul 2019 22:48:01 +0800 Subject: [PATCH 2/2] updated CHANGELOG.md --- CHANGELOG.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f29c97a..b4204f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,20 @@ # 0.3.0 (2019-07-31) ### Features / Enhancement -- **Golang**: Refactored code from Python backend to Golang, much more stability and performance. +- **Golang Backend**: Refactored code from Python backend to Golang, much more stability and performance. - **Node Network Graph**: Visualization of node typology. - **Node System Info**: Available to see system info including OS, CPUs and executables. +- **Node Monitoring Enhancement**: Nodes are monitored and registered through Redis. +- **File Management**: Available to edit spider files online, including code highlight. 
+- **Login/Register/User Management**: Require users to log in to use Crawlab, with support for user registration, user management, and basic role-based authorization.
+- **Automatic Spider Deployment**: Spiders are deployed/synchronized to all online nodes automatically.
+- **Smaller Docker Image**: Slimmed the Docker image and reduced its size from 1.3G to \~700M by applying a Docker multi-stage build.
+
+### Bug Fixes
+- **Node Status**: Node status did not change even when the node actually went offline. [#87](https://github.com/tikazyq/crawlab/issues/87)
+- **Spider Deployment Error**: Fixed through Automatic Spider Deployment. [#83](https://github.com/tikazyq/crawlab/issues/83)
+- **Node Not Showing**: Nodes were not shown as online even when they were available. [#81](https://github.com/tikazyq/crawlab/issues/81)
+- **Cron Job Not Working**: Fixed through the new Golang backend. [#64](https://github.com/tikazyq/crawlab/issues/64)
+- **Flower Error**: Fixed through the new Golang backend. [#57](https://github.com/tikazyq/crawlab/issues/57)

# 0.2.4 (2019-07-07)
### Features / Enhancement