updated Dockerfile

This commit is contained in:
Marvin Zhang
2019-02-28 18:57:44 +08:00
parent cf1d1ca878
commit ae5ea03043
19 changed files with 236 additions and 59 deletions

View File

@@ -1,2 +1,30 @@
# base image (python:latest was abandoned in favour of plain ubuntu)
#FROM python:latest
FROM ubuntu:latest

# source files
ADD . /opt/crawlab

# add dns
#RUN echo -e "nameserver 180.76.76.76" >> /etc/resolv.conf
#ADD ./resolv.conf /etc
RUN cat /etc/resolv.conf

# install python (-y is required: without it apt-get waits for a confirmation
# prompt and the non-interactive docker build aborts)
RUN apt-get update && apt-get install -y python3 python3-pip net-tools iputils-ping

# soft links: ubuntu ships only python3/pip3, but the pip calls and the CMD
# below expect `pip` / `python` (the original `ln -s` line was missing the RUN
# prefix and therefore was not a valid Dockerfile instruction)
RUN ln -s /usr/bin/pip3 /usr/local/bin/pip && \
    ln -s /usr/bin/python3 /usr/local/bin/python

# install required libraries
RUN pip install -U setuptools
RUN pip install -r /opt/crawlab/requirements.txt

# execute apps
WORKDIR /opt/crawlab
# NOTE: docker honours only the LAST CMD — the original
# `CMD python ./bin/run_worker.py` was silently ignored; start the worker
# explicitly, e.g. `docker run ... python ./bin/run_worker.py`
CMD python app.py

# port
EXPOSE 5000

29
app.py
View File

@@ -1,8 +1,13 @@
import subprocess
import sys
import threading
from celery import Celery
from flask import Flask
from flask_cors import CORS
from flask_restful import Api, Resource
from config import BROKER_URL
from routes.deploys import DeployApi
from routes.files import FileApi
from routes.nodes import NodeApi
@@ -12,7 +17,7 @@ from routes.tasks import TaskApi
# flask app instance
app = Flask(__name__)
app.config.from_object('config.flask')
app.config.from_object('config')
# init flask api instance
api = Api(app)
@@ -45,6 +50,24 @@ api.add_resource(StatsApi,
'/api/stats',
'/api/stats/<string:action>')
# start flask app
if __name__ == '__main__':
def run_app():
    """Start the Flask development server; blocks until the server exits."""
    app.run()
def run_flower():
    """Launch celery flower as a subprocess and stream its output to stdout.

    Runs until the flower process closes its stdout (i.e. exits); intended to
    be executed on a background thread alongside the Flask app.
    """
    p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Sentinel must be the bytes b'': readline() yields bytes, so the original
    # str sentinel 'b' never matched and the loop spun forever printing empty
    # lines once the subprocess reached EOF.
    for line in iter(p.stdout.readline, b''):
        print(line.decode('utf-8'))
if __name__ == '__main__':
# start flower app
th_flower = threading.Thread(target=run_flower)
th_flower.start()
# start flask app
# th_app = threading.Thread(target=run_app)
# th_app.start()
app.run()

View File

@@ -7,7 +7,7 @@ file_dir = os.path.dirname(os.path.realpath(__file__))
root_path = os.path.abspath(os.path.join(file_dir, '..'))
sys.path.append(root_path)
from config.celery import BROKER_URL
from config import BROKER_URL
if __name__ == '__main__':
p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

24
config.py Normal file
View File

