diff --git a/Dockerfile b/Dockerfile
index 275f5b0c..6202db5c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ RUN cat /etc/resolv.conf
 
 # install python
 RUN apt-get update
-RUN apt-get install -y python3 python3-pip net-tools iputils-ping
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
 
 # soft link
 RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
diff --git a/Dockerfile-task b/Dockerfile-task
new file mode 100644
index 00000000..6202db5c
--- /dev/null
+++ b/Dockerfile-task
@@ -0,0 +1,26 @@
+# images
+#FROM python:latest
+FROM ubuntu:latest
+
+# source files
+ADD . /opt/crawlab
+
+# add dns
+RUN cat /etc/resolv.conf
+
+# install python
+RUN apt-get update
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
+
+# soft link
+RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
+RUN ln -s /usr/bin/python3 /usr/local/bin/python
+
+# install required libraries
+RUN pip install -U setuptools
+RUN pip install -r /opt/crawlab/requirements.txt
+
+# execute apps
+WORKDIR /opt/crawlab
+CMD python ./bin/run_worker.py
+CMD python app.py
diff --git a/app.py b/app.py
index 3147580a..91ba55f6 100644
--- a/app.py
+++ b/app.py
@@ -1,8 +1,11 @@
+import os
+import shutil
+
 from flask import Flask, logging
 from flask_cors import CORS
 from flask_restful import Api
 
-from config import FLASK_HOST, FLASK_PORT
+from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
 from routes.deploys import DeployApi
 from routes.files import FileApi
 from routes.nodes import NodeApi
@@ -45,5 +48,9 @@ api.add_resource(StatsApi,
                  '/api/stats',
                  '/api/stats/<string:action>')
 
+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)
+
 if __name__ == '__main__':
     app.run(host=FLASK_HOST, port=FLASK_PORT)
diff --git a/bin/run_flower.py b/bin/run_flower.py
index 9bc867ca..da903c07 100644
--- a/bin/run_flower.py
+++ b/bin/run_flower.py
@@ -12,4 +12,5 @@ from config import BROKER_URL
 if __name__ == '__main__':
     p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     for line in iter(p.stdout.readline, 'b'):
-        print(line.decode('utf-8'))
+        if line.decode('utf-8') != '':
+            print(line.decode('utf-8'))
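NOTE on bin/run_flower.py: the sentinel in iter(p.stdout.readline, 'b') is the one-character str 'b', while readline() on a pipe returns bytes, so the sentinel never matches and the loop keeps spinning on empty b'' reads once Flower exits; the empty-string check added above only suppresses the output. A minimal sketch of the conventional idiom, assuming the same Popen call:

    # bytes sentinel: the loop now actually terminates at EOF
    for line in iter(p.stdout.readline, b''):
        print(line.decode('utf-8'), end='')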
diff --git a/config.py b/config.py
index 8fea5922..4135016b 100644
--- a/config.py
+++ b/config.py
@@ -1,12 +1,14 @@
 # project variables
 PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
 PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
-PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
+PROJECT_LOGS_FOLDER = '/var/logs/crawlab'
 PROJECT_TMP_FOLDER = '/tmp'
 
 # celery variables
-BROKER_URL = 'redis://localhost:6379/0'
-CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+# BROKER_URL = 'redis://localhost:6379/0'
+# CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+BROKER_URL = 'redis://192.168.99.100:6379/0'
+CELERY_RESULT_BACKEND = 'mongodb://192.168.99.100:27017/'
 CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
@@ -16,7 +18,8 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
 
 # database variables
-MONGO_HOST = 'localhost'
+# MONGO_HOST = 'localhost'
+MONGO_HOST = '192.168.99.100'
 MONGO_PORT = 27017
 # MONGO_USER = 'test'
 # MONGO_PASS = 'test'
diff --git a/db/manager.py b/db/manager.py
index 005c7cb1..07f1c2db 100644
--- a/db/manager.py
+++ b/db/manager.py
@@ -38,7 +38,10 @@ class DbManager(object):
 
     def remove_one(self, col_name: str, id: str, **kwargs):
         col = self.db[col_name]
-        col.remove({'_id': ObjectId(id)})
+        _id = id
+        if is_object_id(id):
+            _id = ObjectId(id)
+        col.remove({'_id': _id})
 
     def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0,
              limit: int = 100, **kwargs):
@@ -70,12 +73,20 @@ class DbManager(object):
         col = self.db[col_name]
         return col.count(cond)
 
-    def get_latest_version(self, spider_id):
+    def get_latest_version(self, spider_id, node_id):
         col = self.db['deploys']
-        for item in col.find({'spider_id': ObjectId(spider_id)}).sort('version', DESCENDING):
+        for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
+                .sort('version', DESCENDING):
             return item.get('version')
         return None
 
+    def get_last_deploy(self, spider_id):
+        col = self.db['deploys']
+        for item in col.find({'spider_id': ObjectId(spider_id)}) \
+                .sort('finish_ts', DESCENDING):
+            return item
+        return None
+
     def aggregate(self, col_name: str, pipelines, **kwargs):
         col = self.db[col_name]
         return col.aggregate(pipelines, **kwargs)
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 00000000..fe2bfb78
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,18 @@
+version: '3.3'  # this Compose file uses file format version 3.3
+services:
+  web:  # service name
+    build: .  # path to the directory containing the Dockerfile
+    ports:  # port mapping (host:container)
+      - "5001:5000"
+  task:
+    image: crawlab:v3
+  db:
+    image: mongo
+    restart: always
+    ports:
+      - "27017:27017"
+  redis:
+    image: redis
+    restart: always
+    ports:
+      - "6379:6379"
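NOTE on config.py / docker-compose.yml: 192.168.99.100 is the default docker-machine VM address, so the hardcoded settings above only work on that setup. Containers started from the Compose file share its default network, where each container is reachable by its service name. A host-independent sketch of config.py, assuming web and task join that same Compose network (service names 'redis' and 'db' as declared above):

    # resolve broker/backend via Compose service names instead of a fixed IP
    BROKER_URL = 'redis://redis:6379/0'
    CELERY_RESULT_BACKEND = 'mongodb://db:27017/'
    MONGO_HOST = 'db'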
diff --git a/routes/spiders.py b/routes/spiders.py
index dc719613..3bf546fa 100644
--- a/routes/spiders.py
+++ b/routes/spiders.py
@@ -6,6 +6,7 @@ from random import random
 
 import requests
 from bson import ObjectId
+from flask import current_app, request
 from flask_restful import reqparse
 from werkzeug.datastructures import FileStorage
 
@@ -81,7 +82,9 @@ class SpiderApi(BaseApi):
         items = db_manager.list('spiders', {})
 
         for item in items:
-            item['latest_version'] = db_manager.get_latest_version(item['_id'])
+            last_deploy = db_manager.get_last_deploy(spider_id=str(item['_id']))
+            if last_deploy:
+                item['update_ts'] = last_deploy['finish_ts'].strftime('%Y-%m-%d %H:%M:%S')
 
         return jsonify({
             'status': 'ok',
@@ -111,12 +114,13 @@ class SpiderApi(BaseApi):
             }, 400
 
         # dispatch crawl task
-        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
+        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl?node_id=%s' % (
             node.get('ip'),
             node.get('port'),
-            id
-        ), {'node_id', node_id})
-        data = json.loads(res)
+            id,
+            node_id
+        ))
+        data = json.loads(res.content.decode('utf-8'))
         return {
             'code': res.status_code,
             'status': 'ok',
@@ -152,7 +156,7 @@ class SpiderApi(BaseApi):
         node = db_manager.get(col_name='nodes', id=node_id)
 
         # get latest version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
 
         # initialize version if no version found
         if latest_version is None:
@@ -177,8 +181,12 @@ class SpiderApi(BaseApi):
 
         # upload to api
         files = {'file': open(output_file_path, 'rb')}
-        r = requests.post('http://%s:%s/api/spiders/%s/deploy_file' % (node.get('ip'), node.get('port'), id),
-                          files=files)
+        r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % (
+            node.get('ip'),
+            node.get('port'),
+            id,
+            node_id,
+        ), files=files)
 
         if r.status_code == 200:
             return {
@@ -191,11 +199,11 @@ class SpiderApi(BaseApi):
             return {
                 'code': r.status_code,
                 'status': 'ok',
-                'error': json.loads(r.content)['error']
+                'error': r.content.decode('utf-8')
             }, r.status_code
 
         except Exception as err:
-            print(err)
+            current_app.logger.error(err)
             return {
                 'code': 500,
                 'status': 'ok',
@@ -211,8 +219,9 @@ class SpiderApi(BaseApi):
             'finish_ts': datetime.now()
         })
 
-    def deploy_file(self, id):
+    def deploy_file(self, id=None):
         args = parser.parse_args()
+        node_id = request.args.get('node_id')
         f = args.file
 
         if get_file_suffix(f.filename) != 'zip':
@@ -238,14 +247,19 @@ class SpiderApi(BaseApi):
             return None, 400
 
         # get version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
         if latest_version is None:
             latest_version = 0
 
         # make source / destination
-        src = dir_path
+        src = os.path.join(dir_path, os.listdir(dir_path)[0])
+        # src = dir_path
         dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
 
+        # logging info
+        current_app.logger.info('src: %s' % src)
+        current_app.logger.info('dst: %s' % dst)
+
         # copy from source to destination
         shutil.copytree(src=src, dst=dst)
 
@@ -256,7 +270,7 @@ class SpiderApi(BaseApi):
         }
 
     def get_deploys(self, id):
-        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
+        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
         deploys = []
         for item in items:
             spider_id = item['spider_id']
diff --git a/routes/tasks.py b/routes/tasks.py
index 14e384d3..7e76b083 100644
--- a/routes/tasks.py
+++ b/routes/tasks.py
@@ -29,8 +29,11 @@ class TaskApi(BaseApi):
             task['status'] = _task['status']
             task['result'] = _task['result']
             task['spider_name'] = _spider['name']
-            with open(task['log_file_path']) as f:
-                task['log'] = f.read()
+            try:
+                with open(task['log_file_path']) as f:
+                    task['log'] = f.read()
+            except Exception as err:
+                task['log'] = ''
             return jsonify(task)
 
         tasks = db_manager.list('tasks', {}, limit=1000, sort_key='finish_ts')
@@ -47,10 +50,17 @@ class TaskApi(BaseApi):
         })
 
     def get_log(self, id):
-        task = db_manager.get('tasks', id=id)
-        with open(task['log_file_path']) as f:
-            log = f.read()
+        try:
+            task = db_manager.get('tasks', id=id)
+            with open(task['log_file_path']) as f:
+                log = f.read()
+            return {
+                'status': 'ok',
+                'log': log
+            }
+        except Exception as err:
             return {
-                'status': 'ok',
-                'log': log
-            }
+                'code': 500,
+                'status': 'ok',
+                'error': str(err)
+            }, 500
diff --git a/tasks/spider.py b/tasks/spider.py
index 87a967da..44e5843b 100644
--- a/tasks/spider.py
+++ b/tasks/spider.py
@@ -4,6 +4,7 @@ from datetime import datetime
 
 import requests
 from bson import ObjectId
+from celery import current_app
 from celery.utils.log import get_logger
 
 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
@@ -19,11 +20,12 @@ def execute_spider(self, id: str, node_id: str):
     task_id = self.request.id
     hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
-    latest_version = db_manager.get_latest_version(spider_id=id)
+    latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
     command = spider.get('cmd')
     current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
 
     # log info
+    logger.info('current_working_directory: %s' % current_working_directory)
     logger.info('spider_id: %s' % id)
     logger.info('version: %s' % latest_version)
     logger.info(command)
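NOTE on db/manager.py: remove_one now converts id to an ObjectId only when is_object_id(id) holds, but the helper's definition is not part of this diff. A minimal sketch of such a check, assuming it lives in the project's utils module and delegating to bson's own validator:

    from bson import ObjectId

    def is_object_id(s) -> bool:
        # True for a valid 24-character hex string (or anything else ObjectId accepts)
        return ObjectId.is_valid(s)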