added celery functionality

added CRUD base
Author: Marvin Zhang
Date: 2019-02-11 13:25:33 +08:00
parent 426fb19238
commit 7e8531aa57
16 changed files with 126 additions and 0 deletions

.gitignore vendored (+2)

@@ -1,3 +1,5 @@
.idea/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

app.py Normal file (+13)

@@ -0,0 +1,13 @@
from flask import Flask
from flask_restful import Api

# TODO: load settings from a config file, see http://www.pythondoc.com/flask/config.html
app = Flask(__name__)
app.config['DEBUG'] = True

# init flask api instance
api = Api(app)

# start flask app only when this module is run directly, not on import
if __name__ == '__main__':
    app.run()
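The TODO above maps onto Flask's built-in config loader; a minimal sketch, assuming the config/flask.py module added later in this commit:

from flask import Flask

app = Flask(__name__)
# pull DEBUG (and any future settings) from config/flask.py
app.config.from_object('config.flask')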

bin/start_flower.sh Normal file (+1)

@@ -0,0 +1 @@
celery flower --broker=redis://localhost:6379/0 --backend=redis://localhost:6379/1
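Flower gives a web dashboard over this broker/backend pair (note the script points at the Redis URLs that are commented out in config/celery.py below). The same task state can also be read programmatically with Celery's AsyncResult; a sketch, with a hypothetical task id:

from tasks import app

# id as returned by execute_spider.delay(...); this one is hypothetical
job = app.AsyncResult('some-task-id')
print(job.status)  # PENDING / STARTED / SUCCESS / FAILURE
print(job.result)  # the return value once the task has finished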

config/__init__.py Normal file (+2)

@@ -0,0 +1,2 @@
MONGO_HOST = 'localhost'
MONGO_DATABASE = 'test'

config/celery.py Normal file (+11)

@@ -0,0 +1,11 @@
# BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'mongodb://localhost:27017/'
CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # task result expiry time (seconds)
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}
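A quick way to confirm Celery picks these settings up (a sketch; Celery 4 maps the old uppercase setting names onto their lowercase equivalents):

from celery import Celery

app = Celery('probe')
app.config_from_object('config.celery')
print(app.conf.broker_url)  # mongodb://localhost:27017/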

config/db.py Normal file (+6)

@@ -0,0 +1,6 @@
# database settings
MONGO_HOST = 'localhost'
MONGO_PORT = 27017  # pymongo expects an int port, not a string
# MONGO_USER = 'test'
# MONGO_PASS = 'test'
MONGO_DB = 'crawlab_test'

config/flask.py Normal file (+2)

@@ -0,0 +1,2 @@
DEBUG = True

db/__init__.py Normal file (+0)

db/manager.py Normal file (+31)

@@ -0,0 +1,31 @@
from pymongo import MongoClient
from config.db import MONGO_HOST, MONGO_PORT, MONGO_DB

class DbManager(object):
    def __init__(self):
        self.mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
        self.db = self.mongo[MONGO_DB]

    # TODO: CRUD
    def save(self, col_name: str, item, **kwargs):
        # save() is deprecated in pymongo 3.x; insert_one() / replace_one()
        # are the modern equivalents
        col = self.db[col_name]
        col.save(item, **kwargs)

    def remove(self, col_name: str, cond: dict, **kwargs):
        # remove() is deprecated; delete_one() / delete_many() replace it
        col = self.db[col_name]
        col.remove(cond, **kwargs)

    def update(self, col_name: str, cond: dict, values: dict, **kwargs):
        # update() is deprecated; update_one() / update_many() replace it
        col = self.db[col_name]
        col.update(cond, {'$set': values}, **kwargs)

    def list(self, col_name: str, cond: dict, skip: int, limit: int, **kwargs):
        # if a page number is supplied, derive skip from it
        if kwargs.get('page') is not None:
            try:
                skip = int(kwargs.get('page')) * limit
            except (TypeError, ValueError):
                pass  # keep the skip value that was passed in
        # TODO: list logic
        # TODO: pagination
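The two TODOs could be filled in with pymongo's cursor API; a minimal sketch of how the method body might look, not necessarily the author's final logic:

def list(self, col_name: str, cond: dict, skip: int = 0, limit: int = 10, **kwargs):
    col = self.db[col_name]
    if kwargs.get('page') is not None:
        try:
            skip = int(kwargs.get('page')) * limit
        except (TypeError, ValueError):
            pass
    # find() returns a cursor; skip() and limit() implement the pagination
    return list(col.find(cond).skip(skip).limit(limit))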

logger/__init__.py Normal file (+0)

model/__init__.py Normal file (+0)

route.py Normal file (+4)

@@ -0,0 +1,4 @@
from app import api
# SpiderApi and SpiderExecutorApi live in routes/spider.py in this commit
from routes.spider import SpiderApi, SpiderExecutorApi

api.add_resource(SpiderExecutorApi, '/spider')

routes/__init__.py Normal file (+5)

@@ -0,0 +1,5 @@
from app import app
# api.add_resource(SpiderApi, '/spider')
# print(SpiderExecutorApi)

routes/spider.py Normal file (+23)

@@ -0,0 +1,23 @@
from celery.utils.log import get_logger
from flask_restful import reqparse, Resource

from tasks.spider import execute_spider

logger = get_logger('tasks')

parser = reqparse.RequestParser()
parser.add_argument('spider_name', type=str)


class SpiderApi(Resource):
    pass


class SpiderExecutorApi(Resource):
    def get(self):
        args = parser.parse_args()
        # enqueue the task on the celery broker
        job = execute_spider.delay(args.spider_name)
        return {
            'id': job.id,
            'status': job.status,
            'spider_name': args.spider_name,
            # blocks the request for up to 5 seconds waiting for the result
            'result': job.get(timeout=5)
        }
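Once the Flask app is running, the endpoint can be exercised like this (a sketch; the host and port assume Flask's defaults, and 'test_spider' is a hypothetical spider name):

import requests

res = requests.get('http://localhost:5000/spider',
                   params={'spider_name': 'test_spider'})
print(res.json())  # {'id': ..., 'status': ..., 'spider_name': 'test_spider', ...}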

tasks/__init__.py Normal file (+9)

@@ -0,0 +1,9 @@
from celery import Celery

app = Celery(__name__)
app.config_from_object('config.celery')

# imported after app creation so the @app.task decorators can register
import tasks.spider

if __name__ == '__main__':
    # run a worker with the eventlet pool, task events enabled, log level INFO
    app.start(argv=['tasks.spider', 'worker', '-P', 'eventlet', '-E', '-l', 'INFO'])

tasks/spider.py Normal file (+17)

@@ -0,0 +1,17 @@
import requests
from celery.utils.log import get_logger

from tasks import app

logger = get_logger(__name__)


@app.task
def execute_spider(spider_name: str):
    logger.info('spider_name: %s' % spider_name)
    return spider_name


@app.task
def get_baidu_html(keyword: str):
    # TODO: keyword is not used yet; a real search request would include it
    res = requests.get('http://www.baidu.com')
    return res.content.decode('utf-8')
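For a quick check without a running worker, a task can be executed eagerly in-process; a sketch using Celery's apply(), which runs the task synchronously instead of sending it to the broker:

from tasks.spider import execute_spider

# apply() runs the task in the current process and returns an EagerResult
result = execute_spider.apply(args=('test_spider',))
print(result.get())  # 'test_spider'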