mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
Change the configured database addresses in these files to 127.0.0.1
@@ -11,7 +11,7 @@ const MongoClient = require('mongodb').MongoClient;
 const page = await browser.newPage();

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results';
 const col = db.collection(colName);

@@ -11,7 +11,7 @@ const MongoClient = require('mongodb').MongoClient;
 const page = await browser.newPage();

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results';
 const col = db.collection(colName);

@@ -53,7 +53,7 @@ const MongoClient = require('mongodb').MongoClient;
 });

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results_juejin';
 const taskId = process.env.CRAWLAB_TASK_ID;

@@ -53,7 +53,7 @@ const MongoClient = require('mongodb').MongoClient;
 });

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results_juejin';
 const taskId = process.env.CRAWLAB_TASK_ID;

@@ -1,18 +0,0 @@
```
Crawl strategy:
Discogs is crawled for the 2000-2003 range.
Because Discogs only exposes the first 10,000 results of a search, the following strategy is used.
1. First fetch the results page for the target decade.
2. Build the search URLs from the facet links and counts in the left-hand column.
URL parameters:
format_exact: format
layout: sm
country_exact: country
style_exact: style
limit: 250
year: 2000-2003
decade: 2000
```
### URL construction
1. From the left-hand facet area we need style, format, and country.
2. Divide each category's count by the 250-results-per-page maximum to determine how many pages to fetch (see the sketch below).

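To make the paging rule above concrete, here is a minimal illustrative sketch. It is not part of this commit; the facet names and counts are invented examples, and only the query parameters and the 250-per-page limit come from the notes above.

```python
# Sketch of the URL-construction strategy described above.
# The facet values and counts below are illustrative placeholders; only the
# query parameters (format_exact, layout, country_exact, style_exact, limit,
# year, decade) and the 250-per-page cap come from the deleted README.
import math
from itertools import product
from urllib.parse import urlencode

BASE_SEARCH_URL = "https://www.discogs.com/search/"
PER_PAGE = 250  # maximum results shown per page

# Example facet values with result counts read from the left-hand column.
styles = {"House": 9800, "Trance": 4200}
formats = {"Vinyl": 12000}
countries = {"UK": 7600, "Italy": 3100}

def build_urls(year):
    """Yield one URL per (style, format, country, page) combination."""
    for (style, style_count), (fmt, _), (country, _) in product(
            styles.items(), formats.items(), countries.items()):
        # Pages needed for this facet: category count divided by 250, rounded up.
        pages = max(1, math.ceil(style_count / PER_PAGE))
        for page in range(1, pages + 1):
            params = {
                "layout": "sm",
                "format_exact": fmt,
                "country_exact": country,
                "style_exact": style,
                "limit": PER_PAGE,
                "year": year,
                "decade": 2000,
                "page": page,
            }
            yield f"{BASE_SEARCH_URL}?{urlencode(params)}"

if __name__ == "__main__":
    for url in list(build_urls(2000))[:3]:
        print(url)
```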
@@ -1,129 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-03-22 15:05
# @Author : cxa
# @File : base_crawler.py
# @Software: PyCharm

import asyncio
import aiohttp
from logger.log import crawler
import async_timeout
from collections import namedtuple
from config.config import *
from multidict import CIMultiDict
from typing import Optional, Union
from async_retrying import retry
from lxml import html
from aiostream import stream
import marshal

Response = namedtuple("Response",
                      ["status", "source"])

try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass
sem = asyncio.Semaphore(CONCURRENCY_NUM)


class Crawler():
    def __init__(self):
        self.session = None
        self.tc = None

    @retry(attempts=MAX_RETRY_TIMES)
    async def get_session(self, url, _kwargs: dict = {}, source_type="text", status_code=200) -> Response:
        """
        :param _kwargs: url, headers, data, params, etc.
        :param method: get or post.
        :param timeout: default 5s.
        """
        kwargs = marshal.loads(marshal.dumps(_kwargs))
        if USE_PROXY:
            kwargs["proxy"] = await self.get_proxy()
        method = kwargs.pop("method", "get")
        timeout = kwargs.pop("timeout", 5)
        with async_timeout.timeout(timeout):
            async with getattr(self.session, method)(url, **kwargs) as req:
                status = req.status
                if status in [status_code, 201]:
                    if source_type == "text":
                        source = await req.text()
                    elif source_type == "buff":
                        source = await req.read()

                crawler.info(f"get url:{url},status:{status}")
                res = Response(status=status, source=source)
                return res

    def xpath(self, _response, rule, _attr=None):
        if isinstance(_response, Response):
            source = _response.source
            root = html.fromstring(source)

        elif isinstance(_response, str):
            source = _response
            root = html.fromstring(source)
        else:
            root = _response
        nodes = root.xpath(rule)
        result = []
        if _attr:
            if _attr == "text":
                result = [entry.text for entry in nodes]
            else:
                result = [entry.get(_attr) for entry in nodes]
        else:
            result = nodes
        return result

    async def branch(self, coros, limit=10):
        """
        Slice the async generator with the aiostream module; the concurrency here is 10.
        :param coros: async generator of coroutines
        :param limit: number of tasks to run concurrently
        :return:
        """
        index = 0
        while True:
            xs = stream.preserve(coros)
            ys = xs[index:index + limit]
            t = await stream.list(ys)
            if not t:
                break
            await asyncio.ensure_future(asyncio.wait(t))
            index += limit

    def call_back(self):
        return "Please pass get or post"

    async def get_proxy(self) -> Optional[str]:
        ...

    async def init_session(self, cookies=None):
        """
        Create the TCPConnector, including the SSL and connection-limit settings,
        and create a global session.
        :return:
        """
        self.tc = aiohttp.connector.TCPConnector(limit=300, force_close=True,
                                                 enable_cleanup_closed=True,
                                                 verify_ssl=False)
        self.session = aiohttp.ClientSession(connector=self.tc, cookies=cookies)

    def run(self):
        '''
        Create the global session.
        :return:
        '''
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.get_session("url", {"method": "poo"}))

    async def close(self):
        await self.tc.close()
        await self.session.close()


if __name__ == '__main__':
    c = Crawler().run()

@@ -1,5 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-03-22 16:24
# @Author : cxa
# @File : __init__.py.py
# @Software: PyCharm

@@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-03-22 15:08
# @Author : cxa
# @File : config.py.py
# @Software: PyCharm
CONCURRENCY_NUM = 5
MAX_RETRY_TIMES = 3
USE_PROXY = False

@@ -1,124 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-02-13 10:44
# @Author : cxa
# @File : mongohelper.py
# @Software: PyCharm
# -*- coding: utf-8 -*-
# @Time : 2018/12/28 10:01 AM
# @Author : cxa
# @File : mongo_helper.py
# @Software: PyCharm
import asyncio
from logger.log import storage
import datetime
from decorators.decorators import decorator
from motor.motor_asyncio import AsyncIOMotorClient
from itertools import islice

try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

db_configs = {
    'host': '127.0.0.1',
    'port': '27017',
    'db_name': 'aio_spider_data',
    'user': ''
}


class MotorOperation():
    def __init__(self):
        self.__dict__.update(**db_configs)
        if self.user:
            self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}?authSource={self.db_name}"
        else:
            self.motor_uri = f"mongodb://{self.host}:{self.port}/{self.db_name}"
        self.client = AsyncIOMotorClient(self.motor_uri)
        self.mb = self.client[self.db_name]

    # async def get_use_list(self):
    #     fs = await aiofiles.open("namelist.txt", "r", encoding="utf-8")
    #     data = (i.replace("\n", "") async for i in fs)
    #     return data

    async def save_data_with_status(self, items, col="discogs_seed_data"):
        for i in range(0, len(items), 2000):
            tasks = []
            for item in islice(items, i, i + 2000):
                data = {}
                data["update_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                data["status"] = 0  # 0 = initial state
                data["url"] = item
                tasks.append(data)
            print("saving new urls", tasks)
            await self.mb[col].insert_many(tasks)

    async def add_index(self, col="discogs_seed_data"):
        # Add an index on the url field.
        await self.mb[col].create_index('url')

    async def save_data(self, items, col="discogs_index_data", key="obj_id"):
        # storage.info(f"items at this point: {items}")
        if isinstance(items, list):
            for item in items:
                try:
                    item[key] = item[key]
                    await self.mb[col].update_one({
                        key: item.get(key)},
                        {'$set': item},
                        upsert=True)
                except Exception as e:
                    storage.error(f"insert error: {e.args}, item: {item}")
        elif isinstance(items, dict):
            try:
                items[key] = items[key]
                await self.mb[col].update_one({
                    key: items.get(key)},
                    {'$set': items},
                    upsert=True)
            except Exception as e:
                storage.error(f"insert error: {e.args}, item: {items}")

    async def change_status(self, condition, col="discogs_seed_data", status_code=1):
        # status_code: 0 = initial, 1 = download started, 2 = download finished
        try:
            item = {}
            item["status"] = status_code
            # storage.info(f"changing status, data: {item}")
            await self.mb[col].update_one(condition, {'$set': item})
        except Exception as e:
            storage.error(f"status change error: {e.args}, data: {item}")

    async def get_detail_datas(self):
        data = self.mb.discogs_index.find({'status': 0})
        async for item in data:
            print(item)
        return data

    async def reset_status(self, col="discogs_seed_data"):
        await self.mb[col].update_many({'status': 1}, {'$set': {"status": 0}})

    async def reset_all_status(self, col="discogs_seed_data"):
        await self.mb[col].update_many({}, {'$set': {"status": 0}})

    async def find_data(self, col="discogs_seed_data"):
        '''
        Fetch records whose status is 0 as the crawl targets.
        :return: AsyncGeneratorType
        '''
        cursor = self.mb[col].find({'status': 0}, {"_id": 0})
        async_gen = (item async for item in cursor)
        return async_gen

    async def do_delete_many(self):
        await self.mb.tiaopiao_data.delete_many({"flag": 0})


if __name__ == '__main__':
    m = MotorOperation()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(m.reset_all_status())

@@ -1,5 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2018/03/28 15:35
# @Author : cxa
# @File : __init__.py.py
# @Software: PyCharm

@@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2018/03/28 15:35
# @Author : cxa
# @File : decorators.py
# @Software: PyCharm
from functools import wraps
from logger.log import crawler
import traceback


def decorator(f=True):
    '''
    Logging decorator.
    :param f: when True (the default) an info message is logged before the call; pass False to skip the info log.
    :return:
    '''

    def flag(func):
        @wraps(func)
        def log(*args, **kwargs):
            try:
                if f:
                    crawler.info(f"{func.__name__} is run")
                return func(*args, **kwargs)
            except Exception as e:
                crawler.error(f"{func.__name__} raised an error, details: {traceback.format_exc()}")

        return log

    return flag

@@ -1,169 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019/3/24 2:05 AM
# @Author : cxa
# @File : discogs_details_spider.py
# @Software: PyCharm
import asyncio

import aiofiles
from db.mongohelper import MotorOperation
from logger.log import crawler, storage

from copy import copy
import os
from common.base_crawler import Crawler
from types import AsyncGeneratorType
from decorators.decorators import decorator

from urllib.parse import urljoin
from multidict import CIMultiDict

DEFAULT_HEADRS = CIMultiDict({
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Host": "www.discogs.com",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
})
BASE_URL = "https://www.discogs.com"


class Details_Spider(Crawler):
    def __init__(self):
        self.page_pat = "&page=.*&"

    @decorator()
    async def start(self):
        # Fetch the seed data from MongoDB; the result is an async generator.
        data: AsyncGeneratorType = await MotorOperation().find_data(col="discogs_index_data")
        await self.init_session()
        # Fan the items out into concurrent tasks.
        tasks = (asyncio.ensure_future(self.fetch_detail_page(item)) async for item in data)
        await self.branch(tasks)

    @decorator(False)
    async def fetch_detail_page(self, item: dict):
        '''
        Visit the detail page and start parsing.
        :param item:
        :return:
        '''
        detail_url = item.get("detail_url")
        kwargs = {"headers": DEFAULT_HEADRS}
        # Set the seed URL's status to 1 to mark the start of crawling.
        condition = {'url': detail_url}
        await MotorOperation().change_status(condition, status_code=1)
        await asyncio.sleep(2)
        response = await self.get_session(detail_url, kwargs)
        if response.status == 200:
            source = response.source
            await self.more_images(source)
            # Use the current link to build the URLs for all pages.
            # Save the content of the current page.
            # await self.get_list_info(detail_url, source)
            # await self.max_page_index(url, source)

    @decorator(False)
    async def get_list_info(self, url, source):
        '''
        To extract the elements reliably, the page is processed block by block.
        :param url: URL of the current page
        :param source: page source
        :return:
        '''
        pass
        # div_xpath = "//div[@class='cards cards_layout_text-only']/div"
        # div_node_list = self.xpath(source, div_xpath)
        # tasks = []
        # t_append = tasks.append
        # for div_node in div_node_list:
        #     try:
        #         dic = {}
        #         dic["obj_id"] = self.xpath(div_node, "@data-object-id")[0]
        #         dic["artist"] = self.xpath(div_node, ".//div[@class='card_body']/h4/span/a", "text")[0]
        #         dic["title"] = \
        #             self.xpath(div_node, ".//div[@class='card_body']/h4/a[@class='search_result_title ']", "text")[0]
        #         _detail_url = \
        #             self.xpath(div_node, ".//div[@class='card_body']/h4/a[@class='search_result_title ']", "href")[0]
        #         dic["detail_url"] = urljoin(BASE_URL, _detail_url)
        #
        #         card_info_xpath = ".//div[@class='card_body']/p[@class='card_info']"
        #         dic["label"] = self.xpath(div_node, f"{card_info_xpath}/a", "text")[0]
        #         dic["catalog_number"] = \
        #             self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_catalog_number']", "text")[0]
        #         dic["format"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_format']", "text")[0]
        #         dic["year"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_year']", "text")[0]
        #         dic["country"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_country']", "text")[
        #             0]
        #         dic["url"] = url
        #         dic["page_index"] = 1
        #         dic["status"] = 0
        #         dic["crawler_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        #         t_append(dic)
        #     except IndexError as e:
        #         crawler.error(f"parse error, url: {url}")
        # await MotorOperation().save_data(dic)
        # # Set the seed URL's status to 2 to mark a successful crawl.
        # condition = {"url": url}
        # await MotorOperation().change_status(condition, status_code=2)

    @decorator()
    async def more_images(self, source):
        '''
        Get the link to the "more images" page.
        :param source:
        :return:
        '''
        more_url_node = self.xpath(source, "//a[contains(@class,'thumbnail_link') and contains(@href,'images')]",
                                   "href")
        if more_url_node:
            _url = more_url_node[0]
            more_url = urljoin(BASE_URL, _url)
            kwargs = {"headers": DEFAULT_HEADRS}
            response = await self.get_session(more_url, kwargs)
            if response.status == 200:
                source = response.source
                await self.parse_images(source)
                # TODO: parse the detailed tracklist information.

    async def get_image_buff(self, img_url):
        img_headers = copy(DEFAULT_HEADRS)
        img_headers["host"] = "img.discogs.com"
        kwargs = {"headers": img_headers}
        response = await self.get_session(img_url, kwargs, source_type="buff")
        buff = response.source
        await self.save_image(img_url, buff)

    @decorator()
    async def save_image(self, img_url, buff):
        image_name = img_url.split("/")[-1].replace(".jpeg", "")
        file_path = os.path.join(os.getcwd(), "discogs_images")
        image_path = os.path.join(file_path, image_name)
        if not os.path.exists(file_path):
            os.makedirs(file_path)
        # Only write the file if it does not exist yet.
        if not os.path.exists(image_path):
            storage.info(f"SAVE_PATH:{image_path}")
            async with aiofiles.open(image_path, 'wb') as f:
                await f.write(buff)

    @decorator()
    async def parse_images(self, source):
        '''
        Parse the links of all images on the current page.
        :param source:
        :return:
        '''
        image_node_list = self.xpath(source, "//div[@id='view_images']/p//img", "src")
        tasks = [asyncio.ensure_future(self.get_image_buff(url)) for url in image_node_list]
        await asyncio.wait(tasks)


if __name__ == '__main__':
    s = Details_Spider()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(s.start())
    finally:
        loop.close()

@@ -1,141 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-03-22 13:41
# @Author : cxa
# @File : discogs_index_spider.py
# @Software: PyCharm
# 2000-2009
import asyncio
import aiohttp
from db.mongohelper import MotorOperation
from logger.log import crawler
from collections import namedtuple, deque
import datetime
from common.base_crawler import Crawler
from types import AsyncGeneratorType
from decorators.decorators import decorator
import re
import math
from urllib.parse import urljoin

Response = namedtuple("Response",
                      ["status", "text"])
try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass
BASE_URL = "https://www.discogs.com"
# Final form.
DEFAULT_HEADRS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Host": "www.discogs.com",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
}


class Index_Spider(Crawler):
    def __init__(self):
        self.page_pat = "&page=.*&"

    @decorator()
    async def start(self):
        # Fetch the seed data from MongoDB; the result is an async generator.
        data: AsyncGeneratorType = await MotorOperation().find_data()
        await self.init_session()
        # Fan the items out into concurrent tasks.
        tasks = (asyncio.ensure_future(self.fetch_index_page(item)) async for item in data)
        await self.branch(tasks)

    @decorator(False)
    async def fetch_index_page(self, item: dict):
        '''
        Visit the list page and start parsing.
        :param item:
        :return:
        '''
        url = item.get("url")
        kwargs = {"headers": DEFAULT_HEADRS}
        # Set the seed URL's status to 1 to mark the start of crawling.
        condition = {'url': url}
        await MotorOperation().change_status(condition, status_code=1)
        response = await self.get_session(url, kwargs)
        if response.status == 200:
            source = response.source
            # Use the current link to build the URLs for all pages,
            # and save the content of the current page.
            await self.get_list_info(url, source)
            await self.max_page_index(url, source)

    @decorator(False)
    async def get_list_info(self, url, source):
        '''
        To extract the elements reliably, the page is processed block by block.
        :param url: URL of the current page
        :param source: page source
        :return:
        '''
        div_xpath = "//div[@class='cards cards_layout_text-only']/div"
        div_node_list = self.xpath(source, div_xpath)
        tasks = []
        t_append = tasks.append
        for div_node in div_node_list:
            try:
                dic = {}
                dic["obj_id"] = self.xpath(div_node, "@data-object-id")[0]
                dic["artist"] = self.xpath(div_node, ".//div[@class='card_body']/h4/span/a", "text")[0]
                dic["title"] = \
                    self.xpath(div_node, ".//div[@class='card_body']/h4/a[@class='search_result_title ']", "text")[0]
                _detail_url = \
                    self.xpath(div_node, ".//div[@class='card_body']/h4/a[@class='search_result_title ']", "href")[0]
                dic["detail_url"] = urljoin(BASE_URL, _detail_url)

                card_info_xpath = ".//div[@class='card_body']/p[@class='card_info']"
                dic["label"] = self.xpath(div_node, f"{card_info_xpath}/a", "text")[0]
                dic["catalog_number"] = \
                    self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_catalog_number']", "text")[0]
                dic["format"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_format']", "text")[0]
                dic["year"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_year']", "text")[0]
                dic["country"] = self.xpath(div_node, f"{card_info_xpath}/span[@class='card_release_country']", "text")[
                    0]
                dic["url"] = url
                dic["page_index"] = 1
                dic["status"] = 0
                dic["crawler_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                t_append(dic)
            except IndexError as e:
                crawler.error(f"parse error, url: {url}")
        await MotorOperation().save_data(dic)
        # Set the seed URL's status to 2 to mark a successful crawl.
        condition = {"url": url}
        await MotorOperation().change_status(condition, status_code=2)

    @decorator(False)
    async def max_page_index(self, url, source):
        '''
        :param source:
        :return:
        '''
        total_page_node = self.xpath(source, "//strong[@class='pagination_total']", "text")
        total_page = total_page_node[0].split("of")[-1].strip().replace(",", "")
        _max_page_index = math.ceil(int(total_page) / 100)
        old_index = re.compile("&page=(.*?)&").findall(url)[0]
        new_url_list = deque()
        n_append = new_url_list.append
        if _max_page_index > 1:
            for i in range(2, _max_page_index + 1):
                new_url = re.sub(self.page_pat, f"&page={i}&", url)
                n_append(new_url)
            await MotorOperation().save_data_with_status(new_url_list)


if __name__ == '__main__':
    s = Index_Spider()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(s.start())
    finally:
        loop.close()

@@ -1,115 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019-03-22 13:41
# @Author : cxa
# @File : discogs_seed_spider.py
# @Software: PyCharm
# 2000-2009
import asyncio
from db.mongohelper import MotorOperation
from collections import namedtuple
from common.base_crawler import Crawler
from decorators.decorators import decorator
import re
from collections import deque
from itertools import product

Response = namedtuple("Response",
                      ["status", "text"])
try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass
START_URL_LIST = [f"https://www.discogs.com/search/?limit=25&layout=sm&decade=2000&year={i}&page=1"
                  for i in range(2000, 2001)]
# Final form of the search URL.
BASE_URL = "https://www.discogs.com/search/?layout=sm&country_exact=UK&format_exact=Vinyl&limit=100&year=2000&style_exact=House&page=2&decade=2000"
DEFAULT_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Host": "www.discogs.com",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
}


class Seed_Spider(Crawler):
    @decorator(False)
    async def start(self):
        await self.init_session()
        res_list: list = [asyncio.ensure_future(self.fetch_home(url)) for url in START_URL_LIST]
        tasks = asyncio.wait(res_list)
        await tasks

    @decorator(False)
    async def fetch_home(self, url: str):
        '''
        Visit the home page and start parsing.
        :param url:
        :return:
        '''
        kwargs = {"headers": DEFAULT_HEADERS}
        response = await self.get_session(url, kwargs)
        if response.status == 200:
            source = response.source
            await self.parse(source)

    @decorator()
    async def parse(self, source):
        '''
        :param source:
        :return:
        '''
        # The left-hand ul facets are handled in blocks: style, format (record type), country.
        # Process them block by block.
        keyword = ["Italodance", "House", "Trance"]
        stlye_dic = {}
        format_dic = {}
        country_dic = {}
        type_dic = {"style": stlye_dic, "format": format_dic, "country": country_dic}
        xpath_id_dic = {"style": "facets_style_exact", "format": "facets_format_exact",
                        "country": "facets_country_exact"}
        for k, v in xpath_id_dic.items():
            x = f"//div[@id='{v}']/ul/li/a"
            node_list = self.xpath(source, x)
            for item in node_list:
                count = self.xpath(item, ".//small", "text")[0].replace(",", "")
                _type = self.xpath(item, "@href")[0]
                name = self.xpath(item, "text()")[1].strip("\n").strip()
                r = v.split("facets_")[1]
                pat = re.compile(f"{r}=(.*?)&")
                url_name = pat.findall(_type)[0]
                if k == "style":
                    if (
                            "ITALO" in name.upper() or "DANCE" in name.upper() or "HOUSE" in name.upper() or "TECHNO" in name.upper()
                            or "CORE" in name.upper() or "HARD" in name.upper()
                            or "EURO" in name.upper()):
                        type_dic[k].setdefault("url_name", deque()).append(url_name)
                        type_dic[k].setdefault("name", deque()).append(name)
                        type_dic[k].setdefault("count", deque()).append(count)
                else:
                    type_dic[k].setdefault("url_name", deque()).append(url_name)
                    type_dic[k].setdefault("name", deque()).append(name)
                    type_dic[k].setdefault("count", deque()).append(count)

        tasks = deque()
        t_append = tasks.append
        for item in product([2000, 2001, 2002, 2003], stlye_dic["url_name"], format_dic["url_name"],
                            country_dic["url_name"]):
            country = item[3]
            _format = item[2]
            year = item[0]
            style = item[1]
            url = f"https://www.discogs.com/search/?layout=sm&country_exact={country}&format_exact={_format}&limit=100&year={year}&style_exact={style}&page=1&decade=2000"
            t_append(url)
        await MotorOperation().save_data_with_status(tasks)


if __name__ == '__main__':
    s = Seed_Spider()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(s.start())
    finally:
        loop.close()

@@ -1,68 +0,0 @@
import os
import logging
import logging.config as log_conf
import datetime
import coloredlogs

log_dir = os.path.dirname(os.path.dirname(__file__)) + '/logs'
if not os.path.exists(log_dir):
    os.mkdir(log_dir)
today = datetime.datetime.now().strftime("%Y%m%d")

log_path = os.path.join(log_dir, f'discogs{today}.log')

log_config = {
    'version': 1.0,
    'formatters': {
        'colored_console': {'()': 'coloredlogs.ColoredFormatter',
                            'format': "%(asctime)s - %(name)s - %(levelname)s - %(message)s", 'datefmt': '%H:%M:%S'},
        'detail': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            'datefmt': "%Y-%m-%d %H:%M:%S"  # without datefmt the timestamp would include milliseconds.
        },
        'simple': {
            'format': '%(name)s - %(levelname)s - %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',  # handler that writes log records to the console.
            'level': 'INFO',
            'formatter': 'colored_console'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',  # handler that writes log records to a rotating file.
            'maxBytes': 1024 * 1024 * 1024,  # maximum size of a single log file
            'backupCount': 1,  # number of backup files to keep
            'filename': log_path,  # log file name
            'level': 'INFO',  # log level
            'formatter': 'detail',  # which of the formatters above to use
            'encoding': 'utf-8',  # encoding
        },
    },
    'loggers': {
        'crawler': {
            'handlers': ['console', 'file'],  # write to both console and file
            'level': 'DEBUG',
        },
        'parser': {
            'handlers': ['file'],
            'level': 'INFO',
        },
        'other': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
        },
        'storage': {
            'handlers': ['console'],
            'level': 'INFO',
        }
    }
}

log_conf.dictConfig(log_config)

crawler = logging.getLogger('crawler')
storage = logging.getLogger('storage')
coloredlogs.install(level='DEBUG', logger=crawler)
coloredlogs.install(level='DEBUG', logger=storage)

@@ -1,13 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019/3/22 10:02 PM
# @Author : cxa
# @File : testdic.py
# @Software: PyCharm
from copy import deepcopy

img_url = "http:"
try:
    a = img_url.split("/")[-1]
    print(a)
    a.replace(".jpeg", "")
except Exception as e:
    print(e.args)

@@ -1 +0,0 @@
pass

@@ -52,7 +52,7 @@ const MongoClient = require('mongodb').MongoClient;
 });

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results_juejin';
 const taskId = process.env.CRAWLAB_TASK_ID;

@@ -51,7 +51,7 @@ const MongoClient = require('mongodb').MongoClient;
 });

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results_segmentfault';
 const taskId = process.env.CRAWLAB_TASK_ID;

@@ -51,7 +51,7 @@ const MongoClient = require('mongodb').MongoClient;
 });

 // open database connection
-const client = await MongoClient.connect('mongodb://192.168.99.100:27017');
+const client = await MongoClient.connect('mongodb://127.0.0.1:27017');
 let db = await client.db('crawlab_test');
 const colName = process.env.CRAWLAB_COLLECTION || 'results_segmentfault';
 const taskId = process.env.CRAWLAB_TASK_ID;