@@ -0,0 +1,24 @@
# project variables
# NOTE(review): the absolute /Users/... paths below are hardcoded to a
# developer machine; presumably they must be overridden per deployment —
# confirm against the Dockerfile, which copies the app to /opt/crawlab.
# folder holding spider source projects on this host
PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
# destination folder spiders are deployed into on worker nodes
PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
# folder where spider/task logs are written
PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'
# scratch folder used for zip packaging during deploys
PROJECT_TMP_FOLDER = '/tmp'
# celery variables
# redis database 0 acts as the celery message broker
BROKER_URL = 'redis://localhost:6379/0'
# task results are persisted to mongodb
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
CELERY_MONGODB_BACKEND_SETTINGS = {
    'database': 'crawlab_test',
    'taskmeta_collection': 'tasks_celery',
}
# base url of the flower monitoring HTTP API
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
# database variables
MONGO_HOST = 'localhost'
MONGO_PORT = 27017
# MONGO_USER = 'test'
# MONGO_PASS = 'test'
MONGO_DB = 'crawlab_test'
# flask variables
# enables the Flask debugger and auto-reload; should be off in production
DEBUG = True

View File

View File

@@ -1,13 +0,0 @@
BROKER_URL = 'redis://localhost:6379/0'
# BROKER_URL = 'mongodb://localhost:27017/'
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'

View File

@@ -1,3 +0,0 @@
PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
PROJECT_LOGS_FOLDER = '/Users/yeqing/projects/crawlab/logs/crawlab'

View File

@@ -1,6 +0,0 @@
# 数据库
MONGO_HOST = 'localhost'
MONGO_PORT = 27017
# MONGO_USER = 'test'
# MONGO_PASS = 'test'
MONGO_DB = 'crawlab_test'

View File

@@ -1,3 +0,0 @@
DEBUG = True
# SERVER_NAME = '0.0.0.0:5000'

View File

@@ -1,7 +1,7 @@
from bson import ObjectId
from mongoengine import connect
from pymongo import MongoClient, DESCENDING
from config.db import MONGO_HOST, MONGO_PORT, MONGO_DB
from config import MONGO_HOST, MONGO_PORT, MONGO_DB
from utils import is_object_id
connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT)

View File

@@ -3,6 +3,7 @@ import os
from flask_restful import reqparse, Resource
from utils import jsonify
from utils.file import get_file_content
class FileApi(Resource):
@@ -14,16 +15,23 @@ class FileApi(Resource):
self.parser.add_argument('path', type=str)
def get(self, action=None):
args = self.parser.parse_args()
path = args.get('path')
if action is not None:
if action == 'getDefaultPath':
return jsonify({
'defaultPath': os.path.abspath(os.path.join(os.path.curdir, 'spiders'))
})
elif action == 'get_file':
file_data = get_file_content(path)
file_data['status'] = 'ok'
return jsonify(file_data)
else:
return {}
args = self.parser.parse_args()
path = args.get('path')
folders = []
files = []
for _path in os.listdir(path):
@@ -36,4 +44,3 @@ class FileApi(Resource):
'files': sorted(files),
'folders': sorted(folders),
})

View File

