fixed deploy/run task issue

Author: Marvin Zhang
Date: 2019-03-02 09:37:55 +08:00
parent 7d98cbddf8
commit 7b88488966
10 changed files with 125 additions and 33 deletions


@@ -10,7 +10,7 @@ RUN cat /etc/resolv.conf
 # install python
 RUN apt-get update
-RUN apt-get install -y python3 python3-pip net-tools iputils-ping
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
 # soft link
 RUN ln -s /usr/bin/pip3 /usr/local/bin/pip

Dockerfile-task (new file)

@@ -0,0 +1,26 @@
+# images
+#FROM python:latest
+FROM ubuntu:latest
+
+# source files
+ADD . /opt/crawlab
+
+# add dns
+RUN cat /etc/resolv.conf
+
+# install python
+RUN apt-get update
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
+
+# soft link
+RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
+RUN ln -s /usr/bin/python3 /usr/local/bin/python
+
+# install required libraries
+RUN pip install -U setuptools
+RUN pip install -r /opt/crawlab/requirements.txt
+
+# execute apps
+WORKDIR /opt/crawlab
+CMD python ./bin/run_worker.py
+CMD python app.py
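
Note that a Dockerfile honors only the last CMD instruction, so as written this image starts app.py and never runs bin/run_worker.py. If both processes are meant to run in one container, one option is a small launcher invoked by a single CMD (a sketch; the run_all.py name is hypothetical):

import subprocess

# run_all.py -- hypothetical launcher: start the worker and the Flask app,
# then block on both so the container stays alive
worker = subprocess.Popen(['python', './bin/run_worker.py'])
app = subprocess.Popen(['python', 'app.py'])
worker.wait()
app.wait()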

app.py

@@ -1,8 +1,11 @@
+import os
+import shutil
+
 from flask import Flask, logging
 from flask_cors import CORS
 from flask_restful import Api
-from config import FLASK_HOST, FLASK_PORT
+from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
 from routes.deploys import DeployApi
 from routes.files import FileApi
 from routes.nodes import NodeApi
@@ -45,5 +48,9 @@ api.add_resource(StatsApi,
                  '/api/stats',
                  '/api/stats/<string:action>')
 
+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)
+
 if __name__ == '__main__':
     app.run(host=FLASK_HOST, port=FLASK_PORT)
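
The check-then-create pair can still race if two processes start at once; on Python 3.2+, os.makedirs takes exist_ok, which folds the existence test into the call. A minimal equivalent, using the PROJECT_LOGS_FOLDER imported above:

import os

# create the logs folder if missing, without a separate exists() check
os.makedirs(PROJECT_LOGS_FOLDER, exist_ok=True)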


@@ -12,4 +12,5 @@ from config import BROKER_URL
 if __name__ == '__main__':
     p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     for line in iter(p.stdout.readline, b''):
-        print(line.decode('utf-8'))
+        if line.decode('utf-8') != '':
+            print(line.decode('utf-8'))
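
For reference, iter(p.stdout.readline, b'') stops when readline returns the empty bytes object at EOF; the str sentinel 'b' in the original never equals a bytes line, so the loop could never terminate cleanly. A self-contained sketch of the streaming pattern:

import subprocess

# stream a child's merged stdout/stderr line by line until EOF
p = subprocess.Popen(['ping', '-c', '3', 'localhost'],
                     stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for raw in iter(p.stdout.readline, b''):  # b'' is the EOF sentinel
    text = raw.decode('utf-8').rstrip()
    if text:
        print(text)
p.wait()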


@@ -1,12 +1,14 @@
 # project variables
 PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
 PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
-PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
+PROJECT_LOGS_FOLDER = '/var/logs/crawlab'
 PROJECT_TMP_FOLDER = '/tmp'
 
 # celery variables
-BROKER_URL = 'redis://localhost:6379/0'
-CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+# BROKER_URL = 'redis://localhost:6379/0'
+# CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+BROKER_URL = 'redis://192.168.99.100:6379/0'
+CELERY_RESULT_BACKEND = 'mongodb://192.168.99.100:27017/'
 CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
@@ -16,7 +18,8 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
 
 # database variables
-MONGO_HOST = 'localhost'
+# MONGO_HOST = 'localhost'
+MONGO_HOST = '192.168.99.100'
 MONGO_PORT = 27017
 # MONGO_USER = 'test'
 # MONGO_PASS = 'test'
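
192.168.99.100 is the default docker-machine VM address, which is presumably why the localhost defaults are commented out here. For orientation, these values feed the Celery app roughly as follows (a sketch; the real wiring lives elsewhere in the repo):

from celery import Celery

import config

# Redis brokers the task messages; MongoDB stores task results
celery_app = Celery('crawlab',
                    broker=config.BROKER_URL,
                    backend=config.CELERY_RESULT_BACKEND)
celery_app.conf.update(
    CELERY_MONGODB_BACKEND_SETTINGS=config.CELERY_MONGODB_BACKEND_SETTINGS)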


@@ -38,7 +38,10 @@ class DbManager(object):
     def remove_one(self, col_name: str, id: str, **kwargs):
         col = self.db[col_name]
-        col.remove({'_id': ObjectId(id)})
+        _id = id
+        if is_object_id(id):
+            _id = ObjectId(id)
+        col.remove({'_id': _id})
 
     def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100,
              **kwargs):
@@ -70,12 +73,20 @@ class DbManager(object):
         col = self.db[col_name]
         return col.count(cond)
 
-    def get_latest_version(self, spider_id):
+    def get_latest_version(self, spider_id, node_id):
         col = self.db['deploys']
-        for item in col.find({'spider_id': ObjectId(spider_id)}).sort('version', DESCENDING):
+        for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
+                .sort('version', DESCENDING):
             return item.get('version')
         return None
 
+    def get_last_deploy(self, spider_id):
+        col = self.db['deploys']
+        for item in col.find({'spider_id': ObjectId(spider_id)}) \
+            .sort('finish_ts', DESCENDING):
+            return item
+        return None
+
     def aggregate(self, col_name: str, pipelines, **kwargs):
         col = self.db[col_name]
         return col.aggregate(pipelines, **kwargs)
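
One caveat: get_latest_version now requires node_id, but the spider listing later in this commit still calls it as get_latest_version(item['_id']) with a single argument, which this signature would break. A backward-compatible variant might make node_id optional (a sketch; the class wrapper is for illustration only, the method would live on DbManager):

from bson import ObjectId
from pymongo import DESCENDING

class DbManagerPatch:
    def get_latest_version(self, spider_id, node_id=None):
        col = self.db['deploys']
        cond = {'spider_id': ObjectId(spider_id)}
        if node_id is not None:  # filter per node only when a node is given
            cond['node_id'] = node_id
        for item in col.find(cond).sort('version', DESCENDING):
            return item.get('version')
        return None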

docker-compose.yml (new file)

@@ -0,0 +1,18 @@
+version: '3.3'  # Compose file format version
+services:
+  web:  # service name
+    build: .  # build from the Dockerfile in this directory
+    ports:  # port mappings
+      - "5001:5000"
+  task:
+    image: crawlab:v3
+  db:
+    image: mongo
+    restart: always
+    ports:
+      - "27017:27017"
+  redis:
+    image: redis
+    restart: always
+    ports:
+      - "6379:6379"


@@ -6,6 +6,7 @@ from random import random
 import requests
 from bson import ObjectId
 from flask import current_app, request
+from flask_restful import reqparse
 from werkzeug.datastructures import FileStorage
@@ -81,7 +82,9 @@ class SpiderApi(BaseApi):
         items = db_manager.list('spiders', {})
         for item in items:
             item['latest_version'] = db_manager.get_latest_version(item['_id'])
+            last_deploy = db_manager.get_last_deploy(spider_id=str(item['_id']))
+            if last_deploy:
+                item['update_ts'] = last_deploy['finish_ts'].strftime('%Y-%m-%d %H:%M:%S')
 
         return jsonify({
             'status': 'ok',
@@ -111,12 +114,13 @@ class SpiderApi(BaseApi):
             }, 400
 
         # dispatch crawl task
-        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
+        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl?node_id=%s' % (
             node.get('ip'),
             node.get('port'),
-            id
-        ), {'node_id', node_id})
-        data = json.loads(res)
+            id,
+            node_id
+        ))
+        data = json.loads(res.content.decode('utf-8'))
         return {
             'code': res.status_code,
             'status': 'ok',
@@ -152,7 +156,7 @@ class SpiderApi(BaseApi):
         node = db_manager.get(col_name='nodes', id=node_id)
 
         # get latest version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
 
         # initialize version if no version found
         if latest_version is None:
@@ -177,8 +181,12 @@ class SpiderApi(BaseApi):
         # upload to api
         files = {'file': open(output_file_path, 'rb')}
-        r = requests.post('http://%s:%s/api/spiders/%s/deploy_file' % (node.get('ip'), node.get('port'), id),
-                          files=files)
+        r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % (
+            node.get('ip'),
+            node.get('port'),
+            id,
+            node_id,
+        ), files=files)
 
         if r.status_code == 200:
             return {
@@ -191,11 +199,11 @@ class SpiderApi(BaseApi):
             return {
                 'code': r.status_code,
                 'status': 'ok',
-                'error': json.loads(r.content)['error']
+                'error': r.content.decode('utf-8')
             }, r.status_code
         except Exception as err:
-            print(err)
+            current_app.logger.error(err)
             return {
                 'code': 500,
                 'status': 'ok',
@@ -211,8 +219,9 @@ class SpiderApi(BaseApi):
             'finish_ts': datetime.now()
         })
 
-    def deploy_file(self, id):
+    def deploy_file(self, id=None):
         args = parser.parse_args()
+        node_id = request.args.get('node_id')
         f = args.file
 
         if get_file_suffix(f.filename) != 'zip':
@@ -238,14 +247,19 @@ class SpiderApi(BaseApi):
             return None, 400
 
         # get version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
         if latest_version is None:
             latest_version = 0
 
         # make source / destination
-        src = dir_path
+        src = os.path.join(dir_path, os.listdir(dir_path)[0])
+        # src = dir_path
         dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
 
+        # logging info
+        current_app.logger.info('src: %s' % src)
+        current_app.logger.info('dst: %s' % dst)
+
         # copy from source to destination
         shutil.copytree(src=src, dst=dst)
@@ -256,7 +270,7 @@ class SpiderApi(BaseApi):
         }
 
     def get_deploys(self, id):
-        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
+        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
         deploys = []
         for item in items:
             spider_id = item['spider_id']
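
The removed dispatch passed {'node_id', node_id}, a set literal rather than a dict, as requests' second positional argument (params), so node_id never reached the worker cleanly; building the query string by hand fixes that. The same can be done with params= (a sketch with placeholder host, port, and id values):

import requests

# dict (not set) params; requests URL-encodes them onto the query string
res = requests.get(
    'http://%s:%s/api/spiders/%s/on_crawl' % ('127.0.0.1', '5000', 'some-spider-id'),
    params={'node_id': 'some-node-id'})
data = res.json()  # shorthand for json.loads(res.content.decode('utf-8'))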


@@ -29,8 +29,11 @@ class TaskApi(BaseApi):
             task['status'] = _task['status']
             task['result'] = _task['result']
             task['spider_name'] = _spider['name']
-            with open(task['log_file_path']) as f:
-                task['log'] = f.read()
+            try:
+                with open(task['log_file_path']) as f:
+                    task['log'] = f.read()
+            except Exception as err:
+                task['log'] = ''
             return jsonify(task)
 
         tasks = db_manager.list('tasks', {}, limit=1000, sort_key='finish_ts')
@@ -47,10 +50,17 @@ class TaskApi(BaseApi):
         })
 
     def get_log(self, id):
-        task = db_manager.get('tasks', id=id)
-        with open(task['log_file_path']) as f:
-            log = f.read()
-        return {
-            'status': 'ok',
-            'log': log
-        }
+        try:
+            task = db_manager.get('tasks', id=id)
+            with open(task['log_file_path']) as f:
+                log = f.read()
+            return {
+                'status': 'ok',
+                'log': log
+            }
+        except Exception as err:
+            return {
+                'code': 500,
+                'status': 'ok',
+                'error': str(err)
+            }, 500
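
Both handlers now swallow unreadable log files instead of surfacing 500s with stack traces. A narrower helper that distinguishes "no log yet" from a real read error could look like this (a sketch; read_log is a hypothetical name):

import os

def read_log(path):
    # a missing file just means the task has produced no output yet
    if not path or not os.path.exists(path):
        return ''
    with open(path) as f:
        return f.read()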


@@ -4,6 +4,7 @@ from datetime import datetime
 import requests
 from bson import ObjectId
 from celery import current_app
+from celery.utils.log import get_logger
 
 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
@@ -19,11 +20,12 @@ def execute_spider(self, id: str, node_id: str):
     task_id = self.request.id
     hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
-    latest_version = db_manager.get_latest_version(spider_id=id)
+    latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
     command = spider.get('cmd')
     current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
 
     # log info
     logger.info('current_working_directory: %s' % current_working_directory)
     logger.info('spider_id: %s' % id)
+    logger.info('version: %s' % latest_version)
     logger.info(command)
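
Each deploy lands in PROJECT_DEPLOY_FILE_FOLDER/<spider_id>/<version>, and the worker runs the spider's cmd from that directory. A sketch of the execution step that presumably follows the logging above, reusing the names from the diff (the log-file naming here is hypothetical):

import os
import subprocess

# run the spider from its deployed version directory, capturing output
log_path = os.path.join(PROJECT_LOGS_FOLDER, '%s.log' % task_id)
with open(log_path, 'w') as log_file:
    p = subprocess.Popen(command.split(' '),
                         cwd=current_working_directory,
                         stdout=log_file,
                         stderr=subprocess.STDOUT)
    p.wait()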