diff --git a/backend/model/node.go b/backend/model/node.go index 61c20473..6211115c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -1,7 +1,9 @@ package model import ( + "crawlab/constants" "crawlab/database" + "crawlab/services/register" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { + log.Error("get node list error: " + err.Error()) debug.PrintStack() return results, err } @@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) { return count, nil } + +// 节点基本信息 +func GetNodeBaseInfo() (ip string, mac string, key string, error error) { + ip, err := register.GetRegister().GetIp() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + mac, err = register.GetRegister().GetMac() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + key, err = register.GetRegister().GetKey() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + return ip, mac, key, nil +} + +// 根据redis的key值,重置node节点为offline +func ResetNodeStatusToOffline(list []string) { + nodes, _ := GetNodeList(nil) + for _, node := range nodes { + hasNode := false + for _, key := range list { + if key == node.Key { + hasNode = true + break + } + } + if !hasNode || node.Status == "" { + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } + continue + } + } +} diff --git a/backend/model/node_test.go b/backend/model/node_test.go new file mode 100644 index 00000000..ba3f4aaa --- /dev/null +++ b/backend/model/node_test.go @@ -0,0 +1,50 @@ +package model + +import ( + "crawlab/config" + "crawlab/constants" + "crawlab/database" + "github.com/apex/log" + . "github.com/smartystreets/goconvey/convey" + "runtime/debug" + "testing" +) + +func TestAddNode(t *testing.T) { + Convey("Test AddNode", t, func() { + if err := config.InitConfig("../conf/config.yml"); err != nil { + log.Error("init config error:" + err.Error()) + panic(err) + } + log.Info("初始化配置成功") + + // 初始化Mongodb数据库 + if err := database.InitMongo(); err != nil { + log.Error("init mongodb error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Mongodb数据库成功") + + // 初始化Redis数据库 + if err := database.InitRedis(); err != nil { + log.Error("init redis error:" + err.Error()) + debug.PrintStack() + panic(err) + } + + var node = Node{ + Key: "c4:b3:01:bd:b5:e7", + Name: "10.27.238.101", + Ip: "10.27.238.101", + Port: "8000", + Mac: "c4:b3:01:bd:b5:e7", + Status: constants.StatusOnline, + IsMaster: true, + } + if err := node.Add(); err != nil { + log.Error("add node error:" + err.Error()) + panic(err) + } + }) +} diff --git a/backend/model/spider.go b/backend/model/spider.go index 65782fe8..c1ef7b6e 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -23,13 +23,14 @@ type Spider struct { Col string `json:"col"` // 结果储存位置 Site string `json:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 - + Remark string `json:"remark"` // 备注 // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 // 前端展示 - LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastStatus string `json:"last_status"` // 最后执行状态 // TODO: 可配置爬虫 //Fields []interface{} `json:"fields"` @@ -98,7 +99,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 获取爬虫列表 spiders := []Spider{} - if err := c.Find(filter).Skip(skip).Limit(limit).All(&spiders); err != nil { + if err := c.Find(filter).Skip(skip).Limit(limit).Sort("name asc").All(&spiders); err != nil { debug.PrintStack() return spiders, err } @@ -115,6 +116,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 赋值 spiders[i].LastRunTs = task.CreateTs + spiders[i].LastStatus = task.Status } return spiders, nil diff --git a/backend/services/log.go b/backend/services/log.go index c6e4f090..a0ba0311 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -35,8 +35,13 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } defer f.Close() logBuf := make([]byte, 2048) - n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf))) - if err != nil { + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + // 到文件结尾会有EOF的报错 + if err != nil && err.Error() != "EOF" { log.Error(err.Error()) debug.PrintStack() return nil, err diff --git a/backend/services/node.go b/backend/services/node.go index 09b49dbf..9685a1bb 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) { if err != nil { // 如果为主节点,表示为第一次注册,插入节点信息 if IsMaster() { - // 获取本机IP地址 - ip, err := register.GetRegister().GetIp() + // 获取本机信息 + ip, mac, key, err := model.GetNodeBaseInfo() if err != nil { debug.PrintStack() - return model.Node{}, err - } - - mac, err := register.GetRegister().GetMac() - if err != nil { - debug.PrintStack() - return model.Node{}, err - } - - key, err := register.GetRegister().GetKey() - if err != nil { - debug.PrintStack() - return model.Node{}, err + return node, err } // 生成节点 @@ -97,7 +85,7 @@ func GetCurrentNode() (model.Node, error) { Key: key, Id: bson.NewObjectId(), Ip: ip, - Name: key, + Name: ip, Mac: mac, IsMaster: true, } @@ -177,72 +165,54 @@ func UpdateNodeStatus() { // 在Redis中删除该节点 if err := database.RedisClient.HDel("nodes", data.Key); err != nil { log.Errorf(err.Error()) - return - } - - // 在MongoDB中该节点设置状态为离线 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return } continue } - // 更新节点信息到数据库 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: key, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, - } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else { - // 数据库存在该节点 - node.Status = constants.StatusOnline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + // 处理node信息 + handleNodeInfo(key, data) + } + + // 重置不在redis的key为offline + model.ResetNodeStatusToOffline(list) +} + +func handleNodeInfo(key string, data Data) { + // 更新节点信息到数据库 + s, c := database.GetCol("nodes") + defer s.Close() + + // 同个key可能因为并发,被注册多次 + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) } } - // 遍历数据库中的节点列表 - nodes, err := model.GetNodeList(nil) - for _, node := range nodes { - hasNode := false - for _, key := range list { - if key == node.Key { - hasNode = true - break - } + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + // 数据库不存在该节点 + node = model.Node{ + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, } - if !hasNode { - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - continue + if err := node.Add(); err != nil { + log.Errorf(err.Error()) + return + } + } else { + // 数据库存在该节点 + node.Status = constants.StatusOnline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return } } } @@ -334,12 +304,15 @@ func WorkerNodeCallback(channel string, msgStr string) { // 获取本地日志 logStr, err := GetLocalLog(msg.LogPath) + log.Info(string(logStr)) if err != nil { log.Errorf(err.Error()) debug.PrintStack() msgSd.Error = err.Error() + msgSd.Log = err.Error() + } else { + msgSd.Log = string(logStr) } - msgSd.Log = string(logStr) // 序列化 msgSdBytes, err := json.Marshal(&msgSd) @@ -350,7 +323,7 @@ func WorkerNodeCallback(channel string, msgStr string) { } // 发布消息给主节点 - fmt.Println(msgSd) + log.Info("publish get log msg to master") if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { log.Errorf(err.Error()) return diff --git a/backend/services/spider.go b/backend/services/spider.go index ad0c0ae5..c3f63139 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -31,6 +31,7 @@ type SpiderFileData struct { type SpiderUploadMessage struct { FileId string FileName string + SpiderId string } // 从项目目录中获取爬虫列表 @@ -40,9 +41,9 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 + mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0666); err != nil { + if err := os.MkdirAll(srcPath, 0766); err != nil { debug.PrintStack() return []model.Spider{}, err } @@ -295,13 +296,13 @@ func PublishSpider(spider model.Spider) (err error) { msg := SpiderUploadMessage{ FileId: fid.Hex(), FileName: fileName, + SpiderId: spider.Id.Hex(), } msgStr, err := json.Marshal(msg) if err != nil { return } channel := "files:upload" - log.Info("publish files.upload event, file id:" + msg.FileId) if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -313,7 +314,6 @@ func PublishSpider(spider model.Spider) (err error) { // 上传爬虫回调 func OnFileUpload(channel string, msgStr string) { - log.Info("received files.upload event, msgStr:" + msgStr) s, gf := database.GetGridFs("files") defer s.Close() @@ -328,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf("open file id: " + msg.FileId + ", error: " + err.Error()) + log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) debug.PrintStack() return } diff --git a/backend/services/task.go b/backend/services/task.go index 8c3c3407..1b0a5676 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -408,9 +408,12 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = string(logBytes) if err != nil { log.Errorf(err.Error()) - return "", err + logStr = string(err.Error()) + // return "", err + } else { + logStr = string(logBytes) } - logStr = string(logBytes) + } else { // 若不为主节点,获取远端日志 logStr, err = GetRemoteLog(task) diff --git a/frontend/package.json b/frontend/package.json index 139297d3..e3bc84f8 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -6,6 +6,7 @@ "serve": "vue-cli-service serve --ip=0.0.0.0", "serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0", "config": "vue ui", + "build:dev": "vue-cli-service build --mode development", "build:prod": "vue-cli-service build --mode production", "lint": "vue-cli-service lint", "test:unit": "vue-cli-service test:unit" diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index 661e4757..39702a5d 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -44,6 +44,9 @@ + + + diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index b1de3b47..58317ec3 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -154,6 +154,8 @@ export default { 'Last Run': '上次运行', 'Action': '操作', 'No command line': '没有执行命令', + 'Last Status': '上次运行状态', + 'Remark': '备注', // 任务 'Task Info': '任务信息', diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js index 266beb3e..5e21a222 100644 --- a/frontend/src/store/modules/node.js +++ b/frontend/src/store/modules/node.js @@ -25,15 +25,7 @@ const mutations = { const { id, systemInfo } = payload for (let i = 0; i < state.nodeList.length; i++) { if (state.nodeList[i]._id === id) { - // Vue.set(state.nodeList[i], 'systemInfo', {}) state.nodeList[i].systemInfo = systemInfo - // for (const key in systemInfo) { - // if (systemInfo.hasOwnProperty(key)) { - // console.log(key) - // state.nodeList[i].systemInfo[key] = systemInfo[key] - // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key]) - // } - // } break } } @@ -76,10 +68,12 @@ const actions = { getTaskList ({ state, commit }, id) { return request.get(`/nodes/${id}/tasks`) .then(response => { - commit('task/SET_TASK_LIST', - response.data.data.map(d => d) - .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), - { root: true }) + if (response.data.data) { + commit('task/SET_TASK_LIST', + response.data.data.map(d => d) + .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), + { root: true }) + } }) }, getNodeSystemInfo ({ state, commit }, id) { diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 348728a1..0380a6b0 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -190,6 +190,14 @@ {{getTime(scope.row[col.name])}} + + + - +