From 5f158ddb4474763604d2bbd544cff5a6046fe85d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 26 Sep 2019 19:12:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=88=AC=E8=99=AB=E8=8E=B7?= =?UTF-8?q?=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/main.go | 5 +- backend/model/node.go | 68 ++++++++++ backend/services/node.go | 78 +---------- backend/services/spider.go | 105 ++++----------- backend/services/spider_handler/spider.go | 127 ++++++++++++++++++ .../services/spider_handler/spider_test.go | 19 +++ backend/services/task.go | 4 +- backend/utils/file.go | 35 +++++ 8 files changed, 282 insertions(+), 159 deletions(-) create mode 100644 backend/services/spider_handler/spider.go create mode 100644 backend/services/spider_handler/spider_test.go diff --git a/backend/main.go b/backend/main.go index bf98674e..47196fe5 100644 --- a/backend/main.go +++ b/backend/main.go @@ -5,6 +5,7 @@ import ( "crawlab/database" "crawlab/lib/validate_bridge" "crawlab/middlewares" + "crawlab/model" "crawlab/routes" "crawlab/services" "github.com/apex/log" @@ -57,7 +58,7 @@ func main() { } log.Info("初始化Redis数据库成功") - if services.IsMaster() { + if model.IsMaster() { // 初始化定时任务 if err := services.InitScheduler(); err != nil { log.Error("init scheduler error:" + err.Error()) @@ -99,7 +100,7 @@ func main() { log.Info("初始化用户服务成功") // 以下为主节点服务 - if services.IsMaster() { + if model.IsMaster() { // 中间件 app.Use(middlewares.CORSMiddleware()) //app.Use(middlewares.AuthorizationMiddleware()) diff --git a/backend/model/node.go b/backend/model/node.go index 6211115c..7af93dbe 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -7,6 +7,7 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + "github.com/spf13/viper" "runtime/debug" "time" ) @@ -30,6 +31,73 @@ type Node struct { UpdateTsUnix int64 `json:"update_ts_unix" bson:"update_ts_unix"` } +const ( + Yes = "Y" + No = "N" +) + +// 当前节点是否为主节点 +func IsMaster() bool { + return viper.GetString("server.master") == Yes +} + +// 获取本机节点 +func GetCurrentNode() (Node, error) { + // 获得注册的key值 + key, err := register.GetRegister().GetKey() + if err != nil { + return Node{}, err + } + + // 从数据库中获取当前节点 + var node Node + errNum := 0 + for { + // 如果错误次数超过10次 + if errNum >= 10 { + panic("cannot get current node") + } + + // 尝试获取节点 + node, err = GetNodeByKey(key) + // 如果获取失败 + if err != nil { + // 如果为主节点,表示为第一次注册,插入节点信息 + if IsMaster() { + // 获取本机信息 + ip, mac, key, err := GetNodeBaseInfo() + if err != nil { + debug.PrintStack() + return node, err + } + + // 生成节点 + node = Node{ + Key: key, + Id: bson.NewObjectId(), + Ip: ip, + Name: ip, + Mac: mac, + IsMaster: true, + } + if err := node.Add(); err != nil { + return node, err + } + return node, nil + } + // 增加错误次数 + errNum++ + + // 5秒后重试 + time.Sleep(5 * time.Second) + continue + } + // 跳出循环 + break + } + return node, nil +} + func (n *Node) Save() error { s, c := database.GetCol("nodes") defer s.Close() diff --git a/backend/services/node.go b/backend/services/node.go index 5526cb01..e3397e74 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -14,7 +14,6 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo/bson" "github.com/gomodule/redigo/redis" - "github.com/spf13/viper" "runtime/debug" "time" ) @@ -28,77 +27,10 @@ type Data struct { UpdateTsUnix int64 `json:"update_ts_unix"` } -const ( - Yes = "Y" - No = "N" -) - -// 获取本机节点 -func GetCurrentNode() (model.Node, error) { - // 获得注册的key值 - key, err := register.GetRegister().GetKey() - if err != nil { - return model.Node{}, err - } - - // 从数据库中获取当前节点 - var node model.Node - errNum := 0 - for { - // 如果错误次数超过10次 - if errNum >= 10 { - panic("cannot get current node") - } - - // 尝试获取节点 - node, err = model.GetNodeByKey(key) - // 如果获取失败 - if err != nil { - // 如果为主节点,表示为第一次注册,插入节点信息 - if IsMaster() { - // 获取本机信息 - ip, mac, key, err := model.GetNodeBaseInfo() - if err != nil { - debug.PrintStack() - return node, err - } - - // 生成节点 - node = model.Node{ - Key: key, - Id: bson.NewObjectId(), - Ip: ip, - Name: ip, - Mac: mac, - IsMaster: true, - } - if err := node.Add(); err != nil { - return node, err - } - return node, nil - } - // 增加错误次数 - errNum++ - - // 5秒后重试 - time.Sleep(5 * time.Second) - continue - } - // 跳出循环 - break - } - return node, nil -} - -// 当前节点是否为主节点 -func IsMaster() bool { - return viper.GetString("server.master") == Yes -} - // 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点 // 该ID的节点是否为主节点 func IsMasterNode(id string) bool { - curNode, _ := GetCurrentNode() + curNode, _ := model.GetCurrentNode() node, _ := model.GetNode(bson.ObjectIdHex(id)) return curNode.Id == node.Id } @@ -223,7 +155,7 @@ func UpdateNodeData() { Key: key, Mac: mac, Ip: ip, - Master: IsMaster(), + Master: model.IsMaster(), UpdateTs: time.Now(), UpdateTsUnix: time.Now().Unix(), } @@ -297,13 +229,13 @@ func InitNodeService() error { UpdateNodeData() // 获取当前节点 - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { log.Errorf(err.Error()) return err } ctx := context.Background() - if IsMaster() { + if model.IsMaster() { // 如果为主节点,订阅主节点通信频道 channel := "nodes:master" err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel) @@ -320,7 +252,7 @@ func InitNodeService() error { } // 如果为主节点,每30秒刷新所有节点信息 - if IsMaster() { + if model.IsMaster() { spec := "*/10 * * * * *" if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil { debug.PrintStack() diff --git a/backend/services/spider.go b/backend/services/spider.go index f166b1b7..34693bf5 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -5,14 +5,13 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/spider_handler" "crawlab/utils" "fmt" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" - "github.com/satori/go.uuid" "github.com/spf13/viper" - "io" "os" "path/filepath" "runtime/debug" @@ -102,6 +101,7 @@ func PublishAllSpiders() { if len(spiders) == 0 { return } + log.Infof("start sync spider to local, total: %d", len(spiders)) // 遍历爬虫列表 for _, spider := range spiders { // 异步发布爬虫 @@ -113,104 +113,45 @@ func PublishAllSpiders() { // 发布爬虫 func PublishSpider(spider model.Spider) { - s, gf := database.GetGridFs("files") - defer s.Close() - + // 查询gf file,不存在则删除 gfFile := model.GetGridFs(spider.FileId) if gfFile == nil { _ = model.RemoveSpider(spider.FileId) return } + spiderSync := spider_handler.SpiderSync{} + defer spiderSync.CreateMd5File(gfFile.Md5, spider.Name) - // 爬虫文件没有变化 - if spider.Md5 == spider.OldMd5 { + //目录不存在,则直接下载 + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + if !utils.Exists(path) { + log.Infof("path not found: %s", path) + spiderSync.Download(spider.Id.Hex(), spider.FileId.Hex()) return } - - //爬虫文件有变化,先删除本地文件 - _ = os.Remove(filepath.Join( - viper.GetString("spider.path"), - spider.Name, - )) - - // 重新下载爬虫文件 - node, _ := GetCurrentNode() - key := node.Id.Hex() + "#" + spider.Id.Hex() - if _, err := database.RedisClient.HGet("spider", key); err == nil { - log.Infof("downloading spider") + // md5文件不存在,则下载 + md5 := filepath.Join(path, spider_handler.Md5File) + if !utils.Exists(md5) { + log.Infof("md5.txt file not found: %s", md5) + spiderSync.RemoveSpiderFile(spider.Name) + spiderSync.Download(spider.Id.Hex(), spider.FileId.Hex()) return } - _ = database.RedisClient.HSet("spider", key, key) - defer database.RedisClient.HDel("spider", key) - - f, err := gf.OpenId(spider.FileId) - defer f.Close() - if err != nil { - log.Errorf("open file id: " + spider.FileId.Hex() + ", spider id:" + spider.Id.Hex() + ", error: " + err.Error()) - debug.PrintStack() + // md5值不一样,则下载 + md5Str := utils.ReadFile(md5) + if spider.Md5 != md5Str { + log.Infof("md5 is different: %s:%s ", md5Str, md5) + spiderSync.RemoveSpiderFile(spider.Name) + spiderSync.Download(spider.Id.Hex(), spider.FileId.Hex()) return } - - // 生成唯一ID - randomId := uuid.NewV4() - tmpPath := viper.GetString("other.tmppath") - if !utils.Exists(tmpPath) { - if err := os.MkdirAll(tmpPath, 0777); err != nil { - log.Errorf("mkdir other.tmppath error: %v", err.Error()) - return - } - } - // 创建临时文件 - tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") - tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - defer tmpFile.Close() - - // 将该文件写入临时文件 - if _, err := io.Copy(tmpFile, f); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - - // 解压缩临时文件到目标文件夹 - dstPath := filepath.Join( - viper.GetString("spider.path"), - ) - if err := utils.DeCompress(tmpFile, dstPath); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - - // 关闭临时文件 - if err := tmpFile.Close(); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - - // 删除临时文件 - if err := os.Remove(tmpFilePath); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - - // 修改spider的MD5和上一次的MD一致 - spider.OldMd5 = spider.Md5 - _ = spider.Save() } // 启动爬虫服务 func InitSpiderService() error { // 构造定时任务执行器 c := cron.New(cron.WithSeconds()) - if _, err := c.AddFunc("0 * * * * *", PublishAllSpiders); err != nil { + if _, err := c.AddFunc("0/15 * * * * *", PublishAllSpiders); err != nil { return err } // 启动定时任务 diff --git a/backend/services/spider_handler/spider.go b/backend/services/spider_handler/spider.go new file mode 100644 index 00000000..c78b3d5b --- /dev/null +++ b/backend/services/spider_handler/spider.go @@ -0,0 +1,127 @@ +package spider_handler + +import ( + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "github.com/apex/log" + "github.com/globalsign/mgo/bson" + "github.com/satori/go.uuid" + "github.com/spf13/viper" + "io" + "os" + "path/filepath" + "runtime/debug" +) + +const ( + Md5File = "md5.txt" +) + +type SpiderSync struct { +} + +func (s *SpiderSync) CreateMd5File(md5 string, spiderName string) { + path := filepath.Join(viper.GetString("spider.path"), spiderName) + utils.CreateFilePath(path) + + fileName := filepath.Join(path, Md5File) + file := utils.OpenFile(fileName) + defer file.Close() + if file != nil { + if _, err := file.WriteString(md5); err != nil { + log.Errorf("file write string error: %s", err.Error()) + debug.PrintStack() + } + } +} + +// 获得下载锁的key +func (s *SpiderSync) GetLockDownloadKey(spiderId string) string { + node, _ := model.GetCurrentNode() + return node.Id.Hex() + "#" + spiderId +} + +// 删除本地文件 +func (s *SpiderSync) RemoveSpiderFile(spiderName string) { + //爬虫文件有变化,先删除本地文件 + _ = os.Remove(filepath.Join( + viper.GetString("spider.path"), + spiderName, + )) +} + +// 检测是否已经下载中 +func (s *SpiderSync) CheckDownLoading(spiderId string, fileId string) (bool, string) { + key := s.GetLockDownloadKey(spiderId) + if _, err := database.RedisClient.HGet("spider", key); err == nil { + log.Infof("downloading spider file, spider_id: %s, file_id:%s", spiderId, fileId) + return true, key + } + return false, key +} + +// 下载爬虫 +func (s *SpiderSync) Download(spiderId string, fileId string) { + + session, gf := database.GetGridFs("files") + defer session.Close() + + f, err := gf.OpenId(bson.ObjectIdHex(fileId)) + defer f.Close() + if err != nil { + log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error()) + debug.PrintStack() + return + } + + // 生成唯一ID + randomId := uuid.NewV4() + tmpPath := viper.GetString("other.tmppath") + if !utils.Exists(tmpPath) { + if err := os.MkdirAll(tmpPath, 0777); err != nil { + log.Errorf("mkdir other.tmppath error: %v", err.Error()) + return + } + } + // 创建临时文件 + tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") + tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + defer tmpFile.Close() + + // 将该文件写入临时文件 + if _, err := io.Copy(tmpFile, f); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + + // 解压缩临时文件到目标文件夹 + dstPath := filepath.Join( + viper.GetString("spider.path"), + ) + if err := utils.DeCompress(tmpFile, dstPath); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + + // 关闭临时文件 + if err := tmpFile.Close(); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + + // 删除临时文件 + if err := os.Remove(tmpFilePath); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } +} diff --git a/backend/services/spider_handler/spider_test.go b/backend/services/spider_handler/spider_test.go new file mode 100644 index 00000000..5289e4d8 --- /dev/null +++ b/backend/services/spider_handler/spider_test.go @@ -0,0 +1,19 @@ +package spider_handler + +import ( + "crawlab/config" + "github.com/apex/log" + "testing" +) + +func init() { + if err := config.InitConfig("../../conf/config.yml"); err != nil { + log.Fatal("Init config failed") + } + log.Infof("初始化配置成功") +} + +func TestSpiderSync_CreateMd5File(t *testing.T) { + s := SpiderSync{} + s.CreateMd5File("asssss", "gongyu_abc") +} diff --git a/backend/services/task.go b/backend/services/task.go index 1c26b45a..2a68f10e 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -258,7 +258,7 @@ func ExecuteTask(id int) { tic := time.Now() // 获取当前节点 - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return @@ -464,7 +464,7 @@ func CancelTask(id string) (err error) { } // 获取当前节点(默认当前节点为主节点) - node, err := GetCurrentNode() + node, err := model.GetCurrentNode() if err != nil { log.Errorf("get current node error: %s", err.Error()) debug.PrintStack() diff --git a/backend/utils/file.go b/backend/utils/file.go index dda73c13..b5cf059e 100644 --- a/backend/utils/file.go +++ b/backend/utils/file.go @@ -2,6 +2,7 @@ package utils import ( "archive/zip" + "bufio" "github.com/apex/log" "io" "os" @@ -9,6 +10,40 @@ import ( "runtime/debug" ) +func ReadFile(fileName string) string { + file := OpenFile(fileName) + defer file.Close() + buf := bufio.NewReader(file) + line, err := buf.ReadString('\n') + if err != nil { + log.Errorf("read file error: %s", err.Error()) + return "" + } + return line + +} + +// 创建文件 +func OpenFile(fileName string) *os.File { + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + log.Errorf("create file error: %s, file_name: %s", err.Error(), fileName) + debug.PrintStack() + return nil + } + return file +} + +// 创建文件夹 +func CreateFilePath(filePath string) { + if !Exists(filePath) { + if err := os.MkdirAll(filePath, os.ModePerm); err != nil { + log.Errorf("create file error: %s, file_path: %s", err.Error(), filePath) + debug.PrintStack() + } + } +} + // 判断所给路径文件/文件夹是否存在 func Exists(path string) bool { _, err := os.Stat(path) //os.Stat获取文件信息