From 06575c8ae51a6a399f9aaca6af562ba2ec5a22d7 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 18 Feb 2019 13:27:57 +0800 Subject: [PATCH] update deploy and crawl functionality --- db/manager.py | 3 ++- routes/spiders.py | 55 +++++++++++++++++++++++++++++++++++++++++------ tasks/spider.py | 23 +++++++++++++++++--- 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/db/manager.py b/db/manager.py index b711618f..fe3ffde2 100644 --- a/db/manager.py +++ b/db/manager.py @@ -45,6 +45,7 @@ class DbManager(object): else: _id = id col = self.db[col_name] + print(_id) return col.find_one({'_id': _id}) def count(self, col_name: str, cond): @@ -54,7 +55,7 @@ class DbManager(object): def get_latest_version(self, spider_id): col = self.db['deploys'] for item in col.find({'spider_id': ObjectId(spider_id)}).sort('version', DESCENDING): - return item.version + return item.get('version') return None diff --git a/routes/spiders.py b/routes/spiders.py index d0ae53c6..6fb2096d 100644 --- a/routes/spiders.py +++ b/routes/spiders.py @@ -3,6 +3,7 @@ import json import os import shutil +from bson import ObjectId from flask_restful import reqparse, Resource from app import api @@ -10,6 +11,7 @@ from config import PROJECT_FILE_FOLDER from db.manager import db_manager from routes.base import BaseApi from tasks.spider import execute_spider +from utils import jsonify class SpiderApi(BaseApi): @@ -24,17 +26,56 @@ class SpiderApi(BaseApi): ) def crawl(self, id): + job = execute_spider.delay(id) print('crawl: %s' % id) + return { + 'code': 200, + 'status': 'ok', + 'task': { + 'id': job.id, + 'status': job.status + } + } def deploy(self, id): - args = self.parser.parse_args() + # get spider given the id spider = db_manager.get(col_name=self.col_name, id=id) - latest_version = db_manager.get_latest_version(id=id) - src = args.get('src') - dst = os.path.join(PROJECT_FILE_FOLDER, str(spider._id), latest_version + 1) - if not os.path.exists(dst): - os.mkdir(dst) - 
shutil.copytree(src=src, dst=dst) + if spider is None: + return + + # get latest version + latest_version = db_manager.get_latest_version(spider_id=id) + + # initialize version if no version found + if latest_version is None: + latest_version = 0 + + # make source / destination + src = spider.get('src') + dst = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1)) + + # copy files + try: + shutil.copytree(src=src, dst=dst) + return { + 'code': 200, + 'status': 'ok', + 'message': 'deploy success' + } + except Exception as err: + print(err) + return { + 'code': 500, + 'status': 'error', + 'error': str(err) + } + finally: + version = latest_version + 1 + db_manager.save('deploys', { + 'spider_id': ObjectId(id), + 'version': version, + 'node_id': None # TODO: deploy to corresponding node + }) api.add_resource(SpiderApi, diff --git a/tasks/spider.py b/tasks/spider.py index fdb622fa..d7e2d801 100644 --- a/tasks/spider.py +++ b/tasks/spider.py @@ -1,14 +1,31 @@ +import os +import sys + import requests from celery.utils.log import get_logger + +from config import PROJECT_FILE_FOLDER +from db.manager import db_manager from tasks import app +import subprocess logger = get_logger(__name__) @app.task -def execute_spider(spider_name: str): - logger.info('spider_name: %s' % spider_name) - return spider_name +def execute_spider(id: str): + spider = db_manager.get('spiders', id=id) + latest_version = db_manager.get_latest_version(spider_id=id) + command = spider.get('cmd') + current_working_directory = os.path.join(PROJECT_FILE_FOLDER, str(spider.get('_id')), str(latest_version)) + p = subprocess.Popen(command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=current_working_directory, + bufsize=1) + for i in iter(p.stdout.readline, b''): + yield i @app.task