diff --git a/.github/workflows/dockerpush.yml b/.github/workflows/dockerpush.yml
index ba65f700..f85e6d37 100644
--- a/.github/workflows/dockerpush.yml
+++ b/.github/workflows/dockerpush.yml
@@ -11,9 +11,6 @@ on:
     tags:
       - v*
 
-  # Run tests for any PRs.
-  pull_request:
-
 env:
   IMAGE_NAME: tikazyq/crawlab
 
@@ -54,6 +51,12 @@ jobs:
 
       - name: Deploy
         run: |
+          # Strip git ref prefix from version
+          VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
+
+          # Strip "v" prefix from tag name
+          [[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo "$VERSION" | sed -e 's/^v//')
+
           if [ "$VERSION" == "release" ]; then
             apt-get install -y curl
             curl ${{ secrets.JENKINS_RELEASE_URL }}
diff --git a/CHANGELOG-zh.md b/CHANGELOG-zh.md
index e4f87aeb..7c1cbdd8 100644
--- a/CHANGELOG-zh.md
+++ b/CHANGELOG-zh.md
@@ -5,6 +5,8 @@
 - **长任务支持**. 用户可以添加长任务爬虫,这些爬虫可以跑长期运行的任务. [#425](https://github.com/crawlab-team/crawlab/issues/425)
 - **爬虫列表优化**. 分状态任务数统计,任务列表详情弹出框,图例. [#425](https://github.com/crawlab-team/crawlab/issues/425)
 - **版本升级检测**. 检测最新版本,通知用户升级.
+- **批量操作爬虫**. 允许用户批量运行/停止爬虫任务,以及批量删除爬虫.
+- **复制爬虫**. 允许用户复制已存在爬虫来创建新爬虫.
 
 ### Bug 修复
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5248c661..bd2fc69c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@
 - **Long Task Support**. Users can add long-task spiders, which are expected to keep running without finishing. [#425](https://github.com/crawlab-team/crawlab/issues/425)
 - **Spider List Optimization**. Task counts by status, task detail popup, legend. [#425](https://github.com/crawlab-team/crawlab/issues/425)
 - **Upgrade Check**. Check for the latest version and notify users to upgrade.
+- **Spider Batch Operations**. Allow users to run/stop spider tasks and delete spiders in batches.
+- **Copy Spiders**. Allow users to copy an existing spider to create a new one.
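
The two changelog entries above correspond to the new copy route registered in backend/main.go below. For orientation, a minimal sketch of how a client might exercise that endpoint once the server is running; the base URL, port, spider ID and token value are illustrative assumptions, not part of this change:

    package main

    import (
    	"bytes"
    	"fmt"
    	"net/http"
    )

    func main() {
    	// Hypothetical values: adjust host, spider ID and token for your deployment.
    	base := "http://localhost:8000"
    	spiderID := "5e4b8ca1f1e87a0010000001"

    	// The handler binds {"name": "..."} as the new spider's name.
    	body := bytes.NewBufferString(`{"name":"my-spider-copy"}`)

    	req, err := http.NewRequest(http.MethodPost, base+"/spiders/"+spiderID+"/copy", body)
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("Content-Type", "application/json")
    	req.Header.Set("Authorization", "<token>") // the route sits in an authenticated group

    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	fmt.Println(resp.Status) // expect 200 with {"status":"ok","message":"success"}
    }
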
 
 ### Bug Fixes
 
diff --git a/backend/main.go b/backend/main.go
index 7ddfdbf8..a35291ca 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -164,6 +164,7 @@ func main() {
 		authGroup.POST("/spiders/:id/upload", routes.UploadSpiderFromId)  // upload spider (by ID)
 		authGroup.DELETE("/spiders", routes.DeleteSelectedSpider)         // delete selected spiders
 		authGroup.DELETE("/spiders/:id", routes.DeleteSpider)             // delete spider
+		authGroup.POST("/spiders/:id/copy", routes.CopySpider)            // copy spider
 		authGroup.GET("/spiders/:id/tasks", routes.GetSpiderTasks)        // spider task list
 		authGroup.GET("/spiders/:id/file/tree", routes.GetSpiderFileTree) // read spider file tree
 		authGroup.GET("/spiders/:id/file", routes.GetSpiderFile)          // read spider file
diff --git a/backend/routes/spider.go b/backend/routes/spider.go
index 9ad77948..1436e744 100644
--- a/backend/routes/spider.go
+++ b/backend/routes/spider.go
@@ -26,6 +26,8 @@ import (
 	"time"
 )
 
+// ======== Spider Management ========
+
 func GetSpiderList(c *gin.Context) {
 	pageNum, _ := c.GetQuery("page_num")
 	pageSize, _ := c.GetQuery("page_size")
@@ -240,6 +242,51 @@ func PutSpider(c *gin.Context) {
 	})
 }
 
+func CopySpider(c *gin.Context) {
+	type ReqBody struct {
+		Name string `json:"name"`
+	}
+
+	id := c.Param("id")
+
+	if !bson.IsObjectIdHex(id) {
+		HandleErrorF(http.StatusBadRequest, c, "invalid id")
+		return
+	}
+
+	var reqBody ReqBody
+	if err := c.ShouldBindJSON(&reqBody); err != nil {
+		HandleError(http.StatusBadRequest, c, err)
+		return
+	}
+
+	// check whether the new spider name already exists;
+	// if it does, return an error
+	s := model.GetSpiderByName(reqBody.Name)
+	if s.Name != "" {
+		HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("spider name '%s' already exists", reqBody.Name))
+		return
+	}
+
+	// the spider to be copied
+	spider, err := model.GetSpider(bson.ObjectIdHex(id))
+	if err != nil {
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	// copy the spider
+	if err := services.CopySpider(spider, reqBody.Name); err != nil {
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+	})
+}
+
 func UploadSpider(c *gin.Context) {
 	// get the file from the request body
 	uploadFile, err := c.FormFile("file")
@@ -647,7 +694,151 @@ func GetSpiderTasks(c *gin.Context) {
 	})
 }
 
-// spider file management
+func GetSpiderStats(c *gin.Context) {
+	type Overview struct {
+		TaskCount            int     `json:"task_count" bson:"task_count"`
+		ResultCount          int     `json:"result_count" bson:"result_count"`
+		SuccessCount         int     `json:"success_count" bson:"success_count"`
+		SuccessRate          float64 `json:"success_rate"`
+		TotalWaitDuration    float64 `json:"wait_duration" bson:"wait_duration"`
+		TotalRuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"`
+		AvgWaitDuration      float64 `json:"avg_wait_duration"`
+		AvgRuntimeDuration   float64 `json:"avg_runtime_duration"`
+	}
+
+	type Data struct {
+		Overview Overview              `json:"overview"`
+		Daily    []model.TaskDailyItem `json:"daily"`
+	}
+
+	id := c.Param("id")
+
+	spider, err := model.GetSpider(bson.ObjectIdHex(id))
+	if err != nil {
+		log.Errorf(err.Error())
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	s, col := database.GetCol("tasks")
+	defer s.Close()
+
+	// start and end dates
+	startDate := time.Now().Add(-time.Hour * 24 * 30)
+	endDate := time.Now()
+
+	// match
+	op1 := bson.M{
+		"$match": bson.M{
+			"spider_id": spider.Id,
+			"create_ts": bson.M{
+				"$gte": startDate,
+				"$lt":  endDate,
+			},
+		},
+	}
+
+	// project
+	op2 := bson.M{
+		"$project": bson.M{
+			"success_count": bson.M{
+				"$cond": []interface{}{
+					bson.M{
+						"$eq": []string{
+							"$status",
+							constants.StatusFinished,
+						},
+					},
+					1,
+					0,
+				},
+			},
+			"result_count":     "$result_count",
+			"wait_duration":    "$wait_duration",
+			"runtime_duration": "$runtime_duration",
+		},
+	}
+
+	// group
+	op3 := bson.M{
+		"$group": bson.M{
+			"_id":              nil,
+			"task_count":       bson.M{"$sum": 1},
+			"success_count":    bson.M{"$sum": "$success_count"},
+			"result_count":     bson.M{"$sum": "$result_count"},
+			"wait_duration":    bson.M{"$sum": "$wait_duration"},
+			"runtime_duration": bson.M{"$sum": "$runtime_duration"},
+		},
+	}
+
+	// run aggregation pipeline
+	var overview Overview
+	if err := col.Pipe([]bson.M{op1, op2, op3}).One(&overview); err != nil {
+		if err == mgo.ErrNotFound {
+			c.JSON(http.StatusOK, Response{
+				Status:  "ok",
+				Message: "success",
+				Data: Data{
+					Overview: overview,
+					Daily:    []model.TaskDailyItem{},
+				},
+			})
+			return
+		}
+		log.Errorf(err.Error())
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	// post-processing
+	successCount := float64(overview.SuccessCount)
+	taskCount := float64(overview.TaskCount)
+	overview.SuccessRate = successCount / taskCount
+	overview.AvgWaitDuration = overview.TotalWaitDuration / taskCount
+	overview.AvgRuntimeDuration = overview.TotalRuntimeDuration / taskCount
+
+	items, err := model.GetDailyTaskStats(bson.M{"spider_id": spider.Id})
+	if err != nil {
+		log.Errorf(err.Error())
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+		Data: Data{
+			Overview: overview,
+			Daily:    items,
+		},
+	})
+}
+
+func GetSpiderSchedules(c *gin.Context) {
+	id := c.Param("id")
+
+	if !bson.IsObjectIdHex(id) {
+		HandleErrorF(http.StatusBadRequest, c, "spider_id is invalid")
+		return
+	}
+
+	// fetch the spider's schedules
+	list, err := model.GetScheduleList(bson.M{"spider_id": bson.ObjectIdHex(id)})
+	if err != nil {
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+		Data:    list,
+	})
+}
+
+// ======== ./Spider Management ========
+
+// ======== Spider File Management ========
 
 func GetSpiderDir(c *gin.Context) {
 	// spider ID
@@ -946,147 +1137,9 @@ func RenameSpiderFile(c *gin.Context) {
 	})
 }
 
-func GetSpiderStats(c *gin.Context) {
-	type Overview struct {
-		TaskCount            int     `json:"task_count" bson:"task_count"`
-		ResultCount          int     `json:"result_count" bson:"result_count"`
-		SuccessCount         int     `json:"success_count" bson:"success_count"`
-		SuccessRate          float64 `json:"success_rate"`
-		TotalWaitDuration    float64 `json:"wait_duration" bson:"wait_duration"`
-		TotalRuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"`
-		AvgWaitDuration      float64 `json:"avg_wait_duration"`
-		AvgRuntimeDuration   float64 `json:"avg_runtime_duration"`
-	}
+// ======== ./Spider File Management ========
 
-	type Data struct {
-		Overview Overview              `json:"overview"`
-		Daily    []model.TaskDailyItem `json:"daily"`
-	}
-
-	id := c.Param("id")
-
-	spider, err := model.GetSpider(bson.ObjectIdHex(id))
-	if err != nil {
-		log.Errorf(err.Error())
-		HandleError(http.StatusInternalServerError, c, err)
-		return
-	}
-
-	s, col := database.GetCol("tasks")
-	defer s.Close()
-
-	// start and end dates
-	startDate := time.Now().Add(-time.Hour * 24 * 30)
-	endDate := time.Now()
-
-	// match
-	op1 := bson.M{
-		"$match": bson.M{
-			"spider_id": spider.Id,
-			"create_ts": bson.M{
-				"$gte": startDate,
-				"$lt":  endDate,
-			},
-		},
-	}
-
-	// project
-	op2 := bson.M{
-		"$project": bson.M{
-			"success_count": bson.M{
-				"$cond": []interface{}{
-					bson.M{
-						"$eq": []string{
-							"$status",
-							constants.StatusFinished,
-						},
-					},
-					1,
-					0,
-				},
-			},
-			"result_count":     "$result_count",
-			"wait_duration":    "$wait_duration",
-			"runtime_duration": "$runtime_duration",
-		},
-	}
-
-	// group
-	op3 := bson.M{
-		"$group": bson.M{
-			"_id":              nil,
-			"task_count":       bson.M{"$sum": 1},
-			"success_count":    bson.M{"$sum": "$success_count"},
-			"result_count":     bson.M{"$sum": "$result_count"},
-			"wait_duration":    bson.M{"$sum": "$wait_duration"},
-			"runtime_duration": bson.M{"$sum": "$runtime_duration"},
-		},
-	}
-
-	// run aggregation pipeline
-	var overview Overview
-	if err := col.Pipe([]bson.M{op1, op2, op3}).One(&overview); err != nil {
-		if err == mgo.ErrNotFound {
-			c.JSON(http.StatusOK, Response{
-				Status:  "ok",
-				Message: "success",
-				Data: Data{
-					Overview: overview,
-					Daily:    []model.TaskDailyItem{},
-				},
-			})
-			return
-		}
-		log.Errorf(err.Error())
-		HandleError(http.StatusInternalServerError, c, err)
-		return
-	}
-
-	// post-processing
-	successCount, _ := strconv.ParseFloat(strconv.Itoa(overview.SuccessCount), 64)
-	taskCount, _ := strconv.ParseFloat(strconv.Itoa(overview.TaskCount), 64)
-	overview.SuccessRate = successCount / taskCount
-	overview.AvgWaitDuration = overview.TotalWaitDuration / taskCount
-	overview.AvgRuntimeDuration = overview.TotalRuntimeDuration / taskCount
-
-	items, err := model.GetDailyTaskStats(bson.M{"spider_id": spider.Id})
-	if err != nil {
-		log.Errorf(err.Error())
-		HandleError(http.StatusInternalServerError, c, err)
-		return
-	}
-
-	c.JSON(http.StatusOK, Response{
-		Status:  "ok",
-		Message: "success",
-		Data: Data{
-			Overview: overview,
-			Daily:    items,
-		},
-	})
-}
-
-func GetSpiderSchedules(c *gin.Context) {
-	id := c.Param("id")
-
-	if !bson.IsObjectIdHex(id) {
-		HandleErrorF(http.StatusBadRequest, c, "spider_id is invalid")
-		return
-	}
-
-	// fetch the spider's schedules
-	list, err := model.GetScheduleList(bson.M{"spider_id": bson.ObjectIdHex(id)})
-	if err != nil {
-		HandleError(http.StatusInternalServerError, c, err)
-		return
-	}
-
-	c.JSON(http.StatusOK, Response{
-		Status:  "ok",
-		Message: "success",
-		Data:    list,
-	})
-}
+// ======== Scrapy Section ========
 
 func GetSpiderScrapySpiders(c *gin.Context) {
 	id := c.Param("id")
@@ -1328,6 +1381,10 @@ func GetSpiderScrapySpiderFilepath(c *gin.Context) {
 	})
 }
 
+// ======== ./Scrapy Section ========
+
+// ======== Git Section ========
+
 func PostSpiderSyncGit(c *gin.Context) {
 	id := c.Param("id")
 
@@ -1377,3 +1434,5 @@ func PostSpiderResetGit(c *gin.Context) {
 		Message: "success",
 	})
 }
+
+// ======== ./Git Section ========
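
One detail in GetSpiderStats worth spelling out: the overview counters are ints and the rates come from float division (the strconv round-trip in the moved code is replaced with plain float64 conversions in the new version above). A standalone sketch of that arithmetic, with an explicit zero guard that the handler only gets implicitly from its mgo.ErrNotFound early return:

    package stats

    // successRate returns the ratio of finished tasks to all tasks.
    // The explicit guard covers the zero-task case; in the handler above
    // that situation never reaches the division, because the
    // mgo.ErrNotFound branch returns early when no tasks match.
    func successRate(successCount, taskCount int) float64 {
    	if taskCount == 0 {
    		return 0
    	}
    	return float64(successCount) / float64(taskCount)
    }
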
+ "wait_duration": "$wait_duration", + "runtime_duration": "$runtime_duration", + }, + } + + // group + op3 := bson.M{ + "$group": bson.M{ + "_id": nil, + "task_count": bson.M{"$sum": 1}, + "success_count": bson.M{"$sum": "$success_count"}, + "result_count": bson.M{"$sum": "$result_count"}, + "wait_duration": bson.M{"$sum": "$wait_duration"}, + "runtime_duration": bson.M{"$sum": "$runtime_duration"}, + }, + } + + // run aggregation pipeline + var overview Overview + if err := col.Pipe([]bson.M{op1, op2, op3}).One(&overview); err != nil { + if err == mgo.ErrNotFound { + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: Data{ + Overview: overview, + Daily: []model.TaskDailyItem{}, + }, + }) + return + } + log.Errorf(err.Error()) + HandleError(http.StatusInternalServerError, c, err) + return + } + + // 后续处理 + successCount, _ := strconv.ParseFloat(strconv.Itoa(overview.SuccessCount), 64) + taskCount, _ := strconv.ParseFloat(strconv.Itoa(overview.TaskCount), 64) + overview.SuccessRate = successCount / taskCount + overview.AvgWaitDuration = overview.TotalWaitDuration / taskCount + overview.AvgRuntimeDuration = overview.TotalRuntimeDuration / taskCount + + items, err := model.GetDailyTaskStats(bson.M{"spider_id": spider.Id}) + if err != nil { + log.Errorf(err.Error()) + HandleError(http.StatusInternalServerError, c, err) + return + } + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: Data{ + Overview: overview, + Daily: items, + }, + }) +} + +func GetSpiderSchedules(c *gin.Context) { + id := c.Param("id") + + if !bson.IsObjectIdHex(id) { + HandleErrorF(http.StatusBadRequest, c, "spider_id is invalid") + return + } + + // 获取定时任务 + list, err := model.GetScheduleList(bson.M{"spider_id": bson.ObjectIdHex(id)}) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: list, + }) +} + +// ======== ./爬虫管理 ======== + +// ======== 爬虫文件管理 ======== func GetSpiderDir(c *gin.Context) { // 爬虫ID @@ -946,147 +1136,9 @@ func RenameSpiderFile(c *gin.Context) { }) } -func GetSpiderStats(c *gin.Context) { - type Overview struct { - TaskCount int `json:"task_count" bson:"task_count"` - ResultCount int `json:"result_count" bson:"result_count"` - SuccessCount int `json:"success_count" bson:"success_count"` - SuccessRate float64 `json:"success_rate"` - TotalWaitDuration float64 `json:"wait_duration" bson:"wait_duration"` - TotalRuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"` - AvgWaitDuration float64 `json:"avg_wait_duration"` - AvgRuntimeDuration float64 `json:"avg_runtime_duration"` - } +// ======== 爬虫文件管理 ======== - type Data struct { - Overview Overview `json:"overview"` - Daily []model.TaskDailyItem `json:"daily"` - } - - id := c.Param("id") - - spider, err := model.GetSpider(bson.ObjectIdHex(id)) - if err != nil { - log.Errorf(err.Error()) - HandleError(http.StatusInternalServerError, c, err) - return - } - - s, col := database.GetCol("tasks") - defer s.Close() - - // 起始日期 - startDate := time.Now().Add(-time.Hour * 24 * 30) - endDate := time.Now() - - // match - op1 := bson.M{ - "$match": bson.M{ - "spider_id": spider.Id, - "create_ts": bson.M{ - "$gte": startDate, - "$lt": endDate, - }, - }, - } - - // project - op2 := bson.M{ - "$project": bson.M{ - "success_count": bson.M{ - "$cond": []interface{}{ - bson.M{ - "$eq": []string{ - "$status", - constants.StatusFinished, - }, - }, - 1, - 0, - }, - }, - "result_count": 
"$result_count", - "wait_duration": "$wait_duration", - "runtime_duration": "$runtime_duration", - }, - } - - // group - op3 := bson.M{ - "$group": bson.M{ - "_id": nil, - "task_count": bson.M{"$sum": 1}, - "success_count": bson.M{"$sum": "$success_count"}, - "result_count": bson.M{"$sum": "$result_count"}, - "wait_duration": bson.M{"$sum": "$wait_duration"}, - "runtime_duration": bson.M{"$sum": "$runtime_duration"}, - }, - } - - // run aggregation pipeline - var overview Overview - if err := col.Pipe([]bson.M{op1, op2, op3}).One(&overview); err != nil { - if err == mgo.ErrNotFound { - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: Data{ - Overview: overview, - Daily: []model.TaskDailyItem{}, - }, - }) - return - } - log.Errorf(err.Error()) - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 后续处理 - successCount, _ := strconv.ParseFloat(strconv.Itoa(overview.SuccessCount), 64) - taskCount, _ := strconv.ParseFloat(strconv.Itoa(overview.TaskCount), 64) - overview.SuccessRate = successCount / taskCount - overview.AvgWaitDuration = overview.TotalWaitDuration / taskCount - overview.AvgRuntimeDuration = overview.TotalRuntimeDuration / taskCount - - items, err := model.GetDailyTaskStats(bson.M{"spider_id": spider.Id}) - if err != nil { - log.Errorf(err.Error()) - HandleError(http.StatusInternalServerError, c, err) - return - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: Data{ - Overview: overview, - Daily: items, - }, - }) -} - -func GetSpiderSchedules(c *gin.Context) { - id := c.Param("id") - - if !bson.IsObjectIdHex(id) { - HandleErrorF(http.StatusBadRequest, c, "spider_id is invalid") - return - } - - // 获取定时任务 - list, err := model.GetScheduleList(bson.M{"spider_id": bson.ObjectIdHex(id)}) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: list, - }) -} +// ======== Scrapy 部分 ======== func GetSpiderScrapySpiders(c *gin.Context) { id := c.Param("id") @@ -1328,6 +1380,10 @@ func GetSpiderScrapySpiderFilepath(c *gin.Context) { }) } +// ======== ./Scrapy 部分 ======== + +// ======== Git 部分 ======== + func PostSpiderSyncGit(c *gin.Context) { id := c.Param("id") @@ -1377,3 +1433,5 @@ func PostSpiderResetGit(c *gin.Context) { Message: "success", }) } + +// ======== ./Git 部分 ======== diff --git a/backend/services/spider.go b/backend/services/spider.go index 6d450ef1..392b0488 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -15,11 +15,13 @@ import ( "github.com/satori/go.uuid" "github.com/spf13/viper" "gopkg.in/yaml.v2" + "io" "io/ioutil" "os" "path" "path/filepath" "runtime/debug" + "time" ) type SpiderFileData struct { @@ -293,6 +295,123 @@ func CancelSpider(id string) error { return nil } +func cloneGridFsFile(spider model.Spider, newName string) (err error) { + // 构造新爬虫 + newSpider := spider + newSpider.Id = bson.NewObjectId() + newSpider.Name = newName + newSpider.DisplayName = newName + newSpider.Src = path.Join(path.Dir(spider.Src), newName) + newSpider.CreateTs = time.Now() + newSpider.UpdateTs = time.Now() + + // GridFS连接实例 + s, gf := database.GetGridFs("files") + defer s.Close() + + // 被克隆爬虫的GridFS文件 + f, err := gf.OpenId(spider.FileId) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + } + + // 新爬虫的GridFS文件 + fNew, err := gf.Create(newSpider.Name + ".zip") + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + 
diff --git a/backend/utils/chan.go b/backend/utils/chan.go
index 7b63ac0f..7fb4cea3 100644
--- a/backend/utils/chan.go
+++ b/backend/utils/chan.go
@@ -1,29 +1,33 @@
 package utils
 
+import (
+	"sync"
+)
+
 var TaskExecChanMap = NewChanMap()
 
 type ChanMap struct {
-	m map[string]chan string
+	m sync.Map
 }
 
 func NewChanMap() *ChanMap {
-	return &ChanMap{m: make(map[string]chan string)}
+	return &ChanMap{m: sync.Map{}}
 }
 
 func (cm *ChanMap) Chan(key string) chan string {
-	if ch, ok := cm.m[key]; ok {
-		return ch
+	if ch, ok := cm.m.Load(key); ok {
+		return ch.(chan string)
 	}
 	ch := make(chan string, 10)
-	cm.m[key] = ch
+	cm.m.Store(key, ch)
 	return ch
 }
 
 func (cm *ChanMap) ChanBlocked(key string) chan string {
-	if ch, ok := cm.m[key]; ok {
-		return ch
+	if ch, ok := cm.m.Load(key); ok {
+		return ch.(chan string)
 	}
 	ch := make(chan string)
-	cm.m[key] = ch
+	cm.m.Store(key, ch)
 	return ch
 }
diff --git a/frontend/src/components/Common/CrawlConfirmDialog.vue b/frontend/src/components/Common/CrawlConfirmDialog.vue
index aa89ea9a..93d62d9d 100644
--- a/frontend/src/components/Common/CrawlConfirmDialog.vue
+++ b/frontend/src/components/Common/CrawlConfirmDialog.vue
@@ -78,8 +78,6 @@
         跳转到任务详情页
-
-
-
+
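
Returning to cloneGridFsFile above, the new spider's source directory is derived as path.Join(path.Dir(spider.Src), newName), that is, a sibling of the original spider's directory. A quick standalone check of that path arithmetic; the directory value is hypothetical:

    package main

    import (
    	"fmt"
    	"path"
    )

    func main() {
    	src := "/app/spiders/old-spider" // hypothetical existing spider directory
    	newName := "new-spider"

    	// Sibling directory under the same parent, as cloneGridFsFile computes it.
    	fmt.Println(path.Join(path.Dir(src), newName)) // prints /app/spiders/new-spider
    }
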