From 6b13975af45dc942fb19ed11efcba54b7cfb1498 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Tue, 19 Feb 2019 22:55:52 +0800
Subject: [PATCH] added log file for task

---
 db/manager.py   | 12 ++++++++----
 routes/nodes.py | 16 ++++++++++++++++
 routes/tasks.py | 18 +++++++++++++++++-
 tasks/spider.py | 46 +++++++++++++++++++++++++++++++++++++---------
 4 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/db/manager.py b/db/manager.py
index fe3ffde2..8d1dceff 100644
--- a/db/manager.py
+++ b/db/manager.py
@@ -26,7 +26,10 @@ class DbManager(object):
 
     def update_one(self, col_name: str, id: str, values: dict, **kwargs):
         col = self.db[col_name]
-        col.find_one_and_update({'_id': ObjectId(id)}, {'$set': values})
+        _id = id
+        if is_object_id(id):
+            _id = ObjectId(id)
+        col.find_one_and_update({'_id': _id}, {'$set': values})
 
     def remove_one(self, col_name: str, id: str, **kwargs):
         col = self.db[col_name]
@@ -39,13 +42,14 @@ class DbManager(object):
             data.append(item)
         return data
 
-    def get(self, col_name: str, id: str):
-        if is_object_id(id):
+    def get(self, col_name: str, id):
+        if type(id) == ObjectId:
+            _id = id
+        elif is_object_id(id):
             _id = ObjectId(id)
         else:
             _id = id
         col = self.db[col_name]
-        print(_id)
         return col.find_one({'_id': _id})
 
     def count(self, col_name: str, cond):
diff --git a/routes/nodes.py b/routes/nodes.py
index ee5a1567..e30513cd 100644
--- a/routes/nodes.py
+++ b/routes/nodes.py
@@ -1,4 +1,7 @@
+import subprocess
+
 from app import api
+from config.celery import BROKER_URL
 from routes.base import BaseApi
 
 
@@ -12,6 +15,19 @@ class NodeApi(BaseApi):
         ('description', str),
     )
 
+    def _get(self, id=None):
+        if id is not None:
+            return {
+            }
+
+        else:
+            p = subprocess.Popen(['celery', 'inspect', 'stats', '-b', BROKER_URL])
+            stdout, stderr = p.communicate()
+            return {
+                'stdout': stdout,
+                'stderr': stderr,
+            }
+
 
 api.add_resource(NodeApi,
                  '/api/nodes',
diff --git a/routes/tasks.py b/routes/tasks.py
index 9d39e3bf..244694f5 100644
--- a/routes/tasks.py
+++ b/routes/tasks.py
@@ -1,15 +1,31 @@
 from app import api
+from db.manager import db_manager
 from routes.base import BaseApi
+from utils import jsonify
 
 
 class TaskApi(BaseApi):
-    col_name = 'tasks_celery'
+    col_name = 'tasks'
     arguments = (
         ('deploy_id', str),
         ('file_path', str)
     )
 
 
+    def get(self, id=None):
+        tasks = db_manager.list('tasks', {}, limit=1000)
+        items = []
+        for task in tasks:
+            _task = db_manager.get('tasks_celery', id=task['_id'])
+            _spider = db_manager.get('spiders', id=str(task['spider_id']))
+            task['status'] = _task['status']
+            task['spider_name'] = _spider['name']
+            items.append(task)
+        return jsonify({
+            'status': 'ok',
+            'items': items
+        })
+
 
 # add api to resources
 api.add_resource(TaskApi,
diff --git a/tasks/spider.py b/tasks/spider.py
index f0e67841..1807cb8e 100644
--- a/tasks/spider.py
+++ b/tasks/spider.py
@@ -3,6 +3,7 @@ import sys
 from datetime import datetime
 
 import requests
+from bson import ObjectId
 from celery.utils.log import get_logger
 
 from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
@@ -13,8 +14,10 @@ import subprocess
 
 logger = get_logger(__name__)
 
-@app.task
-def execute_spider(id: str):
+@app.task(bind=True)
+def execute_spider(self, id: str):
+    task_id = self.request.id
+    hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
     latest_version = db_manager.get_latest_version(spider_id=id)
     command = spider.get('cmd')
@@ -30,19 +33,44 @@ def execute_spider(id: str):
     if not os.path.exists(log_path):
         os.makedirs(log_path)
 
+    # open log file streams
+    log_file_path = os.path.join(log_path, '%s.log' % datetime.now().strftime('%Y%m%d%H%M%S'))
+    stdout = open(log_file_path, 'a')
+    stderr = open(log_file_path, 'a')
+
+    # create a new task
+    db_manager.save('tasks', {
+        '_id': task_id,
+        'spider_id': ObjectId(id),
+        'create_ts': datetime.now(),
+        'hostname': hostname,
+        'log_file_path': log_file_path,
+    })
+
     # execute the command
     p = subprocess.Popen(command,
                          shell=True,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE,
+                         stdout=stdout.fileno(),
+                         stderr=stderr.fileno(),
                          cwd=current_working_directory,
                          bufsize=1)
 
-    # output the log file
-    log_file_path = os.path.join(log_path, '%s.txt' % datetime.now().strftime('%Y%m%d%H%M%S'))
-    with open(log_file_path, 'a') as f:
-        for line in p.stdout.readlines():
-            f.write(line.decode('utf-8') + '\n')
+    # get output from the process
+    _stdout, _stderr = p.communicate()
+
+    # save task when the task is finished
+    db_manager.update_one('tasks', id=task_id, values={
+        'finish_ts': datetime.now(),
+    })
+    task = db_manager.get('tasks', id=id)
+
+    # close log file streams
+    stdout.flush()
+    stderr.flush()
+    stdout.close()
+    stderr.close()
+
+    return task
 
 
 @app.task
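
For reference, a minimal usage sketch (not part of the patch) of how the bound execute_spider task introduced above might be dispatched and how the resulting 'tasks' record could be read back. It assumes the tasks.spider and db.manager modules from this repository are importable in the caller's environment, and the spider_id value is a hypothetical placeholder.

    # Hypothetical usage sketch; spider_id is a placeholder, not a real document id.
    from db.manager import db_manager
    from tasks.spider import execute_spider

    spider_id = '5c6c0a00e1c3a00010c4a7d1'  # placeholder spider ObjectId string

    # Enqueue the spider run; with bind=True the Celery request id becomes
    # the _id of the document saved to the 'tasks' collection.
    result = execute_spider.delay(spider_id)

    # Once the worker has saved the task record, it can be fetched by that id,
    # including the log_file_path added by this patch.
    task = db_manager.get('tasks', id=result.id)
    if task is not None:
        print(task.get('log_file_path'))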