From 5eb69a4b0d539e7a9ac49efc2ab7db25b4b02d83 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 20 Feb 2019 21:24:28 +0800
Subject: [PATCH] overview pages updated

---
 app.py                                      |  5 +-
 bin/run_flower.py                           |  7 +++
 bin/run_worker.py                           | 14 +++++
 bin/start_flower.sh                         |  3 +-
 config/celery.py                            |  6 +-
 constants/node.py                           |  2 +-
 db/manager.py                               |  1 +
 requirements.txt                            | 62 +++++++++++++++++++
 route.py                                    |  4 --
 routes/base.py                              | 18 ++++--
 routes/nodes.py                             | 35 +++++++----
 routes/spiders.py                           |  4 ++
 routes/test.py                              | 25 --------
 spiders/baidu/baidu.py                      |  5 --
 .../taobao/taobao/spiders/taobao_spider.py  |  2 +-
 tasks/__init__.py                           | 17 -----
 tasks/deploy.py                             |  4 +-
 tasks/spider.py                             | 10 +--
 18 files changed, 142 insertions(+), 82 deletions(-)
 create mode 100644 bin/run_flower.py
 create mode 100644 bin/run_worker.py
 create mode 100644 requirements.txt
 delete mode 100644 route.py
 delete mode 100644 routes/test.py
 delete mode 100644 spiders/baidu/baidu.py

diff --git a/app.py b/app.py
index 92e97ff1..db2dbbbd 100644
--- a/app.py
+++ b/app.py
@@ -7,6 +7,10 @@ from flask_restful import Api
 
 app = Flask(__name__)
 app.config['DEBUG'] = True
+# celery_app
+celery_app = Celery(__name__)
+celery_app.config_from_object('config.celery')
+
 # init flask api instance
 api = Api(app)
 
@@ -19,7 +23,6 @@ import routes.spiders
 import routes.deploys
 import routes.tasks
 import routes.files
-import routes.test
 
 # start flask app
 if __name__ == '__main__':
diff --git a/bin/run_flower.py b/bin/run_flower.py
new file mode 100644
index 00000000..9e506464
--- /dev/null
+++ b/bin/run_flower.py
@@ -0,0 +1,7 @@
+from config.celery import BROKER_URL
+import subprocess
+
+if __name__ == '__main__':
+    p = subprocess.Popen(['celery', 'flower', '-b', BROKER_URL], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    for line in iter(p.stdout.readline, b''):  # sentinel must be bytes (b''), or the loop never hits EOF
+        print(line.decode('utf-8'))
diff --git a/bin/run_worker.py b/bin/run_worker.py
new file mode 100644
index 00000000..38141eaf
--- /dev/null
+++ b/bin/run_worker.py
@@ -0,0 +1,14 @@
+import sys
+from celery import Celery
+
+from app import celery_app
+
+# import necessary tasks
+import tasks.spider
+import tasks.deploy
+
+if __name__ == '__main__':
+    if sys.platform == 'win32':  # sys.platform reports 'win32' on Windows, never 'windows'
+        celery_app.start(argv=['tasks', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO'])
+    else:
+        celery_app.start(argv=['tasks', 'worker', '-E', '-l', 'INFO'])
diff --git a/bin/start_flower.sh b/bin/start_flower.sh
index 335e0170..98339f7d 100755
--- a/bin/start_flower.sh
+++ b/bin/start_flower.sh
@@ -1 +1,2 @@
-celery flower --broker=mongodb://localhost:27017 --backend=redis://localhost:6379/1
\ No newline at end of file
+#!/usr/bin/env bash
+celery flower --broker=mongodb://localhost:27017
diff --git a/config/celery.py b/config/celery.py
index 480a3d79..30d5455f 100644
--- a/config/celery.py
+++ b/config/celery.py
@@ -1,5 +1,5 @@
-# BROKER_URL = 'redis://localhost:6379/0'
-BROKER_URL = 'mongodb://localhost:27017/'
+BROKER_URL = 'redis://localhost:6379/0'
+# BROKER_URL = 'mongodb://localhost:27017/'
 CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
 # CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
 # CELERY_TASK_SERIALIZER = 'json'
@@ -9,3 +9,5 @@ CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
 }
+
+FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
diff --git a/constants/node.py b/constants/node.py
index e4b32810..0e0b1dbc 100644
--- a/constants/node.py
+++ b/constants/node.py
@@ -1,3 +1,3 @@
-class SpiderType:
+class NodeType:
     OFFLINE = 0
     ONLINE = 1
diff --git a/db/manager.py b/db/manager.py
index 8d1dceff..a636d23f 100644
--- a/db/manager.py
+++ b/db/manager.py
@@ -14,6 +14,7 @@ class DbManager(object):
 
     def save(self, col_name: str, item, **kwargs):
         col = self.db[col_name]
+        item.pop('stats', None)  # in case some fields cannot be saved in MongoDB
         col.save(item, **kwargs)
 
     def remove(self, col_name: str, cond: dict, **kwargs):
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 00000000..f99bbc88
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,62 @@
+amqp==2.4.1
+aniso8601==4.1.0
+asn1crypto==0.24.0
+attrs==18.2.0
+Automat==0.7.0
+Babel==2.6.0
+billiard==3.5.0.5
+celery==4.2.1
+certifi==2018.11.29
+cffi==1.11.5
+chardet==3.0.4
+Click==7.0
+constantly==15.1.0
+cryptography==2.5
+cssselect==1.0.3
+Django==2.1.7
+django-cors-headers==2.4.0
+dnspython==1.16.0
+docopt==0.6.2
+eventlet==0.24.1
+Flask==1.0.2
+Flask-Cors==3.0.7
+Flask-RESTful==0.3.7
+flower==0.9.2
+gerapy==0.8.5
+greenlet==0.4.15
+hyperlink==18.0.0
+idna==2.8
+incremental==17.5.0
+itsdangerous==1.1.0
+Jinja2==2.10
+kombu==4.3.0
+lxml==4.3.1
+MarkupSafe==1.1.0
+mongoengine==0.16.3
+monotonic==1.5
+parsel==1.5.1
+pyasn1==0.4.5
+pyasn1-modules==0.2.4
+pycparser==2.19
+PyDispatcher==2.0.5
+PyHamcrest==1.9.0
+pymongo==3.7.2
+PyMySQL==0.9.3
+pyOpenSSL==19.0.0
+python-scrapyd-api==2.1.2
+pytz==2018.9
+queuelib==1.5.0
+redis==3.1.0
+requests==2.21.0
+Scrapy==1.6.0
+scrapy-redis==0.6.8
+scrapy-splash==0.7.2
+service-identity==18.1.0
+six==1.12.0
+tornado==5.1.1
+Twisted==18.9.0
+urllib3==1.24.1
+vine==1.2.0
+w3lib==1.20.0
+Werkzeug==0.14.1
+zope.interface==4.6.0
diff --git a/route.py b/route.py
deleted file mode 100644
index e8020ab1..00000000
--- a/route.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# from app import api
-# from api.spider import SpiderApi, SpiderExecutorApi
-
-# api.add_resource(SpiderExecutorApi, '/spider')
diff --git a/routes/base.py b/routes/base.py
index 983a2a7f..4106cae1 100644
--- a/routes/base.py
+++ b/routes/base.py
@@ -24,11 +24,21 @@ class BaseApi(Resource):
         for arg, type in self.arguments:
            self.parser.add_argument(arg, type=type)
 
-    def get(self, id=None):
+    def get(self, id=None, action=None):
         args = self.parser.parse_args()
 
-        # get item by id
-        if id is None:
+        # action by id
+        if action is not None:
+            if not hasattr(self, action):
+                return {
+                    'status': 'error',
+                    'code': 400,
+                    'error': 'action "%s" invalid' % action
+                }, 400
+            return getattr(self, action)(id)
+
+        # list items
+        elif id is None:
             # filter
             cond = {}
             if args.get('filter') is not None:
@@ -68,7 +78,7 @@ class BaseApi(Resource):
                 'items': items
             })
 
-        # list items
+        # get item by id
         else:
             return jsonify(db_manager.get(col_name=self.col_name, id=id))
diff --git a/routes/nodes.py b/routes/nodes.py
index e30513cd..d6d4866c 100644
--- a/routes/nodes.py
+++ b/routes/nodes.py
@@ -1,8 +1,13 @@
-import subprocess
+import json
+
+import requests
 
 from app import api
-from config.celery import BROKER_URL
+from config.celery import FLOWER_API_ENDPOINT
+from constants.node import NodeType
+from db.manager import db_manager
 from routes.base import BaseApi
+from utils import jsonify
 
 
 class NodeApi(BaseApi):
@@ -15,18 +20,26 @@ class NodeApi(BaseApi):
         ('description', str),
     )
 
-    def _get(self, id=None):
+    def get(self, id=None):
         if id is not None:
-            return {
-            }
+            return db_manager.get('nodes', id=id)
         else:
-            p = subprocess.Popen(['celery', 'inspect', 'stats', '-b', BROKER_URL])
-            stdout, stderr = p.communicate()
-            return {
-                'stdout': stdout,
-                'stderr': stderr,
-            }
+            res = requests.get('%s/workers' % FLOWER_API_ENDPOINT)
+            for k, v in json.loads(res.content.decode('utf-8')).items():
+                node_name = k
+                node = v
+                node['_id'] = node_name
+                node['name'] = node_name
+                node['status'] = NodeType.ONLINE
+                db_manager.save('nodes', node)
+
+            items = db_manager.list('nodes', {})
+
+            return jsonify({
+                'status': 'ok',
+                'items': items
+            })
 
 
 api.add_resource(NodeApi,
diff --git a/routes/spiders.py b/routes/spiders.py
index 678072a5..fa67bead 100644
--- a/routes/spiders.py
+++ b/routes/spiders.py
@@ -25,6 +25,10 @@ class SpiderApi(BaseApi):
         ('lang', int),
     )
 
+    def get(self, id=None, action=None):
+        # TODO: discover folders by path
+        pass
+
     def crawl(self, id):
         job = execute_spider.delay(id)
         # print('crawl: %s' % id)
diff --git a/routes/test.py b/routes/test.py
deleted file mode 100644
index a017f537..00000000
--- a/routes/test.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from app import api
-from routes.base import BaseApi
-from tasks.spider import get_baidu_html
-
-
-class TestApi(BaseApi):
-    col_name = 'test'
-
-    def __init__(self):
-        super(TestApi).__init__()
-        self.parser.add_argument('keyword', type=str)
-
-    def get(self, id=None):
-        args = self.parser.parse_args()
-        for i in range(100):
-            get_baidu_html.delay(args.keyword)
-        return {
-            'status': 'ok'
-        }
-
-
-# add api to resources
-api.add_resource(TestApi,
-                 '/api/test',
-                 )
diff --git a/spiders/baidu/baidu.py b/spiders/baidu/baidu.py
deleted file mode 100644
index 349afc37..00000000
--- a/spiders/baidu/baidu.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from time import sleep
-import requests
-
-r = requests.get('http://www.baidu.com')
-print(r.content)
diff --git a/spiders/taobao/taobao/spiders/taobao_spider.py b/spiders/taobao/taobao/spiders/taobao_spider.py
index c25fbca2..113efcf7 100644
--- a/spiders/taobao/taobao/spiders/taobao_spider.py
+++ b/spiders/taobao/taobao/spiders/taobao_spider.py
@@ -3,7 +3,7 @@ import scrapy
 
 
 class TaobaoSpiderSpider(scrapy.Spider):
-    name = 'taobao-spider'
+    name = 'taobao_spider'
     allowed_domains = ['taobao.com']
     start_urls = ['http://taobao.com/']
 
diff --git a/tasks/__init__.py b/tasks/__init__.py
index d7e86a00..e69de29b 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -1,17 +0,0 @@
-import os
-import sys
-import threading
-
-from celery import Celery
-
-app = Celery(__name__)
-app.config_from_object('config.celery')
-
-import tasks.spider
-import tasks.deploy
-
-if __name__ == '__main__':
-    if sys.platform == 'windows':
-        app.start(argv=['tasks', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO'])
-    else:
-        app.start(argv=['tasks', 'worker', '-E', '-l', 'INFO'])
diff --git a/tasks/deploy.py b/tasks/deploy.py
index 875be949..3ac0c0ec 100644
--- a/tasks/deploy.py
+++ b/tasks/deploy.py
@@ -7,12 +7,12 @@ from celery.utils.log import get_logger
 
 from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
 from db.manager import db_manager
-from tasks import app
+from app import celery_app
 
 import subprocess
 
 logger = get_logger(__name__)
 
-@app.task
+@celery_app.task
 def deploy_spider(id):
     pass
diff --git a/tasks/spider.py b/tasks/spider.py
index 1807cb8e..ecbf697d 100644
--- a/tasks/spider.py
+++ b/tasks/spider.py
@@ -8,13 +8,13 @@ from celery.utils.log import get_logger
 
 from config import PROJECT_FILE_FOLDER, PROJECT_LOGS_FOLDER
 from db.manager import db_manager
-from tasks import app
+from app import celery_app
 
 import subprocess
 
 logger = get_logger(__name__)
 
-@app.task(bind=True)
+@celery_app.task(bind=True)
 def execute_spider(self, id: str):
     task_id = self.request.id
     hostname = self.request.hostname
@@ -71,9 +71,3 @@
         stderr.close()
 
     return task
-
-
-@app.task
-def get_baidu_html(keyword: str):
-    res = requests.get('http://www.baidu.com')
-    return res.content.decode('utf-8')
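
Note (commentary, not part of the applied diff): with this patch the Celery app is defined once in app.py as celery_app, the worker and Flower are launched through the new bin/run_worker.py and bin/run_flower.py scripts, and NodeApi discovers nodes by polling Flower's /api/workers endpoint and upserting each worker into the `nodes` collection. A minimal local bring-up sketch follows; it assumes Redis and MongoDB are running as configured in config/celery.py, and the helper script itself (bring_up.py) is hypothetical, shown only to illustrate how the pieces fit together:

    # bring_up.py -- hypothetical helper, not included in this patch
    import os
    import subprocess

    # bin/run_worker.py and bin/run_flower.py import from `app` and `config`,
    # so run them from the project root with the root on PYTHONPATH
    env = dict(os.environ, PYTHONPATH='.')

    procs = [
        subprocess.Popen(['python', 'app.py'], env=env),             # Flask API (routes/*)
        subprocess.Popen(['python', 'bin/run_worker.py'], env=env),  # Celery worker (tasks/*)
        subprocess.Popen(['python', 'bin/run_flower.py'], env=env),  # Flower, /api on :5555
    ]

    for p in procs:
        p.wait()

Once Flower is up, a GET to the Flask app's /api/nodes route triggers the /workers poll and the node records become visible to the overview pages.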