Merge branch 'develop' of https://github.com/crawlab-team/crawlab into v0.4.0

This commit is contained in:
陈景阳
2019-08-24 15:25:05 +08:00
11 changed files with 499 additions and 28 deletions

2
Jenkinsfile vendored
View File

@@ -57,7 +57,7 @@ pipeline {
steps {
echo 'Cleanup...'
sh """
docker image prune -f
docker rmi `docker images | grep '<none>' | grep -v IMAGE | awk '{ print \$3 }' | xargs`
"""
}
}

View File

@@ -1,14 +1,14 @@
# Crawlab
![](http://114.67.75.98:8082/buildStatus/icon?job=crawlab%2Fmaster)
![](https://img.shields.io/github/release/tikazyq/crawlab.svg)
![](https://img.shields.io/github/last-commit/tikazyq/crawlab.svg)
![](https://img.shields.io/github/issues/tikazyq/crawlab.svg)
![](https://img.shields.io/github/contributors/tikazyq/crawlab.svg)
![](https://img.shields.io/github/release/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/last-commit/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/issues/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/contributors/crawlab-team/crawlab.svg)
![](https://img.shields.io/docker/pulls/tikazyq/crawlab)
![](https://img.shields.io/github/license/tikazyq/crawlab.svg)
![](https://img.shields.io/github/license/crawlab-team/crawlab.svg)
中文 | [English](https://github.com/tikazyq/crawlab)
中文 | [English](https://github.com/crawlab-team/crawlab)
[安装](#安装) | [运行](#运行) | [截图](#截图) | [架构](#架构) | [集成](#与其他框架的集成) | [比较](#与其他框架比较) | [相关文章](#相关文章) | [社区&赞助](#社区--赞助)
@@ -202,7 +202,7 @@ Crawlab使用起来很方便也很通用可以适用于几乎任何主流
|框架 | 类型 | 分布式 | 前端 | 依赖于Scrapyd |
|:---:|:---:|:---:|:---:|:---:|
| [Crawlab](https://github.com/tikazyq/crawlab) | 管理平台 | Y | Y | N
| [Crawlab](https://github.com/crawlab-team/crawlab) | 管理平台 | Y | Y | N
| [ScrapydWeb](https://github.com/my8100/scrapydweb) | 管理平台 | Y | Y | Y
| [SpiderKeeper](https://github.com/DormyMo/SpiderKeeper) | 管理平台 | Y | Y | Y
| [Gerapy](https://github.com/Gerapy/Gerapy) | 管理平台 | Y | Y | Y

View File

@@ -1,14 +1,14 @@
# Crawlab
![](http://114.67.75.98:8082/buildStatus/icon?job=crawlab%2Fmaster)
![](https://img.shields.io/github/release/tikazyq/crawlab.svg)
![](https://img.shields.io/github/last-commit/tikazyq/crawlab.svg)
![](https://img.shields.io/github/issues/tikazyq/crawlab.svg)
![](https://img.shields.io/github/contributors/tikazyq/crawlab.svg)
![](https://img.shields.io/github/release/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/last-commit/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/issues/crawlab-team/crawlab.svg)
![](https://img.shields.io/github/contributors/crawlab-team/crawlab.svg)
![](https://img.shields.io/docker/pulls/tikazyq/crawlab)
![](https://img.shields.io/github/license/tikazyq/crawlab.svg)
![](https://img.shields.io/github/license/crawlab-team/crawlab.svg)
[中文](https://github.com/tikazyq/crawlab/blob/master/README-zh.md) | English
[中文](https://github.com/crawlab-team/crawlab/blob/master/README-zh.md) | English
[Installation](#installation) | [Run](#run) | [Screenshot](#screenshot) | [Architecture](#architecture) | [Integration](#integration-with-other-frameworks) | [Compare](#comparison-with-other-frameworks) | [Community & Sponsorship](#community--sponsorship)
@@ -199,7 +199,7 @@ Crawlab is easy to use, general enough to adapt spiders in any language and any
|Framework | Type | Distributed | Frontend | Scrapyd-Dependent |
|:---:|:---:|:---:|:---:|:---:|
| [Crawlab](https://github.com/tikazyq/crawlab) | Admin Platform | Y | Y | N
| [Crawlab](https://github.com/crawlab-team/crawlab) | Admin Platform | Y | Y | N
| [ScrapydWeb](https://github.com/my8100/scrapydweb) | Admin Platform | Y | Y | Y
| [SpiderKeeper](https://github.com/DormyMo/SpiderKeeper) | Admin Platform | Y | Y | Y
| [Gerapy](https://github.com/Gerapy/Gerapy) | Admin Platform | Y | Y | Y

View File

@@ -1,9 +1,11 @@
package database
import (
"errors"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"time"
"unsafe"
)
@@ -23,7 +25,9 @@ func (c *Subscriber) Connect() {
c.client = redis.PubSubConn{Conn: conn}
c.cbMap = make(map[string]SubscribeCallback)
go func() {
//retry connect redis 5 times, or panic
index := 0
go func(i int) {
for {
log.Debug("wait...")
switch res := c.client.Receive().(type) {
@@ -34,11 +38,24 @@ func (c *Subscriber) Connect() {
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Error("error handle...")
log.Error("error handle redis connection...")
con, err := GetRedisConn()
if err != nil {
log.Fatal("redis dial failed")
continue
}
c.client = redis.PubSubConn{Conn: con}
c.cbMap = make(map[string]SubscribeCallback)
time.Sleep(2 * time.Second)
if i > 5 {
panic(errors.New("redis connection failed too many times, panic"))
}
i += 1
continue
}
}
}()
}(index)
}

View File

@@ -42,7 +42,7 @@ var NodeList = []model.Node{
var TaskList = []model.Task{
{
Id: "1234",
SpiderId: bson.ObjectId("xx429e6c19f7abede924fee2"),
SpiderId: bson.ObjectId("5d429e6c19f7abede924fee2"),
StartTs: time.Now(),
FinishTs: time.Now(),
Status: "进行中",
@@ -61,7 +61,7 @@ var TaskList = []model.Task{
},
{
Id: "5678",
SpiderId: bson.ObjectId("xx429e6c19f7abede924fddf"),
SpiderId: bson.ObjectId("5d429e6c19f7abede924fee2"),
StartTs: time.Now(),
FinishTs: time.Now(),
Status: "进行中",

View File

@@ -11,7 +11,6 @@ import (
"strings"
"testing"
"time"
"ucloudBilling/ucloud/log"
)
var app *gin.Engine
@@ -29,12 +28,19 @@ func init() {
app.GET("/nodes/:id/system", GetSystemInfo) // 节点任务列表
app.DELETE("/nodes/:id", DeleteNode) // 删除节点
//// 爬虫
app.GET("/stats/home",GetHomeStats) // 首页统计数据
// 定时任务
app.GET("/schedules", GetScheduleList) // 定时任务列表
app.GET("/schedules/:id", GetSchedule) // 定时任务详情
app.PUT("/schedules", PutSchedule) // 创建定时任务
app.POST("/schedules/:id", PostSchedule) // 修改定时任务
app.DELETE("/schedules/:id", DeleteSchedule) // 删除定时任务
app.GET("/tasks", GetTaskList) // 任务列表
app.GET("/tasks/:id", GetTask) // 任务详情
app.PUT("/tasks", PutTask) // 派发任务
app.DELETE("/tasks/:id", DeleteTask) // 删除任务
app.GET("/tasks/:id/results",GetTaskResults) // 任务结果
app.GET("/tasks/:id/results/download", DownloadTaskResultsCsv) // 下载任务结果
}
//mock test, test data in ./mock
@@ -44,7 +50,6 @@ func TestGetNodeList(t *testing.T) {
req, _ := http.NewRequest("GET", "/nodes", nil)
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
t.Log(resp.Data)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -148,7 +153,6 @@ func TestPostNode(t *testing.T) {
var resp Response
body, _ := json.Marshal(newItem)
log.Info(strings.NewReader(string(body)))
var mongoId = "5d429e6c19f7abede924fee2"
w := httptest.NewRecorder()

View File

@@ -10,7 +10,6 @@ import (
"strings"
"testing"
"time"
"ucloudBilling/ucloud/log"
)
func TestGetScheduleList(t *testing.T) {
@@ -58,7 +57,6 @@ func TestDeleteSchedule(t *testing.T) {
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
log.Info(w.Body.String())
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -89,7 +87,6 @@ func TestPostSchedule(t *testing.T) {
var resp Response
var mongoId = "5d429e6c19f7abede924fee2"
body,_ := json.Marshal(newItem)
log.Info(strings.NewReader(string(body)))
w := httptest.NewRecorder()
req,_ := http.NewRequest("POST", "/schedules/"+mongoId,strings.NewReader(string(body)))
app.ServeHTTP(w, req)
@@ -125,7 +122,6 @@ func TestPutSchedule(t *testing.T) {
var resp Response
body,_ := json.Marshal(newItem)
log.Info(strings.NewReader(string(body)))
w := httptest.NewRecorder()
req,_ := http.NewRequest("PUT", "/schedules",strings.NewReader(string(body)))
app.ServeHTTP(w, req)

64
backend/mock/stats.go Normal file
View File

@@ -0,0 +1,64 @@
package mock
import (
"crawlab/model"
"github.com/gin-gonic/gin"
"net/http"
)
// taskDailyItems is the canned per-day task statistics series that
// GetHomeStats returns under the "daily" key of its response.
var taskDailyItems = []model.TaskDailyItem{
{
Date: "2019/08/19",
TaskCount: 2,
AvgRuntimeDuration: 1000,
},
{
Date: "2019/08/20",
TaskCount: 3,
AvgRuntimeDuration: 10130,
},
}
// GetHomeStats is the mock handler for the home-page statistics endpoint.
// It always answers 200 with fixed overview counters and the canned
// taskDailyItems series; nothing is read from a database.
func GetHomeStats(c *gin.Context) {
	// overview carries the headline counters.
	type overview struct {
		TaskCount       int `json:"task_count"`
		SpiderCount     int `json:"spider_count"`
		ActiveNodeCount int `json:"active_node_count"`
		ScheduleCount   int `json:"schedule_count"`
	}
	// payload is the value placed in Response.Data.
	type payload struct {
		Overview overview              `json:"overview"`
		Daily    []model.TaskDailyItem `json:"daily"`
	}

	stats := payload{
		Overview: overview{
			TaskCount:       10, // total number of tasks
			SpiderCount:     5,  // total number of spiders
			ActiveNodeCount: 4,  // number of online nodes
			ScheduleCount:   2,  // number of schedules
		},
		// Per-day task statistics.
		Daily: taskDailyItems,
	}

	c.JSON(http.StatusOK, Response{
		Status:  "ok",
		Message: "success",
		Data:    stats,
	})
}

View File

@@ -0,0 +1,29 @@
package mock
import (
"encoding/json"
"fmt"
. "github.com/smartystreets/goconvey/convey"
"net/http"
"net/http/httptest"
"testing"
)
func TestGetHomeStats(t *testing.T) {
var resp Response
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/stats/home", nil)
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
fmt.Println(resp.Data)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
Convey("Test API GetHomeStats", t, func() {
Convey("Test response status", func() {
So(resp.Status, ShouldEqual, "ok")
So(resp.Message, ShouldEqual, "success")
})
})
}

View File

@@ -1 +1,224 @@
package mock
package mock
import (
"bytes"
"crawlab/constants"
"crawlab/model"
"crawlab/utils"
"encoding/csv"
"fmt"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
"net/http"
)
// TaskListRequestData holds the query-string parameters that GetTaskList
// binds with ShouldBindQuery. The query keys are the names in the `form` tags.
type TaskListRequestData struct {
// PageNum is the requested page; GetTaskList replaces 0 with a default.
PageNum int `form:"page_num"`
// PageSize is the requested page size.
PageSize int `form:"page_size"`
// NodeId optionally filters by node; GetTaskList converts it with bson.ObjectIdHex.
NodeId string `form:"node_id"`
// SpiderId optionally filters by spider; GetTaskList converts it with bson.ObjectIdHex.
SpiderId string `form:"spider_id"`
}
// TaskResultsRequestData holds the query-string parameters that
// GetTaskResults binds with ShouldBindQuery. The query keys are the names in
// the `form` tags.
type TaskResultsRequestData struct {
// PageNum is the requested page.
PageNum int `form:"page_num"`
// PageSize is the requested page size.
PageSize int `form:"page_size"`
}
// GetTaskList is the mock handler for listing tasks.
//
// It binds the query string into TaskListRequestData, applies pagination
// defaults (page 1, page size 10), builds the filter from the optional
// node_id / spider_id parameters, and replies with the whole in-memory
// TaskList together with its length as the total. The filter and pagination
// values are computed but not applied to the mock data.
//
// A malformed query string, or a node_id / spider_id that is not a valid
// ObjectId hex string, yields 400 via HandleError instead of letting
// bson.ObjectIdHex panic.
func GetTaskList(c *gin.Context) {
	// Bind the query-string parameters.
	data := TaskListRequestData{}
	if err := c.ShouldBindQuery(&data); err != nil {
		HandleError(http.StatusBadRequest, c, err)
		return
	}

	// Pagination defaults.
	if data.PageNum == 0 {
		data.PageNum = 1
	}
	if data.PageSize == 0 {
		// Previously this branch overwrote PageNum, leaving PageSize at 0.
		data.PageSize = 10
	}

	// Filter conditions. bson.ObjectIdHex panics on invalid input, so the
	// ids are validated first.
	query := bson.M{}
	if data.NodeId != "" {
		if !bson.IsObjectIdHex(data.NodeId) {
			HandleError(http.StatusBadRequest, c, fmt.Errorf("invalid node_id %q", data.NodeId))
			return
		}
		query["node_id"] = bson.ObjectIdHex(data.NodeId)
	}
	if data.SpiderId != "" {
		if !bson.IsObjectIdHex(data.SpiderId) {
			HandleError(http.StatusBadRequest, c, fmt.Errorf("invalid spider_id %q", data.SpiderId))
			return
		}
		query["spider_id"] = bson.ObjectIdHex(data.SpiderId)
	}

	// The mock returns every task regardless of the filter.
	tasks := TaskList
	total := len(TaskList)

	c.JSON(http.StatusOK, ListResponse{
		Status:  "ok",
		Message: "success",
		Total:   total,
		Data:    tasks,
	})
}
// GetTask is the mock handler for fetching a single task. It scans TaskList
// for the entry whose Id equals the ":id" path parameter and returns it; when
// nothing matches, the zero-value model.Task is returned with a success
// envelope.
func GetTask(c *gin.Context) {
	wanted := c.Param("id")

	var found model.Task
	for i := range TaskList {
		if TaskList[i].Id != wanted {
			continue
		}
		// No early exit: the last matching entry wins.
		found = TaskList[i]
	}

	c.JSON(http.StatusOK, Response{
		Status:  "ok",
		Message: "success",
		Data:    found,
	})
}
// PutTask is the mock handler for creating (dispatching) a task. It decodes
// the JSON body into a model.Task, assigns a fresh UUID and the pending
// status, normalises an empty node id to the null ObjectId, and then only
// logs the persistence and queueing steps instead of performing them.
// A body that fails to bind yields 400 via HandleError.
func PutTask(c *gin.Context) {
	// Generate the task ID up front.
	taskID := uuid.NewV4()

	// Bind the request body.
	var task model.Task
	if bindErr := c.ShouldBindJSON(&task); bindErr != nil {
		HandleError(http.StatusBadRequest, c, bindErr)
		return
	}
	task.Id = taskID.String()
	task.Status = constants.StatusPending

	// When no node_id was supplied, fall back to the null ObjectId.
	if task.NodeId.Hex() == "" {
		task.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
	}

	// Stand-ins for storing the task and pushing it onto the task queue.
	fmt.Println("put the task into database")
	fmt.Println("put the task into task queue")

	c.JSON(http.StatusOK, Response{
		Status:  "ok",
		Message: "success",
	})
}
// DeleteTask is the mock handler for deleting a task. It prints a message for
// every TaskList entry whose Id equals the ":id" path parameter and always
// replies with a success envelope; TaskList itself is not modified.
func DeleteTask(c *gin.Context) {
	target := c.Param("id")

	for i := range TaskList {
		if TaskList[i].Id != target {
			continue
		}
		fmt.Println("delete the task")
	}

	c.JSON(http.StatusOK, Response{
		Status:  "ok",
		Message: "success",
	})
}
// GetTaskResults is the mock handler for a task's results. It binds the
// pagination query parameters, looks the task up in TaskList (printing it),
// and replies with a nil result set whose total is the length of TaskList.
// A query string that fails to bind yields 400 via HandleError.
func GetTaskResults(c *gin.Context) {
	taskID := c.Param("id")

	// Bind the query-string parameters.
	params := TaskResultsRequestData{}
	if bindErr := c.ShouldBindQuery(&params); bindErr != nil {
		HandleError(http.StatusBadRequest, c, bindErr)
		return
	}

	// Look up the task; the last matching entry wins.
	var matched model.Task
	for i := range TaskList {
		if TaskList[i].Id == taskID {
			matched = TaskList[i]
		}
	}
	fmt.Println(matched)

	// The mock has no stored results, so Data stays nil.
	var results interface{}

	c.JSON(http.StatusOK, ListResponse{
		Status:  "ok",
		Message: "success",
		Data:    results,
		Total:   len(TaskList),
	})
}
// DownloadTaskResultsCsv is the mock handler that serves a task's results as
// a CSV attachment named data.csv. It looks the task up in TaskList (printing
// it), derives the column list from the first result, writes a UTF-8 BOM,
// header row and data rows into an in-memory buffer, and sends the buffer
// with content type text/csv. The mock result set is always empty, so the
// body consists of the BOM and an empty header line. A CSV write failure
// yields 500 via HandleError.
func DownloadTaskResultsCsv(c *gin.Context) {
	taskID := c.Param("id")

	// Look up the task; the last matching entry wins.
	var matched model.Task
	for i := range TaskList {
		if TaskList[i].Id == taskID {
			matched = TaskList[i]
		}
	}
	fmt.Println(matched)

	// The mock has no stored results.
	var results []interface{}

	// Column names come from the keys of the first result, if any.
	columns := []string{}
	if len(results) > 0 {
		for key := range results[0].(bson.M) {
			columns = append(columns, key)
		}
	}

	// Output buffer; the UTF-8 BOM keeps Microsoft Excel from garbling the text.
	buf := &bytes.Buffer{}
	buf.WriteString("\xEF\xBB\xBF")
	csvWriter := csv.NewWriter(buf)

	// Header row.
	if err := csvWriter.Write(columns); err != nil {
		HandleError(http.StatusInternalServerError, c, err)
		return
	}

	// Data rows: each result is flattened to strings in column order.
	for _, result := range results {
		item := result.(bson.M)
		var row []string
		for _, col := range columns {
			row = append(row, utils.InterfaceToString(item[col]))
		}
		if err := csvWriter.Write(row); err != nil {
			HandleError(http.StatusInternalServerError, c, err)
			return
		}
	}

	// Nothing reaches the buffer until the writer is flushed.
	csvWriter.Flush()

	// File name offered to the client, then content type and payload.
	c.Writer.Header().Set("Content-Disposition", "attachment;filename=data.csv")
	c.Data(http.StatusOK, "text/csv", buf.Bytes())
}

138
backend/mock/task_test.go Normal file
View File

@@ -0,0 +1,138 @@
package mock
import (
"crawlab/model"
"encoding/json"
"github.com/globalsign/mgo/bson"
. "github.com/smartystreets/goconvey/convey"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
// TestGetTaskList sends GET /tasks with pagination and filter parameters
// through the shared mock router and checks the response envelope and total.
//
// The query keys must match the `form` tags of TaskListRequestData
// (page_num, page_size, node_id, spider_id); otherwise nothing is bound and
// the handler's parameter handling goes untested. The ids are valid ObjectId
// hex strings because the handler converts them with bson.ObjectIdHex.
func TestGetTaskList(t *testing.T) {
	var resp ListResponse
	w := httptest.NewRecorder()
	req, err := http.NewRequest("GET", "/tasks?page_num=2&page_size=10&node_id=5d429e6c19f7abede924fee2&spider_id=5d429e6c19f7abede924fee2", nil)
	if err != nil {
		t.Fatal("build request failed")
	}
	app.ServeHTTP(w, req)
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatal("Unmarshal resp failed")
	}

	Convey("Test API GetTaskList", t, func() {
		Convey("Test response status", func() {
			So(resp.Status, ShouldEqual, "ok")
			So(resp.Message, ShouldEqual, "success")
			So(resp.Total, ShouldEqual, 2)
		})
	})
}
func TestGetTask(t *testing.T) {
var resp Response
var taskId = "1234"
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/tasks/"+taskId, nil)
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
Convey("Test API GetTask", t, func() {
Convey("Test response status", func() {
So(resp.Status, ShouldEqual, "ok")
So(resp.Message, ShouldEqual, "success")
})
})
}
func TestPutTask(t *testing.T) {
var newItem = model.Task{
Id: "1234",
SpiderId: bson.ObjectIdHex("5d429e6c19f7abede924fee2"),
StartTs: time.Now(),
FinishTs: time.Now(),
Status: "online",
NodeId: bson.ObjectIdHex("5d429e6c19f7abede924fee2"),
LogPath: "./log",
Cmd: "scrapy crawl test",
Error: "",
ResultCount: 0,
WaitDuration: 10.0,
RuntimeDuration: 10,
TotalDuration: 20,
SpiderName: "test",
NodeName: "test",
CreateTs: time.Now(),
UpdateTs: time.Now(),
}
var resp Response
body, _ := json.Marshal(&newItem)
w := httptest.NewRecorder()
req, _ := http.NewRequest("PUT", "/tasks", strings.NewReader(string(body)))
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
if err != nil {
t.Fatal("unmarshal resp failed")
}
Convey("Test API PutTask", t, func() {
Convey("Test response status", func() {
So(resp.Status, ShouldEqual, "ok")
So(resp.Message, ShouldEqual, "success")
})
})
}
func TestDeleteTask(t *testing.T) {
taskId := "1234"
var resp Response
w := httptest.NewRecorder()
req, _ := http.NewRequest("DELETE", "/tasks/"+taskId, nil)
app.ServeHTTP(w, req)
err := json.Unmarshal([]byte(w.Body.String()), &resp)
if err != nil {
t.Fatal("unmarshal resp failed")
}
Convey("Test API DeleteTask", t, func() {
Convey("Test response status", func() {
So(resp.Status, ShouldEqual, "ok")
So(resp.Message, ShouldEqual, "success")
})
})
}
// TestGetTaskResults sends GET /tasks/1234/results with pagination parameters
// through the shared mock router and checks the response envelope and total.
//
// The query keys must match the `form` tags of TaskResultsRequestData
// (page_num, page_size); otherwise nothing is bound by the handler.
func TestGetTaskResults(t *testing.T) {
	taskId := "1234"

	var resp ListResponse
	w := httptest.NewRecorder()
	req, err := http.NewRequest("GET", "/tasks/"+taskId+"/results?page_num=2&page_size=1", nil)
	if err != nil {
		t.Fatal("build request failed")
	}
	app.ServeHTTP(w, req)
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatal("Unmarshal resp failed")
	}

	Convey("Test API GetTaskResults", t, func() {
		Convey("Test response status", func() {
			So(resp.Status, ShouldEqual, "ok")
			So(resp.Message, ShouldEqual, "success")
			So(resp.Total, ShouldEqual, 2)
		})
	})
}