@@ -3,21 +3,22 @@ import json
import requests
from bson import ObjectId
from config.celery import FLOWER_API_ENDPOINT
from config import FLOWER_API_ENDPOINT
from constants.node import NodeType
from db.manager import db_manager
from routes.base import BaseApi
from utils import jsonify
from utils.node import check_nodes_status
class NodeApi(BaseApi):
col_name = 'nodes'
arguments = (
# ('ip', str),
# ('port', int),
('name', str),
('description', str),
('ip', str),
('port', str),
)
def get(self, id=None, action=None):
@@ -37,6 +38,9 @@ class NodeApi(BaseApi):
# TODO: use query "?status=1" to get status of nodes
# get status for each node
status_data = check_nodes_status()
# get a list of items
res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
online_node_ids = []
@@ -52,7 +56,6 @@ class NodeApi(BaseApi):
node[_k] = _v
node['_id'] = node_name
node['name'] = node_name
node['status'] = NodeType.ONLINE
db_manager.save('nodes', node)
# existing node
@@ -60,7 +63,6 @@ class NodeApi(BaseApi):
for _k, _v in v.items():
node[_k] = _v
node['name'] = node_name
node['status'] = NodeType.ONLINE
db_manager.save('nodes', node)
online_node_ids.append(node_name)
@@ -69,8 +71,9 @@ class NodeApi(BaseApi):
nodes = []
items = db_manager.list('nodes', {})
for item in items:
node_status = status_data.get(item['name'])
if item['_id'] in online_node_ids:
item['status'] = NodeType.ONLINE
item['status'] = NodeType.ONLINE if node_status else NodeType.OFFLINE
else:
item['status'] = NodeType.OFFLINE
db_manager.update_one('nodes', item['_id'], {

View File

@@ -1,17 +1,26 @@
import json
import os
import shutil
from datetime import datetime
from random import random
import requests
from bson import ObjectId
from flask_restful import reqparse
from werkzeug.datastructures import FileStorage
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER
from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER, PROJECT_TMP_FOLDER
from db.manager import db_manager
from routes.base import BaseApi
from tasks.spider import execute_spider
from utils import jsonify
from utils.file import get_file_suffix_stats
from utils.deploy import zip_file, unzip_file
from utils.file import get_file_suffix_stats, get_file_suffix
from utils.spider import get_lang_by_stats
parser = reqparse.RequestParser()
parser.add_argument('file', type=FileStorage, location='files')
class SpiderApi(BaseApi):
col_name = 'spiders'
@@ -106,7 +115,8 @@ class SpiderApi(BaseApi):
if spider is None:
return
# TODO: deploy spiders to other node rather than in local machine
# get node given the node
node = db_manager.get(col_name='nodes', id=node_id)
# get latest version
latest_version = db_manager.get_latest_version(spider_id=id)
@@ -117,24 +127,48 @@ class SpiderApi(BaseApi):
# make source / destination
src = spider.get('src')
dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
# copy files
# TODO: multi-node copy files
try:
shutil.copytree(src=src, dst=dst)
return {
'code': 200,
'status': 'ok',
'message': 'deploy success'
}
# TODO: deploy spiders to other node rather than in local machine
output_file_name = '%s_%s.zip' % (
datetime.now().strftime('%Y%m%d%H%M%S'),
str(random())[2:12]
)
output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name)
# zip source folder to zip file
zip_file(source_dir=src,
output_filename=output_file_path)
# upload to api
files = {'file': open(output_file_path, 'rb')}
r = requests.post('http://%s:%s/api/spiders/%s/deploy_file' % (node.get('ip'), node.get('port'), id),
files=files)
if r.status_code == 200:
return {
'code': 200,
'status': 'ok',
'message': 'deploy success'
}
else:
return {
'code': r.status_code,
'status': 'ok',
'error': json.loads(r.content)['error']
}, r.status_code
except Exception as err:
print(err)
return {
'code': 500,
'status': 'ok',
'error': str(err)
}
'code': 500,
'status': 'ok',
'error': str(err)
}, 500
finally:
version = latest_version + 1
db_manager.save('deploys', {
@@ -144,8 +178,52 @@ class SpiderApi(BaseApi):
'finish_ts': datetime.now()
})
def deploy_file(self, id):
args = parser.parse_args()
f = args.file
if get_file_suffix(f.filename) != 'zip':
return {
'status': 'ok',
'error': 'file type mismatch'
}, 400
# save zip file on temp folder
file_path = '%s/%s' % (PROJECT_TMP_FOLDER, f.filename)
with open(file_path, 'wb') as fw:
fw.write(f.stream.read())
# unzip zip file
dir_path = file_path.replace('.zip', '')
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
unzip_file(file_path, dir_path)
# get spider and version
spider = db_manager.get(col_name=self.col_name, id=id)
if spider is None:
return None, 400
# get version
latest_version = db_manager.get_latest_version(spider_id=id)
if latest_version is None:
latest_version = 0
# make source / destination
src = dir_path
dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
# copy from source to destination
shutil.copytree(src=src, dst=dst)
return {
'code': 200,
'status': 'ok',
'message': 'deploy success'
}
def get_deploys(self, id):
items = db_manager.list('deploys', {'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
items = db_manager.list('deploys', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='create_ts')
deploys = []
for item in items:
spider_id = item['spider_id']
@@ -158,7 +236,7 @@ class SpiderApi(BaseApi):
})
def get_tasks(self, id):
items = db_manager.list('tasks', {'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
items = db_manager.list('tasks', cond={'spider_id': ObjectId(id)}, limit=10, sort_key='finish_ts')
for item in items:
spider_id = item['spider_id']
spider = db_manager.get('spiders', id=str(spider_id))

View File

@@ -2,4 +2,4 @@ from celery import Celery
# celery app instance
celery_app = Celery(__name__)
celery_app.config_from_object('config.celery')
celery_app.config_from_object('config')

View File

@@ -5,7 +5,7 @@ from datetime import datetime
import requests
from celery.utils.log import get_logger
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from db.manager import db_manager
from .celery import celery_app
import subprocess

View File

@@ -6,7 +6,7 @@ import requests
from bson import ObjectId
from celery.utils.log import get_logger
from config.common import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER
from db.manager import db_manager
from .celery import celery_app
import subprocess

View File

@@ -2,7 +2,7 @@ import os, zipfile
# 打包目录为zip文件未压缩
def make_zip(source_dir, output_filename):
def zip_file(source_dir, output_filename):
zipf = zipfile.ZipFile(output_filename, 'w')
pre_len = len(os.path.dirname(source_dir))
for parent, dirnames, filenames in os.walk(source_dir):
@@ -11,3 +11,13 @@ def make_zip(source_dir, output_filename):
arcname = pathfile[pre_len:].strip(os.path.sep) # 相对路径
zipf.write(pathfile, arcname)
zipf.close()
def unzip_file(zip_src, dst_dir):
    """Extract the zip archive at zip_src into the directory dst_dir.

    If zip_src is not a valid zip file, print a warning and return without
    raising (kept as best-effort to match existing callers).
    """
    if not zipfile.is_zipfile(zip_src):
        # preserve the original best-effort behavior rather than raising
        print('This is not zip')
        return
    # context manager guarantees the archive handle is closed — the original
    # opened the ZipFile and never closed it (file-handle leak)
    with zipfile.ZipFile(zip_src, 'r') as fz:
        fz.extractall(dst_dir)

View File

@@ -5,6 +5,14 @@ from collections import defaultdict
SUFFIX_PATTERN = r'\.(\w{,10})$'
suffix_regex = re.compile(SUFFIX_PATTERN, re.IGNORECASE)
SUFFIX_LANG_MAPPING = {
'py': 'python',
'js': 'javascript',
'sh': 'shell',
'java': 'java',
'c': 'c',
}
def get_file_suffix(file_name: str):
file_name = file_name.lower()
@@ -32,3 +40,14 @@ def get_file_suffix_stats(path) -> dict:
suffix = get_file_suffix(file_path)
stats[suffix] += 1
return stats
def get_file_content(path) -> dict:
    """Read a source file and return its content plus language metadata.

    Returns a dict with keys:
        lang: language name derived from the file suffix via
              SUFFIX_LANG_MAPPING (None for unknown suffixes)
        suffix: the file's suffix, e.g. 'py'
        content: the file's text content
    """
    suffix = get_file_suffix(path)
    lang = SUFFIX_LANG_MAPPING.get(suffix)
    # decode explicitly as UTF-8: the original relied on the platform default
    # encoding, which made reads behave differently across hosts
    with open(path, encoding='utf-8') as f:
        content = f.read()
    return {
        'lang': lang,
        'suffix': suffix,
        'content': content
    }

10
utils/node.py Normal file
View File

@@ -0,0 +1,10 @@
import json
import requests
from config import FLOWER_API_ENDPOINT
def check_nodes_status():
    """Query the flower API for worker status and return the parsed mapping."""
    url = '%s/workers?status=1' % FLOWER_API_ENDPOINT
    response = requests.get(url)
    body = response.content.decode('utf-8')
    return json.loads(body)