From de3247269cbeefaeb66bc9d35dc83fb05288b644 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Tue, 5 Mar 2019 13:22:23 +0800
Subject: [PATCH] updated deployment and spider execution workflow

---
 Dockerfile                                    |  2 +-
 app.py                                        |  4 +-
 config.py                                     |  4 +-
 constants/node.py                             |  2 +-
 constants/task.py                             |  8 ++
 .../components/InfoView/SpiderInfoView.vue    | 13 ++-
 frontend/src/store/modules/spider.js          | 12 ++-
 frontend/src/views/spider/SpiderList.vue      | 34 ++++++-
 routes/nodes.py                               | 69 ++------------
 routes/spiders.py                             | 94 +++++++++++++------
 routes/tasks.py                               | 11 ++-
 tasks/node.py                                 |  7 ++
 tasks/spider.py                               | 16 ++--
 utils/node.py                                 | 28 ++++++
 14 files changed, 193 insertions(+), 111 deletions(-)
 create mode 100644 constants/task.py
 create mode 100644 tasks/node.py

diff --git a/Dockerfile b/Dockerfile
index 6202db5c..b286d956 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ RUN cat /etc/resolv.conf
 
 # install python
 RUN apt-get update
-RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim ntp
 
 # soft link
 RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
diff --git a/app.py b/app.py
index ec94399c..7efd3af8 100644
--- a/app.py
+++ b/app.py
@@ -9,7 +9,7 @@ from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
 from routes.deploys import DeployApi
 from routes.files import FileApi
 from routes.nodes import NodeApi
-from routes.spiders import SpiderApi, SpiderImportApi
+from routes.spiders import SpiderApi, SpiderImportApi, SpiderManageApi
 from routes.stats import StatsApi
 from routes.tasks import TaskApi
 
@@ -30,6 +30,8 @@ api.add_resource(NodeApi,
                  '/api/nodes/<string:id>/<string:action>')
 api.add_resource(SpiderImportApi,
                  '/api/spiders/import/<string:platform>')
+api.add_resource(SpiderManageApi,
+                 '/api/spiders/manage/<string:action>')
 api.add_resource(SpiderApi,
                  '/api/spiders',
                  '/api/spiders/<string:id>',
diff --git a/config.py b/config.py
index 015c5c5d..6b1575af 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,6 @@
 # project variables
+from celery.schedules import crontab
+
 PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
 PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
 PROJECT_LOGS_FOLDER = '/var/logs/crawlab'
@@ -14,6 +16,7 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
     'taskmeta_collection': 'tasks_celery',
 }
 CELERY_TIMEZONE = 'Asia/Shanghai'
+CELERY_ENABLE_UTC = True
 
 # flower variables
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
@@ -30,4 +33,3 @@ MONGO_DB = 'crawlab_test'
 DEBUG = True
 FLASK_HOST = '0.0.0.0'
 FLASK_PORT = 5000
-# SERVER_NAME = '0.0.0.0:5000'
diff --git a/constants/node.py b/constants/node.py
index 8dc2f494..4cdc3fda 100644
--- a/constants/node.py
+++ b/constants/node.py
@@ -1,3 +1,3 @@
-class NodeType:
+class NodeStatus:
     ONLINE = 'online'
     OFFLINE = 'offline'
diff --git a/constants/task.py b/constants/task.py
new file mode 100644
index 00000000..07e169b3
--- /dev/null
+++ b/constants/task.py
@@ -0,0 +1,8 @@
+class TaskStatus:
+    PENDING = 'PENDING'
+    STARTED = 'STARTED'
+    SUCCESS = 'SUCCESS'
+    FAILURE = 'FAILURE'
+    RETRY = 'RETRY'
+    REVOKED = 'REVOKED'
+    UNAVAILABLE = 'UNAVAILABLE'
diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue
index 94cec531..1ed15fdc 100644
--- a/frontend/src/components/InfoView/SpiderInfoView.vue
+++ b/frontend/src/components/InfoView/SpiderInfoView.vue
@@ -71,10 +71,20 @@ export default {
   },
   methods: {
     onRun () {
+      const row = this.spiderForm
       this.$refs['spiderForm'].validate(res => {
         if (res) {
-          this.$store.commit('dialogView/SET_DIALOG_VISIBLE', true)
-          this.$store.commit('dialogView/SET_DIALOG_TYPE', 'spiderRun')
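+          // replaces the old run dialog: confirm with the user, then dispatch the crawl action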
+          this.$confirm('Are you sure you want to run this spider?', 'Notice', {
+            confirmButtonText: 'Confirm',
+            cancelButtonText: 'Cancel'
+          })
+            .then(() => {
+              this.$store.dispatch('spider/crawlSpider', { id: row._id.$oid })
+                .then(() => {
+                  this.$message.success(`Spider "${row._id.$oid}" has been scheduled to run`)
+                })
+            })
         }
       })
     },
diff --git a/frontend/src/store/modules/spider.js b/frontend/src/store/modules/spider.js
index 8d0ad719..928f71d6 100644
--- a/frontend/src/store/modules/spider.js
+++ b/frontend/src/store/modules/spider.js
@@ -87,10 +87,8 @@ const actions = {
       })
   },
   crawlSpider ({ state, dispatch }, payload) {
-    const { id, nodeId } = payload
-    return request.post(`/spiders/${id}/crawl`, {
-      node_id: nodeId
-    })
+    const { id } = payload
+    return request.post(`/spiders/${id}/on_crawl`)
       .then(response => {
         console.log(response.data)
       })
@@ -124,6 +122,13 @@ const actions = {
       .then(response => {
         console.log(response)
       })
+  },
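+  // calls the new SpiderManageApi endpoint, which zips each spider and pushes it to every online node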
+  deployAll () {
+    return request.post('/spiders/manage/deploy_all')
+      .then(response => {
+        console.log(response)
+      })
   }
 }
diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue
index b6051c11..3e7d1589 100644
--- a/frontend/src/views/spider/SpiderList.vue
+++ b/frontend/src/views/spider/SpiderList.vue
@@ -36,7 +36,10 @@
           @change="onSearch">
         </el-input>
-        <el-button type="primary" @click="openImportDialog">
+        <el-button type="primary" @click="onDeployAll">
+          Deploy All
+        </el-button>
+        <el-button type="primary" @click="openImportDialog">
           Import Spiders
         </el-button>
@@ -222,8 +225,15 @@
     onCrawl (row) {
-      this.$store.commit('spider/SET_SPIDER_FORM', row)
-      this.$store.commit('dialogView/SET_DIALOG_VISIBLE', true)
-      this.$store.commit('dialogView/SET_DIALOG_TYPE', 'spiderRun')
+      this.$confirm('Are you sure you want to run this spider?', 'Notice', {
+        confirmButtonText: 'Confirm',
+        cancelButtonText: 'Cancel'
+      })
+        .then(() => {
+          this.$store.dispatch('spider/crawlSpider', { id: row._id.$oid })
+            .then(() => {
+              this.$message.success(`Spider "${row._id.$oid}" has been scheduled to run`)
+            })
+        })
     },
     onView (row) {
       this.$router.push(`/spiders/${row._id.$oid}`)
     },
@@ -257,9 +267,10 @@
       this.$refs.importForm.validate(valid => {
         if (valid) {
           this.importLoading = true
+          // TODO: switch between github / gitlab / svn
           this.$store.dispatch('spider/importGithub')
             .then(response => {
-              this.$message.success('Import repo sucessfully')
+              this.$message.success('Imported repo successfully')
               this.$store.dispatch('spider/getSpiderList')
             })
             .catch(response => {
@@ -274,6 +285,19 @@
     },
     openImportDialog () {
       this.dialogVisible = true
+    },
+    onDeployAll () {
+      this.$confirm('Are you sure you want to deploy all spiders to the active nodes?', 'Notice', {
+        confirmButtonText: 'Confirm',
+        cancelButtonText: 'Cancel',
+        type: 'warning'
+      })
+        .then(() => {
+          this.$store.dispatch('spider/deployAll')
+            .then(() => {
+              this.$message.success('Deployed all spiders successfully')
+            })
+        })
     }
   },
   created () {
diff --git a/routes/nodes.py b/routes/nodes.py
index 2d3e692c..36a37114 100644
--- a/routes/nodes.py
+++ b/routes/nodes.py
@@ -1,14 +1,7 @@
-import json
-
-import requests
-from flask import current_app
-
-from config import FLOWER_API_ENDPOINT
-from constants.node import NodeType
 from db.manager import db_manager
 from routes.base import BaseApi
 from utils import jsonify
-from utils.node import check_nodes_status
+from utils.node import update_nodes_status
 
 
 class NodeApi(BaseApi):
@@ -36,62 +29,18 @@ class NodeApi(BaseApi):
         elif id is not None:
             return db_manager.get('nodes', id=id)
 
-        # TODO: use query "?status=1" to get status of nodes
-
-        # get status for each node
-        status_data = {}
-        try:
-            status_data = check_nodes_status()
-        except Exception as err:
-            current_app.logger.error(err)
-
         # get a list of items
-        online_node_ids = []
-        try:
-            res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
-            for k, v in json.loads(res.content.decode('utf-8')).items():
-                node_name = k
-                node_celery = v
-                node = db_manager.get('nodes', id=node_name)
+        else:
+            # fetch active nodes from flower and save their status to db
+            update_nodes_status()
 
-                # new node
-                if node is None:
-                    node = {}
-                    for _k, _v in node_celery.items():
-                        node[_k] = _v
-                    node['_id'] = node_name
-                    node['name'] = node_name
-                    db_manager.save('nodes', node)
+            # list db nodes, whose status has just been refreshed
+            nodes = db_manager.list('nodes', {})
 
-                # existing node
-                else:
-                    for _k, _v in v.items():
-                        node[_k] = _v
-                    node['name'] = node_name
-                    db_manager.save('nodes', node)
-
-                online_node_ids.append(node_name)
-        except Exception as err:
-            current_app.logger.error(err)
-
-        # iterate db nodes to update status
-        nodes = []
-        items = db_manager.list('nodes', {})
-        for item in items:
-            node_status = status_data.get(item['name'])
-            if item['_id'] in online_node_ids:
-                item['status'] = NodeType.ONLINE if node_status else NodeType.OFFLINE
-            else:
-                item['status'] = NodeType.OFFLINE
-            db_manager.update_one('nodes', item['_id'], {
-                'status': item['status']
+            return jsonify({
+                'status': 'ok',
+                'items': nodes
             })
-            nodes.append(item)
-
-        return jsonify({
-            'status': 'ok',
-            'items': nodes
-        })
diff --git a/routes/spiders.py b/routes/spiders.py
index 81b7617c..cd4f242c 100644
--- a/routes/spiders.py
+++ b/routes/spiders.py
@@ -12,6 +12,8 @@ from flask_restful import reqparse, Resource
 from werkzeug.datastructures import FileStorage
 
 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER, PROJECT_TMP_FOLDER
+from constants.node import NodeStatus
+from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
 from tasks.spider import execute_spider
@@ -55,6 +57,8 @@ class SpiderApi(BaseApi):
 
         # get a list of items
         else:
+            items = []
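+            # spiders are discovered by scanning PROJECT_SOURCE_FILE_FOLDER on disk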
             dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER)
             for _dir in dirs:
                 dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir)
@@ -81,16 +84,13 @@ class SpiderApi(BaseApi):
                 'suffix_stats': stats,
             })
 
-        items = db_manager.list('spiders', {})
-        for item in items:
-            last_deploy = db_manager.get_last_deploy(spider_id=str(item['_id']))
-            if last_deploy:
-                item['update_ts'] = last_deploy['finish_ts'].strftime('%Y-%m-%d %H:%M:%S')
+                # append spider
+                items.append(spider)
 
-        return jsonify({
-            'status': 'ok',
-            'items': items
-        })
+            return jsonify({
+                'status': 'ok',
+                'items': items
+            })
 
     def crawl(self, id):
         args = self.parser.parse_args()
@@ -131,8 +131,7 @@ class SpiderApi(BaseApi):
     def on_crawl(self, id):
         args = self.parser.parse_args()
-        node_id = args.get('node_id')
 
-        job = execute_spider.delay(id, node_id)
+        job = execute_spider.delay(id)
 
         return {
             'code': 200,
@@ -156,12 +155,5 @@ class SpiderApi(BaseApi):
         # get node given the node
         node = db_manager.get(col_name='nodes', id=node_id)
 
-        # get latest version
-        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
-
-        # initialize version if no version found
-        if latest_version is None:
-            latest_version = 0
-
         # make source / destination
         src = spider.get('src')
@@ -238,15 +230,10 @@ class SpiderApi(BaseApi):
         if spider is None:
             return None, 400
 
-        # get version
-        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
-        if latest_version is None:
-            latest_version = 0
-
         # make source / destination
         src = os.path.join(dir_path, os.listdir(dir_path)[0])
         # src = dir_path
-        dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
+        dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))
 
         # logging info
         current_app.logger.info('src: %s' % src)
@@ -261,10 +248,8 @@ class SpiderApi(BaseApi):
 
         # save to db
         # TODO: task management for deployment
-        version = latest_version + 1
         db_manager.save('deploys', {
             'spider_id': ObjectId(id),
-            'version': version,
             'node_id': node_id,
             'finish_ts': datetime.now()
         })
@@ -298,8 +283,8 @@ class SpiderApi(BaseApi):
             if task is not None:
                 item['status'] = task['status']
             else:
-                item['status'] = 'UNAVAILABLE'
+                item['status'] = TaskStatus.UNAVAILABLE
         return jsonify({
             'status': 'ok',
             'items': items
         })
@@ -363,3 +348,59 @@ class SpiderImportApi(Resource):
         return {
             'status': 'ok',
             'message': 'success'
         }
+
+
+class SpiderManageApi(Resource):
+    parser = reqparse.RequestParser()
+    arguments = [
+        ('url', str)
+    ]
+
+    def post(self, action):
+        if not hasattr(self, action):
+            return {
+                'status': 'ok',
+                'code': 400,
+                'error': 'action "%s" invalid' % action
+            }, 400
+
+        return getattr(self, action)()
+
+    def deploy_all(self):
+        # active nodes
+        nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
+
+        # all spiders
+        spiders = db_manager.list('spiders', {'cmd': {'$exists': True}})
+
+        # iterate all nodes
+        for node in nodes:
+            node_id = node['_id']
+
+            for spider in spiders:
+                spider_id = spider['_id']
+                spider_src = spider['src']
+
+                output_file_name = '%s_%s.zip' % (
+                    datetime.now().strftime('%Y%m%d%H%M%S'),
+                    str(random())[2:12]
+                )
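+                # build a unique archive name from a timestamp plus 10 random digits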
+                output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name)
+
+                # zip source folder to zip file
+                zip_file(source_dir=spider_src,
+                         output_filename=output_file_path)
+
+                # upload to api
+                files = {'file': open(output_file_path, 'rb')}
+                r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % (
+                    node.get('ip'),
+                    node.get('port'),
+                    spider_id,
+                    node_id,
+                ), files=files)
+
+        return {
+            'status': 'ok',
+            'message': 'success'
+        }
diff --git a/routes/tasks.py b/routes/tasks.py
index 7e76b083..62854737 100644
--- a/routes/tasks.py
+++ b/routes/tasks.py
@@ -1,3 +1,4 @@
+from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
 from utils import jsonify
@@ -26,7 +27,11 @@ class TaskApi(BaseApi):
             task = db_manager.get('tasks', id=id)
             _task = db_manager.get('tasks_celery', id=task['_id'])
             _spider = db_manager.get('spiders', id=str(task['spider_id']))
-            task['status'] = _task['status']
-            task['result'] = _task['result']
+            if _task:
+                task['status'] = _task['status']
+                task['result'] = _task['result']
+            else:
+                task['status'] = TaskStatus.UNAVAILABLE
+                task['result'] = None
             task['spider_name'] = _spider['name']
             try:
@@ -41,7 +45,10 @@ class TaskApi(BaseApi):
         for task in tasks:
             _task = db_manager.get('tasks_celery', id=task['_id'])
             _spider = db_manager.get('spiders', id=str(task['spider_id']))
-            task['status'] = _task['status']
+            if _task:
+                task['status'] = _task['status']
+            else:
+                task['status'] = TaskStatus.UNAVAILABLE
             task['spider_name'] = _spider['name']
             items.append(task)
         return jsonify({
diff --git a/tasks/node.py b/tasks/node.py
new file mode 100644
index 00000000..9747cef9
--- /dev/null
+++ b/tasks/node.py
@@ -0,0 +1,7 @@
+from utils import node
+from .celery import celery_app
+
+
+@celery_app.task
+def update_node_status():
+    node.update_nodes_status(refresh=True)
diff --git a/tasks/spider.py b/tasks/spider.py
index 44e5843b..6098061b 100644
--- a/tasks/spider.py
+++ b/tasks/spider.py
@@ -16,22 +16,22 @@ logger = get_logger(__name__)
 
 
 @celery_app.task(bind=True)
-def execute_spider(self, id: str, node_id: str):
+def execute_spider(self, id: str):
+    print(self.state)
     task_id = self.request.id
     hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
-    latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
     command = spider.get('cmd')
-    current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
+
+    current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))
 
     # log info
     logger.info('current_working_directory: %s' % current_working_directory)
     logger.info('spider_id: %s' % id)
-    logger.info('version: %s' % latest_version)
     logger.info(command)
 
     # make sure the log folder exists
-    log_path = os.path.join(PROJECT_LOGS_FOLDER, id, str(latest_version))
+    log_path = os.path.join(PROJECT_LOGS_FOLDER, id)
     if not os.path.exists(log_path):
         os.makedirs(log_path)
 
@@ -45,17 +45,20 @@ def execute_spider(self, id: str):
         '_id': task_id,
         'spider_id': ObjectId(id),
         'create_ts': datetime.now(),
-        'node_id': node_id,
+        'node_id': hostname,
         'hostname': hostname,
         'log_file_path': log_file_path,
-        'spider_version': latest_version
     })
 
     # execute the command
+    env = os.environ.copy()
+    env['CRAWLAB_TASK_ID'] = task_id
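+    # the spider process can read CRAWLAB_TASK_ID from its environment, e.g. to associate scraped results with this task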
     p = subprocess.Popen(command.split(' '),
                          stdout=stdout.fileno(),
                          stderr=stderr.fileno(),
                          cwd=current_working_directory,
+                         env=env,
                          bufsize=1)
 
     # get output from the process
diff --git a/utils/node.py b/utils/node.py
index 05bd30ed..c6cd47be 100644
--- a/utils/node.py
+++ b/utils/node.py
@@ -3,8 +3,36 @@ import json
 import requests
 
 from config import FLOWER_API_ENDPOINT
+from constants.node import NodeStatus
+from db.manager import db_manager
 
 
 def check_nodes_status():
     res = requests.get('%s/workers?status=1' % FLOWER_API_ENDPOINT)
     return json.loads(res.content.decode('utf-8'))
+
+
+def update_nodes_status(refresh=False):
+    online_node_ids = []
+    url = '%s/workers?status=1' % FLOWER_API_ENDPOINT
+    if refresh:
+        url += '&refresh=1'
+    res = requests.get(url)
+    for k, v in json.loads(res.content.decode('utf-8')).items():
+        node_name = k
+        node_status = NodeStatus.ONLINE if v else NodeStatus.OFFLINE
+        node = db_manager.get('nodes', id=node_name)
+
+        # new node
+        if node is None:
+            node = {'_id': node_name, 'name': node_name, 'status': node_status}
+            db_manager.save('nodes', node)
+
+        # existing node
+        else:
+            node['status'] = node_status
+            db_manager.save('nodes', node)
+
+        if node_status == NodeStatus.ONLINE:
+            online_node_ids.append(node_name)
+    return online_node_ids