Merge pull request #79 from tikazyq/develop

Develop
This commit is contained in:
Marvin Zhang
2019-07-07 16:49:44 +08:00
committed by GitHub
16 changed files with 358 additions and 9 deletions

View File

@@ -89,7 +89,7 @@ Crawlab的架构跟Celery非常相似但是加入了包括前端、爬虫、F
任务是利用python的`subprocess`模块中的`Popen`来实现的。任务ID将以环境变量`CRAWLAB_TASK_ID`的形式存在于爬虫任务运行的进程中,并以此来关联抓取数据。
在你的爬虫程序中,你需要将`CRAWLAB_TASK_ID`的值以`task_id`作为可以存入数据库中。这样Crawlab就直到如何将爬虫任务与抓取数据关联起来了。当前Crawlab只支持MongoDB。
在你的爬虫程序中,你需要将`CRAWLAB_TASK_ID`的值以`task_id`作为可以存入数据库中。这样Crawlab就知道如何将爬虫任务与抓取数据关联起来了。当前Crawlab只支持MongoDB。
### Scrapy

View File

@@ -103,4 +103,4 @@ if not os.path.exists(PROJECT_LOGS_FOLDER):
if __name__ == '__main__':
# run app instance
app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
app.run(host=FLASK_HOST, port=FLASK_PORT)

View File

@@ -14,3 +14,4 @@ eventlet
Celery
Flower
redis
gunicorn

View File

@@ -1,3 +1,6 @@
import atexit
import fcntl
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
@@ -65,8 +68,19 @@ class Scheduler(object):
print(f'running: {self.scheduler.running}')
def run(self):
self.update()
self.scheduler.start()
f = open("scheduler.lock", "wb")
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.update()
self.scheduler.start()
except:
pass
def unlock():
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
atexit.register(unlock)
scheduler = Scheduler()

View File

@@ -1,12 +1,13 @@
#!/bin/sh
case $1 in
master)
cd /opt/crawlab/frontend \
cd $WORK_DIR/frontend \
&& npm run build:prod \
&& service nginx start
python $WORK_DIR/crawlab/flower.py >> /opt/crawlab/flower.log 2>&1 &
python $WORK_DIR/crawlab/worker.py >> /opt/crawlab/worker.log 2>&1 &
python $WORK_DIR/crawlab/app.py
cd $WORK_DIR/crawlab \
&& gunicorn --log-level=DEBUG -b 0.0.0.0 -w 8 app:app
;;
worker)
python $WORK_DIR/crawlab/app.py >> /opt/crawlab/app.log 2>&1 &

View File

@@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = sinastock.settings
[deploy]
#url = http://localhost:6800/
project = sinastock

View File

View File

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html
import scrapy
class NewsItem(scrapy.Item):
# define the fields for your item here like:
_id = scrapy.Field()
title = scrapy.Field()
ts_str = scrapy.Field()
ts = scrapy.Field()
url = scrapy.Field()
text = scrapy.Field()
task_id = scrapy.Field()
source = scrapy.Field()
stocks = scrapy.Field()

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Define here the models for your spider middleware
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals
class SinastockSpiderMiddleware(object):
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(self, response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, dict or Item objects.
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Response, dict
# or Item objects.
pass
def process_start_requests(self, start_requests, spider):
# Called with the start requests of the spider, and works
# similarly to the process_spider_output() method, except
# that it doesnt have a response associated.
# Must return only requests (not items).
for r in start_requests:
yield r
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
class SinastockDownloaderMiddleware(object):
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.
# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
return None
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.
# Must either;
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)

View File

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import os
from pymongo import MongoClient
class SinastockPipeline(object):
mongo = MongoClient(
host=os.environ.get('MONGO_HOST') or 'localhost',
port=int(os.environ.get('MONGO_PORT') or 27017)
)
db = mongo[os.environ.get('MONGO_DB') or 'crawlab_test']
col = db.get_collection(os.environ.get('CRAWLAB_COLLECTION') or 'stock_news')
# create indexes
col.create_index('stocks')
col.create_index('url')
def process_item(self, item, spider):
item['task_id'] = os.environ.get('CRAWLAB_TASK_ID')
if self.col.find_one({'url': item['url']}) is None:
self.col.save(item)
return item

