From 7e8531aa5748593b3650e81d0ea846123123c779 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 11 Feb 2019 13:25:33 +0800 Subject: [PATCH] added celery functionality added CRUD base --- .gitignore | 2 ++ app.py | 13 +++++++++++++ bin/start_flower.sh | 1 + config/__init__.py | 2 ++ config/celery.py | 11 +++++++++++ config/db.py | 6 ++++++ config/flask.py | 2 ++ db/__init__.py | 0 db/manager.py | 31 +++++++++++++++++++++++++++++++ logger/__init__.py | 0 model/__init__.py | 0 route.py | 4 ++++ routes/__init__.py | 5 +++++ routes/spider.py | 23 +++++++++++++++++++++++ tasks/__init__.py | 9 +++++++++ tasks/spider.py | 17 +++++++++++++++++ 16 files changed, 126 insertions(+) create mode 100644 app.py create mode 100644 bin/start_flower.sh create mode 100644 config/__init__.py create mode 100644 config/celery.py create mode 100644 config/db.py create mode 100644 config/flask.py create mode 100644 db/__init__.py create mode 100644 db/manager.py create mode 100644 logger/__init__.py create mode 100644 model/__init__.py create mode 100644 route.py create mode 100644 routes/__init__.py create mode 100644 routes/spider.py create mode 100644 tasks/__init__.py create mode 100644 tasks/spider.py diff --git a/.gitignore b/.gitignore index 894a44cc..04e1286c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.idea/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/app.py b/app.py new file mode 100644 index 00000000..5e99dc3f --- /dev/null +++ b/app.py @@ -0,0 +1,13 @@ +from celery import Celery +from flask import Flask +from flask_restful import Api + +# TODO: 用配置文件启动 http://www.pythondoc.com/flask/config.html +app = Flask(__name__) +app.config['DEBUG'] = True + +# init flask api instance +api = Api(app) + +# start flask app +app.run() diff --git a/bin/start_flower.sh b/bin/start_flower.sh new file mode 100644 index 00000000..59570c24 --- /dev/null +++ b/bin/start_flower.sh @@ -0,0 +1 @@ +celery flower --broker=redis://localhost:6379/0 --backend=redis://localhost:6379/1 \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 00000000..e51a3ffb --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,2 @@ +MONGO_HOST = 'localhost' +MONGO_DATABASE = 'test' diff --git a/config/celery.py b/config/celery.py new file mode 100644 index 00000000..480a3d79 --- /dev/null +++ b/config/celery.py @@ -0,0 +1,11 @@ +# BROKER_URL = 'redis://localhost:6379/0' +BROKER_URL = 'mongodb://localhost:27017/' +CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/' +# CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' +# CELERY_TASK_SERIALIZER = 'json' +# CELERY_RESULT_SERIALIZER = 'json' +# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 +CELERY_MONGODB_BACKEND_SETTINGS = { + 'database': 'crawlab_test', + 'taskmeta_collection': 'tasks_celery', +} diff --git a/config/db.py b/config/db.py new file mode 100644 index 00000000..f66752bf --- /dev/null +++ b/config/db.py @@ -0,0 +1,6 @@ +# 数据库 +MONGO_HOST = 'localhost' +MONGO_PORT = '27017' +# MONGO_USER = 'test' +# MONGO_PASS = 'test' +MONGO_DB = 'crawlab_test' diff --git a/config/flask.py b/config/flask.py new file mode 100644 index 00000000..209bb279 --- /dev/null +++ b/config/flask.py @@ -0,0 +1,2 @@ +DEBUG = True + diff --git a/db/__init__.py b/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/db/manager.py b/db/manager.py new file mode 100644 index 00000000..dccb4619 --- /dev/null +++ b/db/manager.py @@ -0,0 +1,31 @@ +from pymongo import MongoClient +from config.db import MONGO_HOST, MONGO_PORT, MONGO_DB + + +class DbManager(object): + def __init__(self): + self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT) + self.db = self.mongo[MONGO_DB] + + # TODO: CRUD + def save(self, col_name: str, item, **kwargs): + col = self.db[col_name] + col.save(item, **kwargs) + + def remove(self, col_name: str, cond: dict, **kwargs): + col = self.db[col_name] + col.remove(cond, **kwargs) + + def update(self, col_name: str, cond: dict, values: dict, **kwargs): + col = self.db[col_name] + col.update(cond, {'$set': values}, **kwargs) + + def list(self, col_name: str, cond: dict, skip: int, limit: int, **kwargs): + if kwargs.get('page') is not None: + try: + page = int(kwargs.get('page')) + skip = page * limit + except Exception as err: + pass + # TODO: list logic + # TODO: pagination diff --git a/logger/__init__.py b/logger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/model/__init__.py b/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/route.py b/route.py new file mode 100644 index 00000000..12aedf7a --- /dev/null +++ b/route.py @@ -0,0 +1,4 @@ +from app import api +from api.spider import SpiderApi, SpiderExecutorApi + +api.add_resource(SpiderExecutorApi, '/spider') diff --git a/routes/__init__.py b/routes/__init__.py new file mode 100644 index 00000000..f0f71b0d --- /dev/null +++ b/routes/__init__.py @@ -0,0 +1,5 @@ +from app import app + + +# api.add_resource(SpiderApi, '/spider') +# print(SpiderExecutorApi) diff --git a/routes/spider.py b/routes/spider.py new file mode 100644 index 00000000..fbd0c10a --- /dev/null +++ b/routes/spider.py @@ -0,0 +1,23 @@ +from celery.utils.log import get_logger +from flask_restful import reqparse, Resource +from tasks.spider import execute_spider + +logger = get_logger('tasks') +parser = reqparse.RequestParser() +parser.add_argument('spider_name', type=str) + + +class SpiderApi(Resource): + pass + + +class SpiderExecutorApi(Resource): + def get(self): + args = parser.parse_args() + job = execute_spider.delay(args.spider_name) + return { + 'id': job.id, + 'status': job.status, + 'spider_name': args.spider_name, + 'result': job.get(timeout=5) + } diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 00000000..635bd48e --- /dev/null +++ b/tasks/__init__.py @@ -0,0 +1,9 @@ +from celery import Celery + +app = Celery(__name__) +app.config_from_object('config.celery') + +import tasks.spider + +if __name__ == '__main__': + app.start(argv=['tasks.spider', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO']) diff --git a/tasks/spider.py b/tasks/spider.py new file mode 100644 index 00000000..fdb622fa --- /dev/null +++ b/tasks/spider.py @@ -0,0 +1,17 @@ +import requests +from celery.utils.log import get_logger +from tasks import app + +logger = get_logger(__name__) + + +@app.task +def execute_spider(spider_name: str): + logger.info('spider_name: %s' % spider_name) + return spider_name + + +@app.task +def get_baidu_html(keyword: str): + res = requests.get('http://www.baidu.com') + return res.content.decode('utf-8')