diff --git a/Dockerfile b/Dockerfile index d567fa9d..df9c6029 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,2 +1,30 @@ +# images +#FROM python:latest FROM ubuntu:latest +# source files +ADD . /opt/crawlab + +# add dns +#RUN echo -e "nameserver 180.76.76.76" >> /etc/resolv.conf +#ADD ./resolv.conf /etc +RUN cat /etc/resolv.conf + +# install python +RUN apt-get update +RUN apt-get install -y python3 python3-pip net-tools iputils-ping + +# soft link +RUN ln -s /usr/bin/pip3 /usr/local/bin/pip + +# install required libraries +RUN pip install -U setuptools +RUN pip install -r /opt/crawlab/requirements.txt + +# execute apps +WORKDIR /opt/crawlab +#CMD python3 ./bin/run_worker.py +CMD python3 app.py + +# port +EXPOSE 5000 diff --git a/app.py b/app.py index 44446f79..e067a30f 100644 --- a/app.py +++ b/app.py @@ -1,8 +1,13 @@ +import subprocess +import sys +import threading + from celery import Celery from flask import Flask from flask_cors import CORS from flask_restful import Api, Resource +from config import BROKER_URL from routes.deploys import DeployApi from routes.files import FileApi from routes.nodes import NodeApi @@ -12,7 +17,7 @@ from routes.tasks import TaskApi # flask app instance app = Flask(__name__) -app.config.from_object('config.flask') +app.config.from_object('config') # init flask api instance api = Api(app) @@ -45,6 +50,24 @@ api.add_resource(StatsApi, '/api/stats', '/api/stats/') -# start flask app -if __name__ == '__main__': + +def run_app(): app.run() + + +def run_flower(): + p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(p.stdout.readline, b''): + print(line.decode('utf-8')) + + +if __name__ == '__main__': + # start flower app + th_flower = threading.Thread(target=run_flower) + th_flower.start() + + # start flask app + # th_app = threading.Thread(target=run_app) + # th_app.start() + app.run() + diff --git a/bin/run_flower.py b/bin/run_flower.py index d3a6900c..9bc867ca 100644 
--- a/bin/run_flower.py +++ b/bin/run_flower.py @@ -7,7 +7,7 @@ file_dir = os.path.dirname(os.path.realpath(__file__)) root_path = os.path.abspath(os.path.join(file_dir, '..')) sys.path.append(root_path) -from config.celery import BROKER_URL +from config import BROKER_URL if __name__ == '__main__': p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) diff --git a/config.py b/config.py new file mode 100644 index 00000000..bca50bb6 --- /dev/null +++ b/config.py @@ -0,0 +1,24 @@ +# project variables +PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders' +PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab' +PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab' +PROJECT_TMP_FOLDER = '/tmp' + +# celery variables +BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/' +CELERY_MONGODB_BACKEND_SETTINGS = { + 'database': 'crawlab_test', + 'taskmeta_collection': 'tasks_celery', +} +FLOWER_API_ENDPOINT = 'http://localhost:5555/api' + +# database variables +MONGO_HOST = 'localhost' +MONGO_PORT = 27017 +# MONGO_USER = 'test' +# MONGO_PASS = 'test' +MONGO_DB = 'crawlab_test' + +# flask variables +DEBUG = True diff --git a/config/__init__.py b/config/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/config/celery.py b/config/celery.py deleted file mode 100644 index 30d5455f..00000000 --- a/config/celery.py +++ /dev/null @@ -1,13 +0,0 @@ -BROKER_URL = 'redis://localhost:6379/0' -# BROKER_URL = 'mongodb://localhost:27017/' -CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/' -# CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' -# CELERY_TASK_SERIALIZER = 'json' -# CELERY_RESULT_SERIALIZER = 'json' -# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 -CELERY_MONGODB_BACKEND_SETTINGS = { - 'database': 'crawlab_test', - 'taskmeta_collection': 'tasks_celery', -} - -FLOWER_API_ENDPOINT = 'http://localhost:5555/api' diff --git 
a/config/common.py b/config/common.py deleted file mode 100644 index 36c6e4a3..00000000 --- a/config/common.py +++ /dev/null @@ -1,3 +0,0 @@ -PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders' -PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab' -PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab' diff --git a/config/db.py b/config/db.py deleted file mode 100644 index ff628e95..00000000 --- a/config/db.py +++ /dev/null @@ -1,6 +0,0 @@ -# 数据库 -MONGO_HOST = 'localhost' -MONGO_PORT = 27017 -# MONGO_USER = 'test' -# MONGO_PASS = 'test' -MONGO_DB = 'crawlab_test' diff --git a/config/flask.py b/config/flask.py deleted file mode 100644 index 8d9a7ab2..00000000 --- a/config/flask.py +++ /dev/null @@ -1,3 +0,0 @@ -DEBUG = True -# SERVER_NAME = '0.0.0.0:5000' - diff --git a/db/manager.py b/db/manager.py index 7d2035c8..005c7cb1 100644 --- a/db/manager.py +++ b/db/manager.py @@ -1,7 +1,7 @@ from bson import ObjectId from mongoengine import connect from pymongo import MongoClient, DESCENDING -from config.db import MONGO_HOST, MONGO_PORT, MONGO_DB +from config import MONGO_HOST, MONGO_PORT, MONGO_DB from utils import is_object_id connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT) diff --git a/routes/files.py b/routes/files.py index 8c087f7e..4f18f0ee 100644 --- a/routes/files.py +++ b/routes/files.py @@ -3,6 +3,7 @@ import os from flask_restful import reqparse, Resource from utils import jsonify +from utils.file import get_file_content class FileApi(Resource): @@ -14,16 +15,23 @@ class FileApi(Resource): self.parser.add_argument('path', type=str) def get(self, action=None): + args = self.parser.parse_args() + path = args.get('path') + if action is not None: if action == 'getDefaultPath': return jsonify({ 'defaultPath': os.path.abspath(os.path.join(os.path.curdir, 'spiders')) }) + + elif action == 'get_file': + file_data = get_file_content(path) + file_data['status'] = 'ok' + return jsonify(file_data) + else: return {} - args = 
self.parser.parse_args() - path = args.get('path') folders = [] files = [] for _path in os.listdir(path): @@ -36,4 +44,3 @@ class FileApi(Resource): 'files': sorted(files), 'folders': sorted(folders), }) - diff --git a/routes/nodes.py b/routes/nodes.py index f9f04dc5..094d5e7b 100644 --- a/routes/nodes.py +++ b/routes/nodes.py @@ -3,21 +3,22 @@ import json import requests from bson import ObjectId -from config.celery import FLOWER_API_ENDPOINT +from config import FLOWER_API_ENDPOINT from constants.node import NodeType from db.manager import db_manager from routes.base import BaseApi from utils import jsonify +from utils.node import check_nodes_status class NodeApi(BaseApi): col_name = 'nodes' arguments = ( - # ('ip', str), - # ('port', int), ('name', str), ('description', str), + ('ip', str), + ('port', str), ) def get(self, id=None, action=None): @@ -37,6 +38,9 @@ class NodeApi(BaseApi): # TODO: use query "?status=1" to get status of nodes + # get status for each node + status_data = check_nodes_status() + # get a list of items res = requests.get('%s/workers' % FLOWER_API_ENDPOINT) online_node_ids = [] @@ -52,7 +56,6 @@ class NodeApi(BaseApi): node[_k] = _v node['_id'] = node_name node['name'] = node_name - node['status'] = NodeType.ONLINE db_manager.save('nodes', node) # existing node @@ -60,7 +63,6 @@ class NodeApi(BaseApi): for _k, _v in v.items(): node[_k] = _v node['name'] = node_name - node['status'] = NodeType.ONLINE db_manager.save('nodes', node) online_node_ids.append(node_name) @@ -69,8 +71,9 @@ class NodeApi(BaseApi): nodes = [] items = db_manager.list('nodes', {}) for item in items: + node_status = status_data.get(item['name']) if item['_id'] in online_node_ids: - item['status'] = NodeType.ONLINE + item['status'] = NodeType.ONLINE if node_status else NodeType.OFFLINE else: item['status'] = NodeType.OFFLINE db_manager.update_one('nodes', item['_id'], { diff --git a/routes/spiders.py b/routes/spiders.py index 43ef6ae9..f3251da1 100644 --- 
a/routes/spiders.py +++ b/routes/spiders.py @@ -1,17 +1,26 @@ +import json import os import shutil from datetime import datetime +from random import random +import requests from bson import ObjectId +from flask_restful import reqparse +from werkzeug.datastructures import FileStorage -from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER +from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER, PROJECT_TMP_FOLDER from db.manager import db_manager from routes.base import BaseApi from tasks.spider import execute_spider from utils import jsonify -from utils.file import get_file_suffix_stats +from utils.deploy import zip_file, unzip_file +from utils.file import get_file_suffix_stats, get_file_suffix from utils.spider import get_lang_by_stats +parser = reqparse.RequestParser() +parser.add_argument('file', type=FileStorage, location='files') + class SpiderApi(BaseApi): col_name = 'spiders' @@ -106,7 +115,8 @@ class SpiderApi(BaseApi): if spider is None: return - # TODO: deploy spiders to other node rather than in local machine + # get node given the node + node = db_manager.get(col_name='nodes', id=node_id) # get latest version latest_version = db_manager.get_latest_version(spider_id=id) @@ -117,24 +127,48 @@ class SpiderApi(BaseApi): # make source / destination src = spider.get('src') - dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1)) # copy files # TODO: multi-node copy files try: - shutil.copytree(src=src, dst=dst) - return { - 'code': 200, - 'status': 'ok', - 'message': 'deploy success' - } + # TODO: deploy spiders to other node rather than in local machine + output_file_name = '%s_%s.zip' % ( + datetime.now().strftime('%Y%m%d%H%M%S'), + str(random())[2:12] + ) + output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name) + + # zip source folder to zip file + zip_file(source_dir=src, + output_filename=output_file_path) + + # upload to api + files = {'file': 
open(output_file_path, 'rb')} + r = requests.post('http://%s:%s/api/spiders/%s/deploy_file' % (node.get('ip'), node.get('port'), id), + files=files) + + if r.status_code == 200: + return { + 'code': 200, + 'status': 'ok', + 'message': 'deploy success' + } + + else: + return { + 'code': r.status_code, + 'status': 'ok', + 'error': json.loads(r.content)['error'] + }, r.status_code + except Exception as err: print(err) return { - 'code': 500, - 'status': 'ok', - 'error': str(err) - } + 'code': 500, + 'status': 'ok', + 'error': str(err) + }, 500 + finally: version = latest_version + 1 db_manager.save('deploys', { @@ -144,8 +178,52 @@ class SpiderApi(BaseApi): 'finish_ts': datetime.now() }) + def deploy_file(self, id): + args = parser.parse_args() + f = args.file + + if get_file_suffix(f.filename) != 'zip': + return { + 'status': 'ok', + 'error': 'file type mismatch' + }, 400 + + # save zip file on temp folder + file_path = '%s/%s' % (PROJECT_TMP_FOLDER, f.filename) + with open(file_path, 'wb') as fw: + fw.write(f.stream.read()) + + # unzip zip file + dir_path = file_path.replace('.zip', '') + if os.path.exists(dir_path): + shutil.rmtree(dir_path) + unzip_file(file_path, dir_path) + + # get spider and version + spider = db_manager.get(col_name=self.col_name, id=id) + if spider is None: + return None, 400 + + # get version + latest_version = db_manager.get_latest_version(spider_id=id) + if latest_version is None: + latest_version = 0 + + # make source / destination + src = dir_path + dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1)) + + # copy from source to destination + shutil.copytree(src=src, dst=dst) + + return { + 'code': 200, + 'status': 'ok', + 'message': 'deploy success' + } + def get_deploys(self, id): - items = db_manager.list('deploys', {'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts') + items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts') deploys = [] for 
item in items: spider_id = item['spider_id'] @@ -158,7 +236,7 @@ class SpiderApi(BaseApi): }) def get_tasks(self, id): - items = db_manager.list('tasks', {'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts') + items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts') for item in items: spider_id = item['spider_id'] spider = db_manager.get('spiders', id=str(spider_id)) diff --git a/tasks/celery.py b/tasks/celery.py index e5c7c93d..3def1b29 100644 --- a/tasks/celery.py +++ b/tasks/celery.py @@ -2,4 +2,4 @@ from celery import Celery # celery app instance celery_app = Celery(__name__) -celery_app.config_from_object('config.celery') +celery_app.config_from_object('config') diff --git a/tasks/deploy.py b/tasks/deploy.py index 7bb9bbce..935aa9b6 100644 --- a/tasks/deploy.py +++ b/tasks/deploy.py @@ -5,7 +5,7 @@ from datetime import datetime import requests from celery.utils.log import get_logger -from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER +from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER from db.manager import db_manager from .celery import celery_app import subprocess diff --git a/tasks/spider.py b/tasks/spider.py index 02019ec8..87a967da 100644 --- a/tasks/spider.py +++ b/tasks/spider.py @@ -6,7 +6,7 @@ import requests from bson import ObjectId from celery.utils.log import get_logger -from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER +from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER from db.manager import db_manager from .celery import celery_app import subprocess diff --git a/utils/deploy.py b/utils/deploy.py index 4cdae09b..305feb5a 100644 --- a/utils/deploy.py +++ b/utils/deploy.py @@ -2,7 +2,7 @@ import os, zipfile # 打包目录为zip文件(未压缩) -def make_zip(source_dir, output_filename): +def zip_file(source_dir, output_filename): zipf = zipfile.ZipFile(output_filename, 'w') pre_len = len(os.path.dirname(source_dir)) for parent, dirnames, 
filenames in os.walk(source_dir): @@ -11,3 +11,13 @@ def make_zip(source_dir, output_filename): arcname = pathfile[pre_len:].strip(os.path.sep) # 相对路径 zipf.write(pathfile, arcname) zipf.close() + + +def unzip_file(zip_src, dst_dir): + r = zipfile.is_zipfile(zip_src) + if r: + fz = zipfile.ZipFile(zip_src, 'r') + for file in fz.namelist(): + fz.extract(file, dst_dir) + else: + print('This is not zip') diff --git a/utils/file.py b/utils/file.py index 7c629f33..8ecc1c89 100644 --- a/utils/file.py +++ b/utils/file.py @@ -5,6 +5,14 @@ from collections import defaultdict SUFFIX_PATTERN = r'\.(\w{,10})$' suffix_regex = re.compile(SUFFIX_PATTERN, re.IGNORECASE) +SUFFIX_LANG_MAPPING = { + 'py': 'python', + 'js': 'javascript', + 'sh': 'shell', + 'java': 'java', + 'c': 'c', +} + def get_file_suffix(file_name: str): file_name = file_name.lower() @@ -32,3 +40,14 @@ def get_file_suffix_stats(path) -> dict: suffix = get_file_suffix(file_path) stats[suffix] += 1 return stats + + +def get_file_content(path) -> dict: + with open(path) as f: + suffix = get_file_suffix(path) + lang = SUFFIX_LANG_MAPPING.get(suffix) + return { + 'lang': lang, + 'suffix': suffix, + 'content': f.read() + } diff --git a/utils/node.py b/utils/node.py new file mode 100644 index 00000000..05bd30ed --- /dev/null +++ b/utils/node.py @@ -0,0 +1,10 @@ +import json + +import requests + +from config import FLOWER_API_ENDPOINT + + +def check_nodes_status(): + res = requests.get('%s/workers?status=1' % FLOWER_API_ENDPOINT) + return json.loads(res.content.decode('utf-8'))