From e43332b34f9ed6ccb55d3551f4635b6421c6394b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 12:42:53 +0800 Subject: [PATCH 01/11] =?UTF-8?q?FIX=20#178=20FIX=20=E5=BD=93=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=96=87=E4=BB=B6=E5=B0=8F=E4=BA=8E2048=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99=E6=97=A0=E6=B3=95=E6=AD=A3=E5=B8=B8=E6=8B=89?= =?UTF-8?q?=E5=8F=96=E6=97=A5=E5=BF=97=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 9 +++++++-- backend/services/node.go | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index c6e4f090..a248c176 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -35,8 +35,13 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } defer f.Close() logBuf := make([]byte, 2048) - n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf))) - if err != nil { + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + // 到文件结尾会有EOF的报错 + if err.Error() != "EOF" && err != nil { log.Error(err.Error()) debug.PrintStack() return nil, err diff --git a/backend/services/node.go b/backend/services/node.go index 09b49dbf..083fdc3d 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -97,7 +97,7 @@ func GetCurrentNode() (model.Node, error) { Key: key, Id: bson.NewObjectId(), Ip: ip, - Name: key, + Name: ip, Mac: mac, IsMaster: true, } @@ -205,7 +205,7 @@ func UpdateNodeStatus() { // 数据库不存在该节点 node = model.Node{ Key: key, - Name: key, + Name: data.Ip, Ip: data.Ip, Port: "8000", Mac: data.Mac, From 494601ab625a53aa91e9cc3ce7c4cb5d45e84741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:49:34 +0800 Subject: [PATCH 02/11] =?UTF-8?q?fix=20=E8=8A=82=E7=82=B9=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/node.go | 47 +++++++++++++++ backend/model/node_test.go | 50 ++++++++++++++++ backend/services/node.go | 116 ++++++++++++++----------------------- backend/services/spider.go | 4 +- 4 files changed, 143 insertions(+), 74 deletions(-) create mode 100644 backend/model/node_test.go diff --git a/backend/model/node.go b/backend/model/node.go index 61c20473..6211115c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -1,7 +1,9 @@ package model import ( + "crawlab/constants" "crawlab/database" + "crawlab/services/register" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { + log.Error("get node list error: " + err.Error()) debug.PrintStack() return results, err } @@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) { return count, nil } + +// 节点基本信息 +func GetNodeBaseInfo() (ip string, mac string, key string, error error) { + ip, err := register.GetRegister().GetIp() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + mac, err = register.GetRegister().GetMac() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + key, err = register.GetRegister().GetKey() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + return ip, mac, key, nil +} + +// 根据redis的key值,重置node节点为offline +func ResetNodeStatusToOffline(list []string) { + nodes, _ := GetNodeList(nil) + for _, node := range nodes { + hasNode := false + for _, key := range list { + if key == node.Key { + hasNode = true + break + } + } + if !hasNode || node.Status == "" { + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } + continue + } + } +} diff --git a/backend/model/node_test.go b/backend/model/node_test.go new file mode 100644 index 00000000..ba3f4aaa --- /dev/null +++ b/backend/model/node_test.go @@ -0,0 +1,50 @@ +package model + +import ( + "crawlab/config" + "crawlab/constants" + "crawlab/database" + "github.com/apex/log" + . "github.com/smartystreets/goconvey/convey" + "runtime/debug" + "testing" +) + +func TestAddNode(t *testing.T) { + Convey("Test AddNode", t, func() { + if err := config.InitConfig("../conf/config.yml"); err != nil { + log.Error("init config error:" + err.Error()) + panic(err) + } + log.Info("初始化配置成功") + + // 初始化Mongodb数据库 + if err := database.InitMongo(); err != nil { + log.Error("init mongodb error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Mongodb数据库成功") + + // 初始化Redis数据库 + if err := database.InitRedis(); err != nil { + log.Error("init redis error:" + err.Error()) + debug.PrintStack() + panic(err) + } + + var node = Node{ + Key: "c4:b3:01:bd:b5:e7", + Name: "10.27.238.101", + Ip: "10.27.238.101", + Port: "8000", + Mac: "c4:b3:01:bd:b5:e7", + Status: constants.StatusOnline, + IsMaster: true, + } + if err := node.Add(); err != nil { + log.Error("add node error:" + err.Error()) + panic(err) + } + }) +} diff --git a/backend/services/node.go b/backend/services/node.go index 083fdc3d..3ed84149 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) { if err != nil { // 如果为主节点,表示为第一次注册,插入节点信息 if IsMaster() { - // 获取本机IP地址 - ip, err := register.GetRegister().GetIp() + // 获取本机信息 + ip, mac, key, err := model.GetNodeBaseInfo() if err != nil { debug.PrintStack() - return model.Node{}, err - } - - mac, err := register.GetRegister().GetMac() - if err != nil { - debug.PrintStack() - return model.Node{}, err - } - - key, err := register.GetRegister().GetKey() - if err != nil { - debug.PrintStack() - return model.Node{}, err + return node, err } // 生成节点 @@ -179,70 +167,56 @@ func UpdateNodeStatus() { log.Errorf(err.Error()) return } - // 在MongoDB中该节点设置状态为离线 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + keys, _ := database.RedisClient.HKeys("nodes") + model.ResetNodeStatusToOffline(keys) continue } - // 更新节点信息到数据库 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, - } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else { - // 数据库存在该节点 - node.Status = constants.StatusOnline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + // 处理node信息 + handleNodeInfo(key, data) + } + + // 重置不在redis的key为offline + model.ResetNodeStatusToOffline(list) +} + +func handleNodeInfo(key string, data Data) { + // 更新节点信息到数据库 + s, c := database.GetCol("nodes") + defer s.Close() + + // 同个key可能因为并发,被注册多次 + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) } } - // 遍历数据库中的节点列表 - nodes, err := model.GetNodeList(nil) - for _, node := range nodes { - hasNode := false - for _, key := range list { - if key == node.Key { - hasNode = true - break - } + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + // 数据库不存在该节点 + node = model.Node{ + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, } - if !hasNode { - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - continue + if err := node.Add(); err != nil { + log.Errorf(err.Error()) + return + } + } else { + // 数据库存在该节点 + node.Status = constants.StatusOnline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return } } } diff --git a/backend/services/spider.go b/backend/services/spider.go index ad0c0ae5..a3242849 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -40,7 +40,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 + mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask if err := os.MkdirAll(srcPath, 0666); err != nil { debug.PrintStack() @@ -301,7 +301,6 @@ func PublishSpider(spider model.Spider) (err error) { return } channel := "files:upload" - log.Info("publish files.upload event, file id:" + msg.FileId) if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -313,7 +312,6 @@ func PublishSpider(spider model.Spider) (err error) { // 上传爬虫回调 func OnFileUpload(channel string, msgStr string) { - log.Info("received files.upload event, msgStr:" + msgStr) s, gf := database.GetGridFs("files") defer s.Close() From f750fe5f8a42a9b6a294658133ac98ecb6cb8fc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:50:01 +0800 Subject: [PATCH 03/11] =?UTF-8?q?fix=20=E5=89=8D=E7=AB=AF=E6=8A=A5?= =?UTF-8?q?=E9=94=99=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/store/modules/node.js | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js index 266beb3e..5e21a222 100644 --- a/frontend/src/store/modules/node.js +++ b/frontend/src/store/modules/node.js @@ -25,15 +25,7 @@ const mutations = { const { id, systemInfo } = payload for (let i = 0; i < state.nodeList.length; i++) { if (state.nodeList[i]._id === id) { - // Vue.set(state.nodeList[i], 'systemInfo', {}) state.nodeList[i].systemInfo = systemInfo - // for (const key in systemInfo) { - // if (systemInfo.hasOwnProperty(key)) { - // console.log(key) - // state.nodeList[i].systemInfo[key] = systemInfo[key] - // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key]) - // } - // } break } } @@ -76,10 +68,12 @@ const actions = { getTaskList ({ state, commit }, id) { return request.get(`/nodes/${id}/tasks`) .then(response => { - commit('task/SET_TASK_LIST', - response.data.data.map(d => d) - .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), - { root: true }) + if (response.data.data) { + commit('task/SET_TASK_LIST', + response.data.data.map(d => d) + .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), + { root: true }) + } }) }, getNodeSystemInfo ({ state, commit }, id) { From ecaf31f0f07644742dcf8c4e5051abd5207510b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:57:14 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/node.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/services/node.go b/backend/services/node.go index 3ed84149..977ff0ef 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -165,11 +165,7 @@ func UpdateNodeStatus() { // 在Redis中删除该节点 if err := database.RedisClient.HDel("nodes", data.Key); err != nil { log.Errorf(err.Error()) - return } - // 在MongoDB中该节点设置状态为离线 - keys, _ := database.RedisClient.HKeys("nodes") - model.ResetNodeStatusToOffline(keys) continue } From 1877677844fa2e165fddb7215dd5b0c55d4b0d38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 15:03:57 +0800 Subject: [PATCH 05/11] =?UTF-8?q?fix=20=E6=97=A0=E6=B3=95=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=97=A5=E5=BF=97=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 2 +- backend/services/node.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index a248c176..a0ba0311 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -41,7 +41,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } n, err := f.ReadAt(logBuf, off) // 到文件结尾会有EOF的报错 - if err.Error() != "EOF" && err != nil { + if err != nil && err.Error() != "EOF" { log.Error(err.Error()) debug.PrintStack() return nil, err diff --git a/backend/services/node.go b/backend/services/node.go index 977ff0ef..124f5bba 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -308,8 +308,10 @@ func WorkerNodeCallback(channel string, msgStr string) { log.Errorf(err.Error()) debug.PrintStack() msgSd.Error = err.Error() + msgSd.Log = err.Error() + } else { + msgSd.Log = string(logStr) } - msgSd.Log = string(logStr) // 序列化 msgSdBytes, err := json.Marshal(&msgSd) From c0201f566ad9c14f644cab49e144340afc1ed211 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 15:59:43 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/spider.go | 6 ++++-- backend/services/node.go | 3 ++- frontend/src/views/task/TaskList.vue | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/backend/model/spider.go b/backend/model/spider.go index 65782fe8..d44f651b 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -23,13 +23,14 @@ type Spider struct { Col string `json:"col"` // 结果储存位置 Site string `json:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 - + Remark string `json:"remark"` // 备注 // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 // 前端展示 - LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastStatus string `json:"last_status"` // 最后执行状态 // TODO: 可配置爬虫 //Fields []interface{} `json:"fields"` @@ -115,6 +116,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 赋值 spiders[i].LastRunTs = task.CreateTs + spiders[i].LastStatus = task.Status } return spiders, nil diff --git a/backend/services/node.go b/backend/services/node.go index 124f5bba..9685a1bb 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -304,6 +304,7 @@ func WorkerNodeCallback(channel string, msgStr string) { // 获取本地日志 logStr, err := GetLocalLog(msg.LogPath) + log.Info(string(logStr)) if err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -322,7 +323,7 @@ func WorkerNodeCallback(channel string, msgStr string) { } // 发布消息给主节点 - fmt.Println(msgSd) + log.Info("publish get log msg to master") if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { log.Errorf(err.Error()) return diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 9cbceb20..a3ffbeea 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -119,7 +119,7 @@ :width="col.width"> - + + + + Date: Sat, 31 Aug 2019 16:27:01 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/package.json b/frontend/package.json index 139297d3..e3bc84f8 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -6,6 +6,7 @@ "serve": "vue-cli-service serve --ip=0.0.0.0", "serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0", "config": "vue ui", + "build:dev": "vue-cli-service build --mode development", "build:prod": "vue-cli-service build --mode production", "lint": "vue-cli-service lint", "test:unit": "vue-cli-service test:unit" From 1a383dac1730ab6d761f974f5705275e45af82cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 17:04:49 +0800 Subject: [PATCH 10/11] =?UTF-8?q?fix=20=E7=88=AC=E8=99=AB=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=E6=97=A0=E6=B3=95=E6=89=93=E5=BC=80=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 2 +- backend/services/task.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index a3242849..f526e11d 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -42,7 +42,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { if !utils.Exists(srcPath) { mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0666); err != nil { + if err := os.MkdirAll(srcPath, 0766); err != nil { debug.PrintStack() return []model.Spider{}, err } diff --git a/backend/services/task.go b/backend/services/task.go index 8c3c3407..1b0a5676 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -408,9 +408,12 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = string(logBytes) if err != nil { log.Errorf(err.Error()) - return "", err + logStr = string(err.Error()) + // return "", err + } else { + logStr = string(logBytes) } - logStr = string(logBytes) + } else { // 若不为主节点,获取远端日志 logStr, err = GetRemoteLog(task) From e027aeb71fe7efd8aa94bf0228df067a904ec93e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 17:56:42 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index f526e11d..c3f63139 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -31,6 +31,7 @@ type SpiderFileData struct { type SpiderUploadMessage struct { FileId string FileName string + SpiderId string } // 从项目目录中获取爬虫列表 @@ -295,6 +296,7 @@ func PublishSpider(spider model.Spider) (err error) { msg := SpiderUploadMessage{ FileId: fid.Hex(), FileName: fileName, + SpiderId: spider.Id.Hex(), } msgStr, err := json.Marshal(msg) if err != nil { @@ -326,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf("open file id: " + msg.FileId + ", error: " + err.Error()) + log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) debug.PrintStack() return }