View File

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# Scrapy settings for sinastock project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://doc.scrapy.org/en/latest/topics/settings.html
# https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = 'sinastock'
SPIDER_MODULES = ['sinastock.spiders']
NEWSPIDER_MODULE = 'sinastock.spiders'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
# USER_AGENT = 'sinastock (+http://www.yourdomain.com)'
# Obey robots.txt rules
ROBOTSTXT_OBEY = True
# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 32
# Configure a delay for requests for the same website (default: 0)
# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
# COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
# TELNETCONSOLE_ENABLED = False
# Override the default request headers:
# DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
# 'Accept-Language': 'en',
# }
# Enable or disable spider middlewares
# See https://doc.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
# 'sinastock.middlewares.SinastockSpiderMiddleware': 543,
# }
# Enable or disable downloader middlewares
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
# 'sinastock.middlewares.SinastockDownloaderMiddleware': 543,
# }
# Enable or disable extensions
# See https://doc.scrapy.org/en/latest/topics/extensions.html
# EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
# }
# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'sinastock.pipelines.SinastockPipeline': 300,
}
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/autothrottle.html
# AUTOTHROTTLE_ENABLED = True
# The initial download delay
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = 'httpcache'
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

View File

@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

View File

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
import os
import re
from datetime import datetime
import scrapy
from pymongo import MongoClient
import pytz
from sinastock.items import NewsItem
# 时区
tz = pytz.timezone('Asia/Shanghai')
class SinastockSpiderSpider(scrapy.Spider):
name = 'sinastock_spider'
allowed_domains = ['finance.sina.com.cn']
mongo = MongoClient(
host=os.environ.get('MONGO_HOST') or 'localhost',
port=int(os.environ.get('MONGO_PORT') or 27017)
)
db = mongo[os.environ.get('MONGO_DB') or 'crawlab_test']
col = db.get_collection(os.environ.get('CRAWLAB_COLLECTION') or 'stock_news')
page_num = int(os.environ.get('PAGE_NUM')) or 3
def start_requests(self):
col = self.db['stocks']
for s in col.find({}):
code, ex = s['ts_code'].split('.')
for i in range(self.page_num):
url = f'http://vip.stock.finance.sina.com.cn/corp/view/vCB_AllNewsStock.php?symbol={ex.lower()}{code}&Page={i + 1}'
yield scrapy.Request(
url=url,
callback=self.parse,
meta={'ts_code': s['ts_code']}
)
def parse(self, response):
for a in response.css('.datelist > ul > a'):
url = a.css('a::attr("href")').extract_first()
item = NewsItem(
title=a.css('a::text').extract_first(),
url=url,
source='sina',
stocks=[response.meta['ts_code']]
)
yield scrapy.Request(
url=url,
callback=self.parse_detail,
meta={'item': item}
)
def parse_detail(self, response):
item = response.meta['item']
text = response.css('#artibody').extract_first()
pre = re.compile('>(.*?)<')
text = ''.join(pre.findall(text))
item['text'] = text.replace('\u3000', '')
item['ts_str'] = response.css('.date::text').extract_first()
if item['text'] is None or item['ts_str'] is None:
pass
else:
ts = datetime.strptime(item['ts_str'], '%Y年%m月%d%H:%M')
ts = tz.localize(ts)
item['ts'] = ts
yield item

View File

@@ -14,7 +14,10 @@ class XueqiuItem(scrapy.Item):
task_id = scrapy.Field()
id = scrapy.Field()
text = scrapy.Field()
url = scrapy.Field()
target = scrapy.Field()
view_count = scrapy.Field()
mark = scrapy.Field()
created_at = scrapy.Field()
ts = scrapy.Field()
source = scrapy.Field()

View File

@@ -17,9 +17,13 @@ class XueqiuPipeline(object):
db = mongo[os.environ.get('MONGO_DB') or 'crawlab_test']
col = db.get_collection(os.environ.get('CRAWLAB_COLLECTION') or 'results_xueqiu')
# create indexes
col.create_index('stocks')
col.create_index('id')
col.create_index('url')
def process_item(self, item, spider):
item['task_id'] = os.environ.get('CRAWLAB_TASK_ID')
item['_id'] = item['id']
if self.col.find_one({'_id': item['_id']}) is None:
if self.col.find_one({'id': item['id']}) is None:
self.col.save(item)
return item

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import json
from datetime import datetime
from time import sleep
import scrapy
@@ -32,9 +33,11 @@ class XueqiuSpiderSpider(scrapy.Spider):
id=d['id'],
text=d['text'],
mark=d['mark'],
target=d['target'],
url=d['target'],
created_at=d['created_at'],
ts=datetime.fromtimestamp(d['created_at'] / 1e3),
view_count=d['view_count'],
source='xueqiu'
)
yield item