fixed deploy/run task issue
@@ -10,7 +10,7 @@ RUN cat /etc/resolv.conf

 # install python
 RUN apt-get update
-RUN apt-get install -y python3 python3-pip net-tools iputils-ping
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim

 # soft link
 RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
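
Aside: because apt-get update and apt-get install sit in separate RUN instructions, Docker's layer cache can pair a stale package index with a fresh install; the usual remedy is a single RUN apt-get update && apt-get install -y ... layer.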

Dockerfile-task (new file, 26 lines)
@@ -0,0 +1,26 @@
+# images
+#FROM python:latest
+FROM ubuntu:latest
+
+# source files
+ADD . /opt/crawlab
+
+# add dns
+RUN cat /etc/resolv.conf
+
+# install python
+RUN apt-get update
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
+
+# soft link
+RUN ln -s /usr/bin/pip3 /usr/local/bin/pip
+RUN ln -s /usr/bin/python3 /usr/local/bin/python
+
+# install required libraries
+RUN pip install -U setuptools
+RUN pip install -r /opt/crawlab/requirements.txt
+
+# execute apps
+WORKDIR /opt/crawlab
+CMD python ./bin/run_worker.py
+CMD python app.py
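
Note that a Dockerfile honors only the last CMD instruction, so as written CMD python app.py takes effect and CMD python ./bin/run_worker.py is ignored; if this task image is meant to start the worker, presumably only one of the two lines is intended.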

app.py (9 lines changed)
@@ -1,8 +1,11 @@
 import os
+import shutil

 from flask import Flask, logging
 from flask_cors import CORS
 from flask_restful import Api

-from config import FLASK_HOST, FLASK_PORT
+from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
 from routes.deploys import DeployApi
 from routes.files import FileApi
 from routes.nodes import NodeApi
@@ -45,5 +48,9 @@ api.add_resource(StatsApi,
                  '/api/stats',
                  '/api/stats/<string:action>')

+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)
+
 if __name__ == '__main__':
     app.run(host=FLASK_HOST, port=FLASK_PORT)
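
On Python 3.2+, the existence check can be folded into makedirs itself, which also removes the race between checking and creating; a minimal equivalent sketch:

    import os

    from config import PROJECT_LOGS_FOLDER  # as imported in app.py above

    # Create the logs folder if missing; no error when it already exists.
    os.makedirs(PROJECT_LOGS_FOLDER, exist_ok=True)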
@@ -12,4 +12,5 @@ from config import BROKER_URL

 if __name__ == '__main__':
     p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     for line in iter(p.stdout.readline, b''):
-        print(line.decode('utf-8'))
+        if line.decode('utf-8') != '':
+            print(line.decode('utf-8'))
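
Note on the loop condition: p.stdout is a bytes stream, so the iter() sentinel is the bytes literal b''. readline() returns exactly that at end of stream, which ends the loop; a str sentinel could never compare equal to a bytes value, so the loop would never exit.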

config.py (11 lines changed)
@@ -1,12 +1,14 @@
 # project variables
 PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
 PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
-PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
+PROJECT_LOGS_FOLDER = '/var/logs/crawlab'
 PROJECT_TMP_FOLDER = '/tmp'

 # celery variables
-BROKER_URL = 'redis://localhost:6379/0'
-CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+# BROKER_URL = 'redis://localhost:6379/0'
+# CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
+BROKER_URL = 'redis://192.168.99.100:6379/0'
+CELERY_RESULT_BACKEND = 'mongodb://192.168.99.100:27017/'
 CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
@@ -16,7 +18,8 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'

 # database variables
-MONGO_HOST = 'localhost'
+# MONGO_HOST = 'localhost'
+MONGO_HOST = '192.168.99.100'
 MONGO_PORT = 27017
 # MONGO_USER = 'test'
 # MONGO_PASS = 'test'
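
The 192.168.99.100 address is the usual default IP of a docker-machine VM, so these values appear tuned for a Docker Toolbox setup. A possible refactor, not part of this commit, reads the hosts from the environment so switching between local and Docker runs needs no code edits; the CRAWLAB_* variable names below are hypothetical:

    import os

    # Fall back to the Docker defaults when the variables are unset.
    BROKER_URL = os.environ.get('CRAWLAB_BROKER_URL', 'redis://192.168.99.100:6379/0')
    CELERY_RESULT_BACKEND = os.environ.get('CRAWLAB_RESULT_BACKEND', 'mongodb://192.168.99.100:27017/')
    MONGO_HOST = os.environ.get('CRAWLAB_MONGO_HOST', '192.168.99.100')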
@@ -38,7 +38,10 @@ class DbManager(object):

     def remove_one(self, col_name: str, id: str, **kwargs):
         col = self.db[col_name]
-        col.remove({'_id': ObjectId(id)})
+        _id = id
+        if is_object_id(id):
+            _id = ObjectId(id)
+        col.remove({'_id': _id})

     def list(self, col_name: str, cond: dict, sort_key=None, sort_direction=DESCENDING, skip: int = 0, limit: int = 100,
              **kwargs):
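
The is_object_id helper is defined elsewhere in the repo and is not part of this diff; a sketch of such a check using bson's own validator:

    from bson import ObjectId

    def is_object_id(value) -> bool:
        # True when value is a valid ObjectId (e.g. a 24-char hex string).
        return ObjectId.is_valid(value)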
@@ -70,12 +73,20 @@ class DbManager(object):
         col = self.db[col_name]
         return col.count(cond)

-    def get_latest_version(self, spider_id):
+    def get_latest_version(self, spider_id, node_id):
         col = self.db['deploys']
-        for item in col.find({'spider_id': ObjectId(spider_id)}).sort('version', DESCENDING):
+        for item in col.find({'spider_id': ObjectId(spider_id), 'node_id': node_id}) \
+                .sort('version', DESCENDING):
             return item.get('version')
         return None

+    def get_last_deploy(self, spider_id):
+        col = self.db['deploys']
+        for item in col.find({'spider_id': ObjectId(spider_id)}) \
+                .sort('finish_ts', DESCENDING):
+            return item
+        return None
+
     def aggregate(self, col_name: str, pipelines, **kwargs):
         col = self.db[col_name]
         return col.aggregate(pipelines, **kwargs)
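
Both lookups take only the first document of a sorted cursor, which pymongo's find_one expresses directly via its sort parameter. A sketch of get_latest_version in that form (client and database names are illustrative, mirroring config.py):

    from bson import ObjectId
    from pymongo import MongoClient, DESCENDING

    db = MongoClient('mongodb://192.168.99.100:27017/')['crawlab_test']

    def get_latest_version(spider_id, node_id):
        # Highest deployed version of this spider on this node, or None.
        doc = db['deploys'].find_one(
            {'spider_id': ObjectId(spider_id), 'node_id': node_id},
            sort=[('version', DESCENDING)],
        )
        return doc.get('version') if doc else None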

docker-compose.yml (new file, 18 lines)
@@ -0,0 +1,18 @@
+version: '3.3'  # Compose file format version
+services:
+  web:  # service name
+    build: .  # path to the directory containing the Dockerfile
+    ports:  # port mapping
+      - "5001:5000"
+  task:
+    image: crawlab:v3
+  db:
+    image: mongo
+    restart: always
+    ports:
+      - "27017:27017"
+  redis:
+    image: redis
+    restart: always
+    ports:
+      - "6379:6379"
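
With this file, docker-compose up -d starts all four services: web is published on host port 5001 (container port 5000), and mongo and redis on their standard ports, which lines up with the 192.168.99.100 addresses in config.py when the Docker daemon runs in a docker-machine VM.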
@@ -6,6 +6,7 @@ from random import random

 import requests
 from bson import ObjectId
 from flask import current_app, request
 from flask_restful import reqparse
 from werkzeug.datastructures import FileStorage
@@ -81,7 +82,9 @@ class SpiderApi(BaseApi):

         items = db_manager.list('spiders', {})
         for item in items:
-            item['latest_version'] = db_manager.get_latest_version(item['_id'])
+            last_deploy = db_manager.get_last_deploy(spider_id=str(item['_id']))
+            if last_deploy:
+                item['update_ts'] = last_deploy['finish_ts'].strftime('%Y-%m-%d %H:%M:%S')

         return jsonify({
             'status': 'ok',
@@ -111,12 +114,13 @@ class SpiderApi(BaseApi):
             }, 400

         # dispatch crawl task
-        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
-            node.get('ip'),
-            node.get('port'),
-            id
-        ), {'node_id', node_id})
-        data = json.loads(res)
+        res = requests.get('http://%s:%s/api/spiders/%s/on_crawl?node_id=%s' % (
+            node.get('ip'),
+            node.get('port'),
+            id,
+            node_id
+        ))
+        data = json.loads(res.content.decode('utf-8'))
         return {
             'code': res.status_code,
             'status': 'ok',
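
The removed call passed a set literal, {'node_id', node_id}, as the second positional argument of requests.get, i.e. as params, which cannot produce the intended query string. Letting requests build the query avoids both that slip and manual URL formatting; a sketch reusing the names from the hunk:

    import requests

    def dispatch_crawl(node: dict, spider_id: str, node_id: str) -> dict:
        # GET http://<ip>:<port>/api/spiders/<id>/on_crawl?node_id=<node_id>
        res = requests.get(
            'http://%s:%s/api/spiders/%s/on_crawl' % (node.get('ip'), node.get('port'), spider_id),
            params={'node_id': node_id},
        )
        return res.json()  # same as json.loads(res.content.decode('utf-8'))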
@@ -152,7 +156,7 @@ class SpiderApi(BaseApi):
         node = db_manager.get(col_name='nodes', id=node_id)

         # get latest version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)

         # initialize version if no version found
         if latest_version is None:
@@ -177,8 +181,12 @@ class SpiderApi(BaseApi):

             # upload to api
             files = {'file': open(output_file_path, 'rb')}
-            r = requests.post('http://%s:%s/api/spiders/%s/deploy_file' % (node.get('ip'), node.get('port'), id),
-                              files=files)
+            r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % (
+                node.get('ip'),
+                node.get('port'),
+                id,
+                node_id,
+            ), files=files)

             if r.status_code == 200:
                 return {
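
One detail the hunk keeps: the archive is opened with open(output_file_path, 'rb') and never explicitly closed. A context-manager variant releases the handle as soon as the POST returns (a sketch; URL assembly elided):

    import requests

    def upload_deploy(url: str, output_file_path: str) -> requests.Response:
        # Stream the packaged spider to the node's deploy_file endpoint.
        with open(output_file_path, 'rb') as fp:
            return requests.post(url, files={'file': fp})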
@@ -191,11 +199,11 @@ class SpiderApi(BaseApi):
             return {
                 'code': r.status_code,
                 'status': 'ok',
-                'error': json.loads(r.content)['error']
+                'error': r.content.decode('utf-8')
             }, r.status_code

         except Exception as err:
-            print(err)
+            current_app.logger.error(err)
             return {
                 'code': 500,
                 'status': 'ok',
@@ -211,8 +219,9 @@ class SpiderApi(BaseApi):
             'finish_ts': datetime.now()
         })

-    def deploy_file(self, id):
+    def deploy_file(self, id=None):
         args = parser.parse_args()
+        node_id = request.args.get('node_id')
         f = args.file

         if get_file_suffix(f.filename) != 'zip':
@@ -238,14 +247,19 @@ class SpiderApi(BaseApi):
             return None, 400

         # get version
-        latest_version = db_manager.get_latest_version(spider_id=id)
+        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
         if latest_version is None:
             latest_version = 0

         # make source / destination
-        src = dir_path
+        src = os.path.join(dir_path, os.listdir(dir_path)[0])
+        # src = dir_path
         dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))

+        # logging info
+        current_app.logger.info('src: %s' % src)
+        current_app.logger.info('dst: %s' % dst)
+
         # copy from source to destination
         shutil.copytree(src=src, dst=dst)
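
Note that src = os.path.join(dir_path, os.listdir(dir_path)[0]) assumes the extracted zip contains exactly one top-level entry; an archive with several top-level files or folders would deploy only whichever entry os.listdir happens to return first.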
@@ -256,7 +270,7 @@ class SpiderApi(BaseApi):
         }

     def get_deploys(self, id):
-        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
+        items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
         deploys = []
         for item in items:
             spider_id = item['spider_id']
@@ -29,8 +29,11 @@ class TaskApi(BaseApi):
             task['status'] = _task['status']
             task['result'] = _task['result']
             task['spider_name'] = _spider['name']
-            with open(task['log_file_path']) as f:
-                task['log'] = f.read()
+            try:
+                with open(task['log_file_path']) as f:
+                    task['log'] = f.read()
+            except Exception as err:
+                task['log'] = ''
             return jsonify(task)

         tasks = db_manager.list('tasks', {}, limit=1000, sort_key='finish_ts')
@@ -47,10 +50,17 @@ class TaskApi(BaseApi):

     def get_log(self, id):
-        task = db_manager.get('tasks', id=id)
-        with open(task['log_file_path']) as f:
-            log = f.read()
-        return {
-            'status': 'ok',
-            'log': log
-        }
+        try:
+            task = db_manager.get('tasks', id=id)
+            with open(task['log_file_path']) as f:
+                log = f.read()
+            return {
+                'status': 'ok',
+                'log': log
+            }
+        except Exception as err:
+            return {
+                'code': 500,
+                'status': 'ok',
+                'error': str(err)
+            }, 500
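
Both task endpoints now guard the same open-and-read pattern with a broad try/except. A small helper in that spirit, not part of this commit, keeps the failure mode narrow:

    def read_task_log(log_file_path: str) -> str:
        # Return the task log, or '' when the file has not been written yet;
        # catching only FileNotFoundError lets other I/O errors surface.
        try:
            with open(log_file_path) as f:
                return f.read()
        except FileNotFoundError:
            return ''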
@@ -4,6 +4,7 @@ from datetime import datetime

 import requests
 from bson import ObjectId
 from celery import current_app
 from celery.utils.log import get_logger

 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
@@ -19,11 +20,12 @@ def execute_spider(self, id: str, node_id: str):
     task_id = self.request.id
     hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
-    latest_version = db_manager.get_latest_version(spider_id=id)
+    latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
     command = spider.get('cmd')
     current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))

     # log info
     logger.info('current_working_directory: %s' % current_working_directory)
     logger.info('spider_id: %s' % id)
     logger.info('version: %s' % latest_version)
     logger.info(command)
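
For reference, the working directory computed here follows the layout <deploy root>/<spider id>/<version> produced by deploy_file above; a runnable sketch with illustrative values:

    import os

    PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'  # as in config.py
    spider_id = '5b6b7c7d8e9f0a1b2c3d4e5f'      # illustrative ObjectId string
    latest_version = 3                           # illustrative deploy version

    cwd = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, spider_id, str(latest_version))
    print(cwd)  # -> /var/crawlab/5b6b7c7d8e9f0a1b2c3d4e5f/3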