Added log file for task

This commit is contained in:
Marvin Zhang
2019-02-19 22:55:52 +08:00
parent 140a43d1b3
commit 1e03b44677
4 changed files with 78 additions and 14 deletions

View File

@@ -26,7 +26,10 @@ class DbManager(object):
def update_one(self, col_name: str, id: str, values: dict, **kwargs):
    """Apply a partial ($set) update to the document with the given id.

    :param col_name: name of the MongoDB collection
    :param id: document id; wrapped in ObjectId when it parses as one,
        otherwise used verbatim as the _id value (e.g. string task ids)
    :param values: field/value pairs to $set on the matched document
    """
    col = self.db[col_name]
    # Normalize the id first; calling ObjectId() unconditionally would
    # raise on non-ObjectId-shaped string ids.
    _id = id
    if is_object_id(id):
        _id = ObjectId(id)
    col.find_one_and_update({'_id': _id}, {'$set': values})
def remove_one(self, col_name: str, id: str, **kwargs):
col = self.db[col_name]
@@ -39,13 +42,14 @@ class DbManager(object):
data.append(item)
return data
def get(self, col_name: str, id):
    """Fetch a single document by id.

    :param col_name: name of the MongoDB collection
    :param id: a ready-made ObjectId, an ObjectId-like string, or a
        plain string used verbatim as _id
    :return: the matching document dict, or None when nothing matches
    """
    # Normalize the id: accept an ObjectId as-is, convert a string that
    # parses as one, otherwise fall back to the raw value.
    if isinstance(id, ObjectId):
        _id = id
    elif is_object_id(id):
        _id = ObjectId(id)
    else:
        _id = id
    col = self.db[col_name]
    return col.find_one({'_id': _id})
def count(self, col_name: str, cond):

View File

@@ -1,4 +1,7 @@
import subprocess
from app import api
from config.celery import BROKER_URL
from routes.base import BaseApi
@@ -12,6 +15,19 @@ class NodeApi(BaseApi):
('description', str),
)
def _get(self, id=None):
    """Return node information.

    With an id, returns a (currently empty) placeholder dict. Without
    one, shells out to ``celery inspect stats`` against the configured
    broker and returns its captured output.

    :param id: node id; when None, cluster-wide stats are gathered
    :return: dict; for the stats path, 'stdout'/'stderr' text of the call
    """
    if id is not None:
        # TODO(review): single-node lookup not implemented yet.
        return {
        }
    else:
        # Capture the subprocess output: without stdout=/stderr=PIPE,
        # communicate() returns (None, None) and the response is empty.
        p = subprocess.Popen(['celery', 'inspect', 'stats', '-b', BROKER_URL],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        # Decode so the payload is JSON-serializable text, not bytes.
        return {
            'stdout': stdout.decode('utf-8'),
            'stderr': stderr.decode('utf-8'),
        }
api.add_resource(NodeApi,
'/api/nodes',

View File

@@ -1,15 +1,31 @@
from app import api
from db.manager import db_manager
from routes.base import BaseApi
from utils import jsonify
class TaskApi(BaseApi):
col_name = 'tasks_celery'
col_name = 'tasks'
arguments = (
('deploy_id', str),
('file_path', str)
)
def get(self, id=None):
    """List recent tasks, each enriched with its Celery status and the
    owning spider's name.

    :param id: unused for now; the listing always returns up to 1000 tasks
    :return: JSON response of shape {'status': 'ok', 'items': [...]}
    """
    tasks = db_manager.list('tasks', {}, limit=1000)
    items = []
    for task in tasks:
        # The Celery result record shares the task's _id; spider_id is
        # presumably an ObjectId, hence the str() for the lookup helper.
        _task = db_manager.get('tasks_celery', id=task['_id'])
        _spider = db_manager.get('spiders', id=str(task['spider_id']))
        # Guard against missing related records so a single orphaned
        # task cannot crash the whole listing with a TypeError.
        task['status'] = _task['status'] if _task else None
        task['spider_name'] = _spider['name'] if _spider else None
        items.append(task)
    return jsonify({
        'status': 'ok',
        'items': items
    })
# add api to resources
api.add_resource(TaskApi,

View File

@@ -3,6 +3,7 @@ import sys
from datetime import datetime
import requests
from bson import ObjectId
from celery.utils.log import get_logger
from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
@@ -13,8 +14,10 @@ import subprocess
logger = get_logger(__name__)
# NOTE(review): this span is stripped diff residue — both the removed and the
# added versions of several lines are present (two decorators/signatures,
# duplicate stdout=/stderr= kwargs in Popen, and the old read-stdout loop),
# and a hunk boundary hides part of the body. Comments below describe the
# intended NEW version; do not treat this span as runnable code.
@app.task
def execute_spider(id: str):
# New signature: bind=True exposes the Celery request object so the task can
# record its own id and worker hostname.
@app.task(bind=True)
def execute_spider(self, id: str):
task_id = self.request.id
hostname = self.request.hostname
# Load the spider document and its latest deployed version by spider id.
spider = db_manager.get('spiders', id=id)
latest_version = db_manager.get_latest_version(spider_id=id)
command = spider.get('cmd')
@@ -30,19 +33,44 @@ def execute_spider(id: str):
# Ensure the log directory exists before opening files inside it.
if not os.path.exists(log_path):
os.makedirs(log_path)
# open log file streams
# Both streams append to the same timestamped .log file.
log_file_path = os.path.join(log_path, '%s.log' % datetime.now().strftime('%Y%m%d%H%M%S'))
stdout = open(log_file_path, 'a')
stderr = open(log_file_path, 'a')
# create a new task
# Persist the task record up front, keyed by the Celery task id, so its
# status/log path are queryable while the spider is still running.
db_manager.save('tasks', {
'_id': task_id,
'spider_id': ObjectId(id),
'create_ts': datetime.now(),
'hostname': hostname,
'log_file_path': log_file_path,
})
# execute the command
# NOTE(review): residue — stdout=/stderr= appear twice below; the new version
# redirects straight into the log file descriptors instead of PIPE.
p = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=stdout.fileno(),
stderr=stderr.fileno(),
cwd=current_working_directory,
bufsize=1)
# output the log file
# NOTE(review): the four lines below are the OLD log-writing path (removed in
# this commit); the new code streams directly to the file via fileno() above.
log_file_path = os.path.join(log_path, '%s.txt' % datetime.now().strftime('%Y%m%d%H%M%S'))
with open(log_file_path, 'a') as f:
for line in p.stdout.readlines():
f.write(line.decode('utf-8') + '\n')
# get output from the process
# With output redirected to files, communicate() here only waits for exit.
_stdout, _stderr = p.communicate()
# save task when the task is finished
db_manager.update_one('tasks', id=task_id, values={
'finish_ts': datetime.now(),
})
# NOTE(review): looks like a bug — this fetches by the spider id (`id`), not
# `task_id`, yet the intent seems to be returning the finished task; confirm.
task = db_manager.get('tasks', id=id)
# close log file streams
stdout.flush()
stderr.flush()
stdout.close()
stderr.close()
return task
@app.task