diff --git a/backend/routes/spider.go b/backend/routes/spider.go index e0afb1a8..8a9e643f 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -11,7 +11,7 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/pkg/errors" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" "github.com/spf13/viper" "io/ioutil" "net/http" @@ -152,49 +152,68 @@ func PutSpider(c *gin.Context) { return } - // 读取临时文件 - tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) + // 上传到GridFs + fid, err := services.UploadToGridFs(file.Filename, tmpFilePath) if err != nil { + log.Errorf("upload to grid fs error: %s", err.Error()) debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) return } - if err = tmpFile.Close(); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return + + // 保存爬虫信息 + srcPath := viper.GetString("spider.path") + spider := model.Spider{ + Name: file.Filename, + DisplayName: file.Filename, + Type: constants.Customized, + Src: filepath.Join(srcPath, file.Filename), + FileId: fid, } + _ = spider.Save() + + // 读取临时文件 + //tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) + //if err != nil { + // debug.PrintStack() + // HandleError(http.StatusInternalServerError, c, err) + // return + //} + //if err = tmpFile.Close(); err != nil { + // debug.PrintStack() + // HandleError(http.StatusInternalServerError, c, err) + // return + //} // 目标目录 - dstPath := filepath.Join( - viper.GetString("spider.path"), - strings.Replace(file.Filename, ".zip", "", 1), - ) + //dstPath := filepath.Join( + // viper.GetString("spider.path"), + // strings.Replace(file.Filename, ".zip", "", 1), + //) // 如果目标目录已存在,删除目标目录 - if utils.Exists(dstPath) { - if err := os.RemoveAll(dstPath); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - } - } + //if utils.Exists(dstPath) { + // if err := os.RemoveAll(dstPath); err != nil { + // debug.PrintStack() + // HandleError(http.StatusInternalServerError, c, err) + // } + //} // 将临时文件解压到爬虫目录 - if err := utils.DeCompress(tmpFile, dstPath); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return - } + //if err := utils.DeCompress(tmpFile, dstPath); err != nil { + // debug.PrintStack() + // HandleError(http.StatusInternalServerError, c, err) + // return + //} // 删除临时文件 - if err = os.Remove(tmpFilePath); err != nil { - debug.PrintStack() - HandleError(http.StatusInternalServerError, c, err) - return - } + //if err = os.Remove(tmpFilePath); err != nil { + // debug.PrintStack() + // HandleError(http.StatusInternalServerError, c, err) + // return + //} // 更新爬虫 - services.UpdateSpiders() + // services.UpdateSpiders() c.JSON(http.StatusOK, Response{ Status: "ok", diff --git a/backend/services/spider.go b/backend/services/spider.go index fdf09517..bcf98700 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -13,16 +13,12 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/gomodule/redigo/redis" - "github.com/pkg/errors" "github.com/satori/go.uuid" "github.com/spf13/viper" "io" - "io/ioutil" "os" "path/filepath" "runtime/debug" - "strings" - "syscall" ) type SpiderFileData struct { @@ -36,175 +32,14 @@ type SpiderUploadMessage struct { SpiderId string } -// 从项目目录中获取爬虫列表 -func GetSpidersFromDir() ([]model.Spider, error) { - // 爬虫项目目录路径 - srcPath := viper.GetString("spider.path") - - // 如果爬虫项目目录不存在,则创建一个 - if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 - defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0766); err != nil { - debug.PrintStack() - return []model.Spider{}, err - } - } - - // 获取爬虫项目目录下的所有子项 - items, err := ioutil.ReadDir(srcPath) - if err != nil { - debug.PrintStack() - return []model.Spider{}, err - } - - // 定义爬虫列表 - spiders := make([]model.Spider, 0) - - // 遍历所有子项 - for _, item := range items { - // 忽略不为目录的子项 - if !item.IsDir() { - continue - } - - // 忽略隐藏目录 - if strings.HasPrefix(item.Name(), ".") { - continue - } - - // 构造爬虫 - spider := model.Spider{ - Name: item.Name(), - DisplayName: item.Name(), - Type: constants.Customized, - Src: filepath.Join(srcPath, item.Name()), - FileId: bson.ObjectIdHex(constants.ObjectIdNull), - } - - // 将爬虫加入列表 - spiders = append(spiders, spider) - } - - return spiders, nil -} - -// 将爬虫保存到数据库 -func SaveSpiders(spiders []model.Spider) error { - s, c := database.GetCol("spiders") - defer s.Close() - - if len(spiders) == 0 { - err := model.RemoveAllSpider() - if err != nil { - log.Error("remove all spider error:" + err.Error()) - return err - } - log.Info("get spider from dir is empty,removed all spider") - return nil - } - // 如果该爬虫不存在于数据库,则保存爬虫到数据库 - for _, spider := range spiders { - // 忽略非自定义爬虫 - if spider.Type != constants.Customized { - continue - } - spider_ := []*model.Spider{} - _ = c.Find(bson.M{"src": spider.Src}).All(&spider_) - // 以防出现多个重复的爬虫 - if len(spider_) > 1 { - if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - if err := spider.Add(); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - continue - } - if len(spider_) == 0 { - // 不存在 - if err := spider.Add(); err != nil { - log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) - debug.PrintStack() - continue - } - } - } - return nil -} - -// 更新爬虫 -func UpdateSpiders() { - // 从项目目录获取爬虫列表 - spiders, err := GetSpidersFromDir() - if err != nil { - log.Errorf(err.Error()) - return - } - - // 储存爬虫 - if err := SaveSpiders(spiders); err != nil { - log.Errorf(err.Error()) - return - } -} - -// 打包爬虫目录为zip文件 -func ZipSpider(spider model.Spider) (filePath string, err error) { - // 如果源文件夹不存在,抛错 - if !utils.Exists(spider.Src) { - debug.PrintStack() - // 删除该爬虫,否则会一直报错 - _ = model.RemoveSpider(spider.Id) - return "", errors.New("source path does not exist") - } - - // 临时文件路径 - randomId := uuid.NewV4() - - tmpPath := viper.GetString("other.tmppath") - if !utils.Exists(tmpPath) { - if err := os.MkdirAll(tmpPath, 0777); err != nil { - log.Errorf("mkdir other.tmppath error: %v", err.Error()) - return "", err - } - } - filePath = filepath.Join(tmpPath, randomId.String()+".zip") - // 将源文件夹打包为zip文件 - d, err := os.Open(spider.Src) - if err != nil { - debug.PrintStack() - return filePath, err - } - var files []*os.File - files = append(files, d) - if err := utils.Compress(files, filePath); err != nil { - return filePath, err - } - - return filePath, nil -} - // 上传zip文件到GridFS -func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid bson.ObjectId, err error) { +func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) { fid = "" // 获取MongoDB GridFS连接 s, gf := database.GetGridFs("files") defer s.Close() - // 如果存在FileId删除GridFS上的老文件 - if !utils.IsObjectIdNull(spider.FileId) { - if err = gf.RemoveId(spider.FileId); err != nil { - log.Error("remove gf file:" + err.Error()) - debug.PrintStack() - } - } - // 创建一个新GridFS文件 f, err := gf.Create(fileName) if err != nil { @@ -295,29 +130,24 @@ func PublishAllSpidersJob() { // 2. 上传zip文件到GridFS // 3. 发布消息给工作节点 func PublishSpider(spider model.Spider) (err error) { - // 将源文件夹打包为zip文件 - filePath, err := ZipSpider(spider) - if err != nil { - return err - } - // 上传zip文件到GridFS - fileName := filepath.Base(spider.Src) + ".zip" - fid, err := UploadToGridFs(spider, fileName, filePath) - if err != nil { - return err - } + s, gf := database.GetGridFs("files") + defer s.Close() - // 保存FileId - spider.FileId = fid - if err := spider.Save(); err != nil { + f, err := gf.OpenId(spider.FileId) + defer f.Close() + if err != nil { + log.Errorf("open file id: " + spider.FileId.Hex() + ", spider id:" + spider.Id.Hex() + ", error: " + err.Error()) + debug.PrintStack() + // 爬虫和文件没有对应,则删除爬虫 + _ = model.RemoveSpider(spider.Id) return err } // 发布消息给工作节点 msg := SpiderUploadMessage{ - FileId: fid.Hex(), - FileName: fileName, + FileId: spider.FileId.Hex(), + FileName: f.Name(), SpiderId: spider.Id.Hex(), } msgStr, err := json.Marshal(msg) @@ -349,12 +179,12 @@ func OnFileUpload(message redis.Message) (err error) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) + defer f.Close() if err != nil { log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) debug.PrintStack() return err } - defer f.Close() // 生成唯一ID randomId := uuid.NewV4() @@ -386,7 +216,6 @@ func OnFileUpload(message redis.Message) (err error) { // 解压缩临时文件到目标文件夹 dstPath := filepath.Join( viper.GetString("spider.path"), - // strings.Replace(msg.FileName, ".zip", "", -1), ) if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) @@ -418,11 +247,6 @@ func InitSpiderService() error { if IsMaster() { // 主节点 - // 每5秒更新一次爬虫信息 - if _, err := c.AddFunc("*/5 * * * * *", UpdateSpiders); err != nil { - return err - } - // 每60秒同步爬虫给工作节点 if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil { return err @@ -432,8 +256,6 @@ func InitSpiderService() error { // 订阅文件上传 channel := "files:upload" - - //sub.Connect() ctx := context.Background() return database.RedisClient.Subscribe(ctx, OnFileUpload, channel) diff --git a/backend/services/task.go b/backend/services/task.go index 3b05089b..e8d66a2a 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -145,7 +145,6 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e if err := cmd.Process.Kill(); err != nil { log.Errorf("process kill error: %s", err.Error()) debug.PrintStack() - return } t.Status = constants.StatusCancelled } else { @@ -487,10 +486,9 @@ func CancelTask(id string) (err error) { if err := model.UpdateTaskToAbnormal(node.Id); err != nil { log.Errorf("update task to abnormal : {}", err.Error()) debug.PrintStack() - return + return err } } - } else { // 任务节点为工作节点