updated deployment and spider-running methodologies

Marvin Zhang
2019-03-05 13:22:23 +08:00
parent ab2279c647
commit 6b9c0581c2
14 changed files with 193 additions and 111 deletions

Dockerfile

@@ -10,7 +10,7 @@ RUN cat /etc/resolv.conf
 # install python
 RUN apt-get update
-RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim
+RUN apt-get install -y python3 python3-pip net-tools iputils-ping vim ntp
 # soft link
 RUN ln -s /usr/bin/pip3 /usr/local/bin/pip

app.py

@@ -9,7 +9,7 @@ from config import FLASK_HOST, FLASK_PORT, PROJECT_LOGS_FOLDER
 from routes.deploys import DeployApi
 from routes.files import FileApi
 from routes.nodes import NodeApi
-from routes.spiders import SpiderApi, SpiderImportApi
+from routes.spiders import SpiderApi, SpiderImportApi, SpiderManageApi
 from routes.stats import StatsApi
 from routes.tasks import TaskApi
@@ -30,6 +30,8 @@ api.add_resource(NodeApi,
                  '/api/nodes/<string:id>/<string:action>')
 api.add_resource(SpiderImportApi,
                  '/api/spiders/import/<string:platform>')
+api.add_resource(SpiderManageApi,
+                 '/api/spiders/manage/<string:action>')
 api.add_resource(SpiderApi,
                  '/api/spiders',
                  '/api/spiders/<string:id>',
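
SpiderManageApi handles collection-level actions (its post() dispatches on the trailing action segment, see routes/spiders.py below), while per-spider actions such as on_crawl still go through SpiderApi. A minimal sketch of scheduling a crawl against the reworked endpoint, assuming the default FLASK_HOST/FLASK_PORT from config.py and a hypothetical spider id:

import requests

spider_id = '5c7d...'  # hypothetical ObjectId string
res = requests.post('http://localhost:5000/api/spiders/%s/on_crawl' % spider_id)
print(res.json())  # on_crawl returns a dict with 'code': 200 on success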

config.py

@@ -1,4 +1,6 @@
 # project variables
+from celery.schedules import crontab
+
 PROJECT_SOURCE_FILE_FOLDER = '/Users/yeqing/projects/crawlab/spiders'
 PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
 PROJECT_LOGS_FOLDER = '/var/logs/crawlab'
@@ -14,6 +16,7 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
     'taskmeta_collection': 'tasks_celery',
 }
 CELERY_TIMEZONE = 'Asia/Shanghai'
+CELERY_ENABLE_UTC = True

 # flower variables
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
@@ -30,4 +33,3 @@ MONGO_DB = 'crawlab_test'
 DEBUG = True
 FLASK_HOST = '0.0.0.0'
 FLASK_PORT = 5000
-# SERVER_NAME = '0.0.0.0:5000'
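
The crontab import and CELERY_ENABLE_UTC only take effect once the Celery app reads this module. A minimal sketch of how these settings are typically consumed, assuming tasks/celery.py (not shown in this diff) loads them via config_from_object:

# tasks/celery.py (sketch; the actual module is not part of this diff)
from celery import Celery

celery_app = Celery('crawlab')
celery_app.config_from_object('config')  # picks up the upper-case CELERY_* names above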

constants/node.py

@@ -1,3 +1,3 @@
-class NodeType:
+class NodeStatus:
     ONLINE = 'online'
     OFFLINE = 'offline'

constants/task.py (new file)

@@ -0,0 +1,8 @@
+class TaskStatus:
+    PENDING = 'PENDING'
+    STARTED = 'STARTED'
+    SUCCESS = 'SUCCESS'
+    FAILURE = 'FAILURE'
+    RETRY = 'RETRY'
+    REVOKED = 'REVOKED'
+    UNAVAILABLE = 'UNAVAILABLE'
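
The first six values mirror Celery's built-in task states; UNAVAILABLE is a Crawlab-specific fallback used in routes/tasks.py below when no matching record exists in the tasks_celery collection. A sketch of the intended mapping (not code from this commit):

from constants.task import TaskStatus

def resolve_status(celery_task):
    # fall back to UNAVAILABLE when the Celery backend has no record
    if celery_task is None:
        return TaskStatus.UNAVAILABLE
    return celery_task['status']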

(frontend Vue component with the spider run form; filename not shown)

@@ -71,10 +71,19 @@ export default {
   },
   methods: {
     onRun () {
+      const row = this.spiderForm
       this.$refs['spiderForm'].validate(res => {
         if (res) {
-          this.$store.commit('dialogView/SET_DIALOG_VISIBLE', true)
-          this.$store.commit('dialogView/SET_DIALOG_TYPE', 'spiderRun')
+          this.$confirm('Are you sure to run this spider', 'Notice', {
+            confirmButtonText: 'Confirm',
+            cancelButtonText: 'Cancel'
+          })
+            .then(() => {
+              this.$store.dispatch('spider/crawlSpider', { id: row._id.$oid })
+                .then(() => {
+                  this.$message.success(`Running spider "${row._id.$oid}" has been scheduled`)
+                })
+            })
         }
       })
     },

(frontend Vuex spider store module; filename not shown)

@@ -87,10 +87,8 @@ const actions = {
     })
   },
   crawlSpider ({ state, dispatch }, payload) {
-    const { id, nodeId } = payload
-    return request.post(`/spiders/${id}/crawl`, {
-      node_id: nodeId
-    })
+    const { id } = payload
+    return request.post(`/spiders/${id}/on_crawl`)
       .then(response => {
         console.log(response.data)
       })
@@ -124,6 +122,12 @@ const actions = {
       .then(response => {
         console.log(response)
       })
+  },
+  deployAll () {
+    return request.post('/spiders/manage/deploy_all')
+      .then(response => {
+        console.log(response)
+      })
   }
 }

(frontend Vue spider list view; filename not shown)

@@ -36,7 +36,10 @@
@change="onSearch">
</el-input>
<div class="right">
<el-button type="primary" icon="el-icon-upload" @click="openImportDialog">
<el-button type="primary" icon="fa fa-cloud" @click="onDeployAll">
Deploy All
</el-button>
<el-button type="primary" icon="fa fa-download" @click="openImportDialog">
Import Spiders
</el-button>
<el-button type="success"
@@ -243,9 +246,16 @@ export default {
this.$store.commit('dialogView/SET_DIALOG_TYPE', 'spiderDeploy')
},
onCrawl (row) {
this.$store.dispatch('spider/getSpiderData', row._id.$oid)
this.$store.commit('dialogView/SET_DIALOG_VISIBLE', true)
this.$store.commit('dialogView/SET_DIALOG_TYPE', 'spiderRun')
this.$confirm('Are you sure to run this spider', 'Notice', {
confirmButtonText: 'Confirm',
cancelButtonText: 'Cancel'
})
.then(() => {
this.$store.dispatch('spider/crawlSpider', { id: row._id.$oid })
.then(() => {
this.$message.success(`Running spider "${row._id.$oid}" has been scheduled`)
})
})
},
onView (row) {
this.$router.push(`/spiders/${row._id.$oid}`)
@@ -257,9 +267,10 @@ export default {
this.$refs.importForm.validate(valid => {
if (valid) {
this.importLoading = true
// TODO: switch between github / gitlab / svn
this.$store.dispatch('spider/importGithub')
.then(response => {
this.$message.success('Import repo sucessfully')
this.$message.success('Import repo successfully')
this.$store.dispatch('spider/getSpiderList')
})
.catch(response => {
@@ -274,6 +285,19 @@ export default {
},
openImportDialog () {
this.dialogVisible = true
},
onDeployAll () {
this.$confirm('Are you sure to deploy all spiders to active nodes?', 'Notice', {
confirmButtonText: 'Confirm',
cancelButtonText: 'Cancel',
type: 'warning'
})
.then(() => {
this.$store.dispatch('spider/deployAll')
.then(() => {
this.$message.success('Deployed all spiders successfully')
})
})
}
},
created () {

routes/nodes.py

@@ -1,14 +1,7 @@
-import json
-import requests
-from flask import current_app
-from config import FLOWER_API_ENDPOINT
-from constants.node import NodeType
 from db.manager import db_manager
 from routes.base import BaseApi
 from utils import jsonify
-from utils.node import check_nodes_status
+from utils.node import update_nodes_status


 class NodeApi(BaseApi):
@@ -36,62 +29,18 @@ class NodeApi(BaseApi):
         elif id is not None:
             return db_manager.get('nodes', id=id)
-        # TODO: use query "?status=1" to get status of nodes
-        # get status for each node
-        status_data = {}
-        try:
-            status_data = check_nodes_status()
-        except Exception as err:
-            current_app.logger.error(err)
-        # get a list of items
-        online_node_ids = []
-        try:
-            res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
-            for k, v in json.loads(res.content.decode('utf-8')).items():
-                node_name = k
-                node_celery = v
-                node = db_manager.get('nodes', id=node_name)
-                # new node
-                if node is None:
-                    node = {}
-                    for _k, _v in node_celery.items():
-                        node[_k] = _v
-                    node['_id'] = node_name
-                    node['name'] = node_name
-                    db_manager.save('nodes', node)
-                # existing node
-                else:
-                    for _k, _v in v.items():
-                        node[_k] = _v
-                    node['name'] = node_name
-                    db_manager.save('nodes', node)
-                online_node_ids.append(node_name)
-        except Exception as err:
-            current_app.logger.error(err)
-        # iterate db nodes to update status
-        nodes = []
-        items = db_manager.list('nodes', {})
-        for item in items:
-            node_status = status_data.get(item['name'])
-            if item['_id'] in online_node_ids:
-                item['status'] = NodeType.ONLINE if node_status else NodeType.OFFLINE
-            else:
-                item['status'] = NodeType.OFFLINE
-            db_manager.update_one('nodes', item['_id'], {
-                'status': item['status']
-            })
-            nodes.append(item)
-        return jsonify({
-            'status': 'ok',
-            'items': nodes
-        })
+        else:
+            # get a list of active nodes from flower and save to db
+            update_nodes_status()
+            # iterate db nodes to update status
+            nodes = db_manager.list('nodes', {})
+            return jsonify({
+                'status': 'ok',
+                'items': nodes
+            })

     def get_spiders(self, id=None):
         items = db_manager.list('spiders')
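
After this change the GET handler no longer polls Flower inline: it delegates to update_nodes_status() (added in utils/node.py at the bottom of this commit) and then serves whatever is stored in Mongo. A minimal sketch of the resulting call, assuming the default port:

import requests

res = requests.get('http://localhost:5000/api/nodes')
for node in res.json()['items']:
    print(node['name'], node['status'])  # 'online' or 'offline'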

routes/spiders.py

@@ -12,6 +12,8 @@ from flask_restful import reqparse, Resource
 from werkzeug.datastructures import FileStorage
 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_SOURCE_FILE_FOLDER, PROJECT_TMP_FOLDER
+from constants.node import NodeStatus
+from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
 from tasks.spider import execute_spider
@@ -55,6 +57,7 @@ class SpiderApi(BaseApi):
         # get a list of items
         else:
+            items = []
             dirs = os.listdir(PROJECT_SOURCE_FILE_FOLDER)
             for _dir in dirs:
                 dir_path = os.path.join(PROJECT_SOURCE_FILE_FOLDER, _dir)
@@ -81,16 +84,13 @@ class SpiderApi(BaseApi):
                     'suffix_stats': stats,
                 })
-        items = db_manager.list('spiders', {})
-        for item in items:
-            last_deploy = db_manager.get_last_deploy(spider_id=str(item['_id']))
-            if last_deploy:
-                item['update_ts'] = last_deploy['finish_ts'].strftime('%Y-%m-%d %H:%M:%S')
-        return jsonify({
-            'status': 'ok',
-            'items': items
-        })
+                # append spider
+                items.append(spider)
+            return jsonify({
+                'status': 'ok',
+                'items': items
+            })

     def crawl(self, id):
         args = self.parser.parse_args()
@@ -131,9 +131,8 @@ class SpiderApi(BaseApi):
     def on_crawl(self, id):
         args = self.parser.parse_args()
-        node_id = args.get('node_id')

-        job = execute_spider.delay(id, node_id)
+        job = execute_spider.delay(id)

         return {
             'code': 200,
@@ -156,13 +155,6 @@ class SpiderApi(BaseApi):
         # get node given the node
         node = db_manager.get(col_name='nodes', id=node_id)

-        # get latest version
-        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)

-        # initialize version if no version found
-        if latest_version is None:
-            latest_version = 0

         # make source / destination
         src = spider.get('src')
@@ -238,15 +230,10 @@ class SpiderApi(BaseApi):
         if spider is None:
             return None, 400

-        # get version
-        latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
-        if latest_version is None:
-            latest_version = 0

         # make source / destination
         src = os.path.join(dir_path, os.listdir(dir_path)[0])
         # src = dir_path
-        dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version + 1))
+        dst = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))
@@ -261,10 +248,8 @@ class SpiderApi(BaseApi):
         # save to db
         # TODO: task management for deployment
-        version = latest_version + 1
         db_manager.save('deploys', {
             'spider_id': ObjectId(id),
-            'version': version,
             'node_id': node_id,
             'finish_ts': datetime.now()
         })
@@ -298,7 +283,7 @@ class SpiderApi(BaseApi):
             if task is not None:
                 item['status'] = task['status']
             else:
-                item['status'] = 'UNAVAILABLE'
+                item['status'] = TaskStatus.UNAVAILABLE
         return jsonify({
             'status': 'ok',
             'items': items
@@ -363,3 +348,58 @@ class SpiderImportApi(Resource):
             'status': 'ok',
             'message': 'success'
         }
+
+
+class SpiderManageApi(Resource):
+    parser = reqparse.RequestParser()
+
+    arguments = [
+        ('url', str)
+    ]
+
+    def post(self, action):
+        if not hasattr(self, action):
+            return {
+                'status': 'ok',
+                'code': 400,
+                'error': 'action "%s" invalid' % action
+            }, 400
+        return getattr(self, action)()
+
+    def deploy_all(self):
+        # active nodes
+        nodes = db_manager.list('nodes', {'status': NodeStatus.ONLINE})
+
+        # all spiders
+        spiders = db_manager.list('spiders', {'cmd': {'$exists': True}})
+
+        # iterate all nodes
+        for node in nodes:
+            node_id = node['_id']
+
+            for spider in spiders:
+                spider_id = spider['_id']
+                spider_src = spider['src']
+
+                output_file_name = '%s_%s.zip' % (
+                    datetime.now().strftime('%Y%m%d%H%M%S'),
+                    str(random())[2:12]
+                )
+                output_file_path = os.path.join(PROJECT_TMP_FOLDER, output_file_name)
+
+                # zip source folder to zip file
+                zip_file(source_dir=spider_src,
+                         output_filename=output_file_path)
+
+                # upload to api
+                files = {'file': open(output_file_path, 'rb')}
+                r = requests.post('http://%s:%s/api/spiders/%s/deploy_file?node_id=%s' % (
+                    node.get('ip'),
+                    node.get('port'),
+                    spider_id,
+                    node_id,
+                ), files=files)
+
+        return {
+            'status': 'ok',
+            'message': 'success'
+        }
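
deploy_all zips each spider's source directory once per online node and re-uploads it through the existing per-spider deploy_file endpoint, so a single request fans the latest code out to the whole cluster. A minimal usage sketch, assuming each node document carries the ip/port fields the URL template expects:

import requests

res = requests.post('http://localhost:5000/api/spiders/manage/deploy_all')
print(res.json())  # {'status': 'ok', 'message': 'success'}

Worth noting: the zip file handle opened for upload is never closed, and a failure on one node raises out of the whole loop; both are fine for a first cut but worth revisiting.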

routes/tasks.py

@@ -1,3 +1,4 @@
+from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
 from utils import jsonify
@@ -26,7 +27,10 @@ class TaskApi(BaseApi):
         task = db_manager.get('tasks', id=id)
         _task = db_manager.get('tasks_celery', id=task['_id'])
         _spider = db_manager.get('spiders', id=str(task['spider_id']))
-        task['status'] = _task['status']
+        if _task:
+            task['status'] = _task['status']
+        else:
+            task['status'] = TaskStatus.UNAVAILABLE
         task['result'] = _task['result']
         task['spider_name'] = _spider['name']
         try:
@@ -41,7 +45,10 @@ class TaskApi(BaseApi):
         for task in tasks:
             _task = db_manager.get('tasks_celery', id=task['_id'])
             _spider = db_manager.get('spiders', id=str(task['spider_id']))
-            task['status'] = _task['status']
+            if _task:
+                task['status'] = _task['status']
+            else:
+                task['status'] = TaskStatus.UNAVAILABLE
             task['spider_name'] = _spider['name']
             items.append(task)
         return jsonify({
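
One caveat: the new guard covers only task['status']; in the single-task branch the next line still reads _task['result'] unconditionally, so a missing Celery record would raise. A sketch of the stricter variant (not part of this commit):

if _task:
    task['status'] = _task['status']
    task['result'] = _task['result']
else:
    task['status'] = TaskStatus.UNAVAILABLE
    task['result'] = None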

tasks/node.py (new file)

@@ -0,0 +1,7 @@
+from utils import node
+from .celery import celery_app
+
+
+@celery_app.task
+def update_node_status():
+    node.update_nodes_status(refresh=True)
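
This periodic task pairs with the crontab import added to config.py: registered on a beat schedule, it keeps node status fresh without the API having to poll Flower on every request. A minimal sketch of such a schedule entry, assuming it lives in config.py (the actual schedule is not visible in this diff):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'update-node-status': {
        'task': 'tasks.node.update_node_status',
        'schedule': crontab(),  # every minute
    },
}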

tasks/spider.py

@@ -16,22 +16,22 @@ logger = get_logger(__name__)
 @celery_app.task(bind=True)
-def execute_spider(self, id: str, node_id: str):
+def execute_spider(self, id: str):
     print(self.state)
     task_id = self.request.id
     hostname = self.request.hostname
     spider = db_manager.get('spiders', id=id)
-    latest_version = db_manager.get_latest_version(spider_id=id, node_id=node_id)
     command = spider.get('cmd')

-    current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')), str(latest_version))
+    current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))

     # log info
     logger.info('current_working_directory: %s' % current_working_directory)
     logger.info('spider_id: %s' % id)
-    logger.info('version: %s' % latest_version)
     logger.info(command)

     # make sure the log folder exists
-    log_path = os.path.join(PROJECT_LOGS_FOLDER, id, str(latest_version))
+    log_path = os.path.join(PROJECT_LOGS_FOLDER, id)
     if not os.path.exists(log_path):
         os.makedirs(log_path)
@@ -45,17 +45,19 @@ def execute_spider(self, id: str, node_id: str):
         '_id': task_id,
         'spider_id': ObjectId(id),
         'create_ts': datetime.now(),
-        'node_id': node_id,
+        'node_id': hostname,
         'hostname': hostname,
         'log_file_path': log_file_path,
-        'spider_version': latest_version
     })

     # execute the command
+    env = os.environ.copy()
+    env['CRAWLAB_TASK_ID'] = task_id
     p = subprocess.Popen(command.split(' '),
                          stdout=stdout.fileno(),
                          stderr=stderr.fileno(),
                          cwd=current_working_directory,
+                         env=env,
                          bufsize=1)

     # get output from the process
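
With the per-version directories gone, the worker runs every spider from one stable path and hands the task id to the child process via the environment. A spider can read it back to tag scraped items with the task that produced them; a minimal sketch (the item fields are hypothetical):

import os

# inside the spider process spawned by execute_spider
task_id = os.environ.get('CRAWLAB_TASK_ID')
item = {'url': 'https://example.com', 'task_id': task_id}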

utils/node.py

@@ -3,8 +3,36 @@ import json
 import requests

 from config import FLOWER_API_ENDPOINT
+from constants.node import NodeStatus
+from db.manager import db_manager


 def check_nodes_status():
     res = requests.get('%s/workers?status=1' % FLOWER_API_ENDPOINT)
     return json.loads(res.content.decode('utf-8'))
+
+
+def update_nodes_status(refresh=False):
+    online_node_ids = []
+    url = '%s/workers?status=1' % FLOWER_API_ENDPOINT
+    if refresh:
+        url += '&refresh=1'
+    res = requests.get(url)
+    for k, v in json.loads(res.content.decode('utf-8')).items():
+        node_name = k
+        node_status = NodeStatus.ONLINE if v else NodeStatus.OFFLINE
+        # node_celery = v
+        node = db_manager.get('nodes', id=node_name)
+        # new node
+        if node is None:
+            node = {'_id': node_name, 'name': node_name, 'status': node_status}
+            db_manager.save('nodes', node)
+        else:
+            node['status'] = node_status
+            db_manager.save('nodes', node)
+        if node_status:
+            online_node_ids.append(node_name)
+    return online_node_ids
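
One subtlety in the final check: node_status is always a non-empty string ('online' or 'offline'), so "if node_status:" is truthy for both values and every node ends up in online_node_ids. A sketch of the stricter comparison (not part of this commit):

if node_status == NodeStatus.ONLINE:
    online_node_ids.append(node_name)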