Merge pull request #193 from wo10378931/v0.4.0

FIX 一些问题
This commit is contained in:
Marvin Zhang
2019-08-31 18:08:23 +08:00
committed by GitHub
13 changed files with 197 additions and 104 deletions

View File

@@ -1,7 +1,9 @@
package model
import (
"crawlab/constants"
"crawlab/database"
"crawlab/services/register"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
@@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) {
var results []Node
if err := c.Find(filter).All(&results); err != nil {
log.Error("get node list error: " + err.Error())
debug.PrintStack()
return results, err
}
@@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) {
return count, nil
}
// 节点基本信息
func GetNodeBaseInfo() (ip string, mac string, key string, error error) {
ip, err := register.GetRegister().GetIp()
if err != nil {
debug.PrintStack()
return "", "", "", err
}
mac, err = register.GetRegister().GetMac()
if err != nil {
debug.PrintStack()
return "", "", "", err
}
key, err = register.GetRegister().GetKey()
if err != nil {
debug.PrintStack()
return "", "", "", err
}
return ip, mac, key, nil
}
// 根据redis的key值重置node节点为offline
func ResetNodeStatusToOffline(list []string) {
nodes, _ := GetNodeList(nil)
for _, node := range nodes {
hasNode := false
for _, key := range list {
if key == node.Key {
hasNode = true
break
}
}
if !hasNode || node.Status == "" {
node.Status = constants.StatusOffline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
continue
}
}
}

View File

@@ -0,0 +1,50 @@
package model
import (
"crawlab/config"
"crawlab/constants"
"crawlab/database"
"github.com/apex/log"
. "github.com/smartystreets/goconvey/convey"
"runtime/debug"
"testing"
)
func TestAddNode(t *testing.T) {
Convey("Test AddNode", t, func() {
if err := config.InitConfig("../conf/config.yml"); err != nil {
log.Error("init config error:" + err.Error())
panic(err)
}
log.Info("初始化配置成功")
// 初始化Mongodb数据库
if err := database.InitMongo(); err != nil {
log.Error("init mongodb error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("初始化Mongodb数据库成功")
// 初始化Redis数据库
if err := database.InitRedis(); err != nil {
log.Error("init redis error:" + err.Error())
debug.PrintStack()
panic(err)
}
var node = Node{
Key: "c4:b3:01:bd:b5:e7",
Name: "10.27.238.101",
Ip: "10.27.238.101",
Port: "8000",
Mac: "c4:b3:01:bd:b5:e7",
Status: constants.StatusOnline,
IsMaster: true,
}
if err := node.Add(); err != nil {
log.Error("add node error:" + err.Error())
panic(err)
}
})
}

View File

@@ -23,13 +23,14 @@ type Spider struct {
Col string `json:"col"` // 结果储存位置
Site string `json:"site"` // 爬虫网站
Envs []Env `json:"envs" bson:"envs"` // 环境变量
Remark string `json:"remark"` // 备注
// 自定义爬虫
Src string `json:"src" bson:"src"` // 源码位置
Cmd string `json:"cmd" bson:"cmd"` // 执行命令
// 前端展示
LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间
LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间
LastStatus string `json:"last_status"` // 最后执行状态
// TODO: 可配置爬虫
//Fields []interface{} `json:"fields"`
@@ -98,7 +99,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
// 获取爬虫列表
spiders := []Spider{}
if err := c.Find(filter).Skip(skip).Limit(limit).All(&spiders); err != nil {
if err := c.Find(filter).Skip(skip).Limit(limit).Sort("name asc").All(&spiders); err != nil {
debug.PrintStack()
return spiders, err
}
@@ -115,6 +116,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
// 赋值
spiders[i].LastRunTs = task.CreateTs
spiders[i].LastStatus = task.Status
}
return spiders, nil

View File

@@ -35,8 +35,13 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) {
}
defer f.Close()
logBuf := make([]byte, 2048)
n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf)))
if err != nil {
off := int64(0)
if fi.Size() > int64(len(logBuf)) {
off = fi.Size() - int64(len(logBuf))
}
n, err := f.ReadAt(logBuf, off)
// 到文件结尾会有EOF的报错
if err != nil && err.Error() != "EOF" {
log.Error(err.Error())
debug.PrintStack()
return nil, err

View File

@@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) {
if err != nil {
// 如果为主节点,表示为第一次注册,插入节点信息
if IsMaster() {
// 获取本机IP地址
ip, err := register.GetRegister().GetIp()
// 获取本机信息
ip, mac, key, err := model.GetNodeBaseInfo()
if err != nil {
debug.PrintStack()
return model.Node{}, err
}
mac, err := register.GetRegister().GetMac()
if err != nil {
debug.PrintStack()
return model.Node{}, err
}
key, err := register.GetRegister().GetKey()
if err != nil {
debug.PrintStack()
return model.Node{}, err
return node, err
}
// 生成节点
@@ -97,7 +85,7 @@ func GetCurrentNode() (model.Node, error) {
Key: key,
Id: bson.NewObjectId(),
Ip: ip,
Name: key,
Name: ip,
Mac: mac,
IsMaster: true,
}
@@ -177,72 +165,54 @@ func UpdateNodeStatus() {
// 在Redis中删除该节点
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
log.Errorf(err.Error())
return
}
// 在MongoDB中该节点设置状态为离线
s, c := database.GetCol("nodes")
defer s.Close()
var node model.Node
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
node.Status = constants.StatusOffline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
continue
}
// 更新节点信息到数据库
s, c := database.GetCol("nodes")
defer s.Close()
var node model.Node
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
// 数据库不存在该节点
node = model.Node{
Key: key,
Name: key,
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
IsMaster: data.Master,
}
if err := node.Add(); err != nil {
log.Errorf(err.Error())
return
}
} else {
// 数据库存在该节点
node.Status = constants.StatusOnline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
// 处理node信息
handleNodeInfo(key, data)
}
// 重置不在redis的key为offline
model.ResetNodeStatusToOffline(list)
}
func handleNodeInfo(key string, data Data) {
// 更新节点信息到数据库
s, c := database.GetCol("nodes")
defer s.Close()
// 同个key可能因为并发被注册多次
var nodes []model.Node
_ = c.Find(bson.M{"key": key}).All(&nodes)
if nodes != nil && len(nodes) > 1 {
for _, node := range nodes {
_ = c.RemoveId(node.Id)
}
}
// 遍历数据库中的节点列表
nodes, err := model.GetNodeList(nil)
for _, node := range nodes {
hasNode := false
for _, key := range list {
if key == node.Key {
hasNode = true
break
}
var node model.Node
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
// 数据库不存在该节点
node = model.Node{
Key: key,
Name: data.Ip,
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
IsMaster: data.Master,
}
if !hasNode {
node.Status = constants.StatusOffline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
continue
if err := node.Add(); err != nil {
log.Errorf(err.Error())
return
}
} else {
// 数据库存在该节点
node.Status = constants.StatusOnline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
}
}
@@ -334,12 +304,15 @@ func WorkerNodeCallback(channel string, msgStr string) {
// 获取本地日志
logStr, err := GetLocalLog(msg.LogPath)
log.Info(string(logStr))
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = string(logStr)
}
msgSd.Log = string(logStr)
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
@@ -350,7 +323,7 @@ func WorkerNodeCallback(channel string, msgStr string) {
}
// 发布消息给主节点
fmt.Println(msgSd)
log.Info("publish get log msg to master")
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return

View File

@@ -31,6 +31,7 @@ type SpiderFileData struct {
type SpiderUploadMessage struct {
FileId string
FileName string
SpiderId string
}
// 从项目目录中获取爬虫列表
@@ -40,9 +41,9 @@ func GetSpidersFromDir() ([]model.Spider, error) {
// 如果爬虫项目目录不存在,则创建一个
if !utils.Exists(srcPath) {
mask := syscall.Umask(0) // 改为 0000 八进制
mask := syscall.Umask(0) // 改为 0000 八进制
defer syscall.Umask(mask) // 改为原来的 umask
if err := os.MkdirAll(srcPath, 0666); err != nil {
if err := os.MkdirAll(srcPath, 0766); err != nil {
debug.PrintStack()
return []model.Spider{}, err
}
@@ -295,13 +296,13 @@ func PublishSpider(spider model.Spider) (err error) {
msg := SpiderUploadMessage{
FileId: fid.Hex(),
FileName: fileName,
SpiderId: spider.Id.Hex(),
}
msgStr, err := json.Marshal(msg)
if err != nil {
return
}
channel := "files:upload"
log.Info("publish files.upload event, file id:" + msg.FileId)
if err = database.Publish(channel, string(msgStr)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
@@ -313,7 +314,6 @@ func PublishSpider(spider model.Spider) (err error) {
// 上传爬虫回调
func OnFileUpload(channel string, msgStr string) {
log.Info("received files.upload event, msgStr:" + msgStr)
s, gf := database.GetGridFs("files")
defer s.Close()
@@ -328,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) {
// 从GridFS获取该文件
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
if err != nil {
log.Errorf("open file id: " + msg.FileId + ", error: " + err.Error())
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
debug.PrintStack()
return
}

View File

@@ -408,9 +408,12 @@ func GetTaskLog(id string) (logStr string, err error) {
logStr = string(logBytes)
if err != nil {
log.Errorf(err.Error())
return "", err
logStr = string(err.Error())
// return "", err
} else {
logStr = string(logBytes)
}
logStr = string(logBytes)
} else {
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)

View File

@@ -6,6 +6,7 @@
"serve": "vue-cli-service serve --ip=0.0.0.0",
"serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0",
"config": "vue ui",
"build:dev": "vue-cli-service build --mode development",
"build:prod": "vue-cli-service build --mode production",
"lint": "vue-cli-service lint",
"test:unit": "vue-cli-service test:unit"

View File

@@ -44,6 +44,9 @@
<el-option value="customized" :label="$t('Customized')"></el-option>
</el-select>
</el-form-item>
<el-form-item :label="$t('Remark')">
<el-input v-model="spiderForm.remark"/>
</el-form-item>
</el-form>
</el-row>
<el-row class="button-container" v-if="!isView">

View File

@@ -154,6 +154,8 @@ export default {
'Last Run': '上次运行',
'Action': '操作',
'No command line': '没有执行命令',
'Last Status': '上次运行状态',
'Remark': '备注',
// 任务
'Task Info': '任务信息',

View File

@@ -25,15 +25,7 @@ const mutations = {
const { id, systemInfo } = payload
for (let i = 0; i < state.nodeList.length; i++) {
if (state.nodeList[i]._id === id) {
// Vue.set(state.nodeList[i], 'systemInfo', {})
state.nodeList[i].systemInfo = systemInfo
// for (const key in systemInfo) {
// if (systemInfo.hasOwnProperty(key)) {
// console.log(key)
// state.nodeList[i].systemInfo[key] = systemInfo[key]
// // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key])
// }
// }
break
}
}
@@ -76,10 +68,12 @@ const actions = {
getTaskList ({ state, commit }, id) {
return request.get(`/nodes/${id}/tasks`)
.then(response => {
commit('task/SET_TASK_LIST',
response.data.data.map(d => d)
.sort((a, b) => a.create_ts < b.create_ts ? 1 : -1),
{ root: true })
if (response.data.data) {
commit('task/SET_TASK_LIST',
response.data.data.map(d => d)
.sort((a, b) => a.create_ts < b.create_ts ? 1 : -1),
{ root: true })
}
})
},
getNodeSystemInfo ({ state, commit }, id) {

View File

@@ -190,6 +190,14 @@
{{getTime(scope.row[col.name])}}
</template>
</el-table-column>
<el-table-column v-else-if="col.name === 'last_status'"
:key="col.name"
:label="$t(col.label)"
align="left" :width="col.width">
<template slot-scope="scope">
<status-tag :status="scope.row.last_status"/>
</template>
</el-table-column>
<el-table-column v-else
:key="col.name"
:property="col.name"
@@ -239,10 +247,14 @@ import {
} from 'vuex'
import dayjs from 'dayjs'
import CrawlConfirmDialog from '../../components/Common/CrawlConfirmDialog'
import StatusTag from '../../components/Status/StatusTag'
export default {
name: 'SpiderList',
components: { CrawlConfirmDialog },
components: {
CrawlConfirmDialog,
StatusTag
},
data () {
return {
pagination: {
@@ -267,10 +279,11 @@ export default {
// { name: 'site_name', label: 'Site', width: '140', align: 'left' },
{ name: 'type', label: 'Spider Type', width: '120' },
// { name: 'cmd', label: 'Command Line', width: '200' },
// { name: 'lang', label: 'Language', width: '120', sortable: true },
{ name: 'last_status', label: 'Last Status', width: '120' },
{ name: 'last_run_ts', label: 'Last Run', width: '160' },
{ name: 'create_ts', label: 'Create Time', width: '160' },
{ name: 'update_ts', label: 'Update Time', width: '160' }
{ name: 'update_ts', label: 'Update Time', width: '160' },
{ name: 'remark', label: 'Remark', width: '160' }
// { name: 'last_7d_tasks', label: 'Last 7-Day Tasks', width: '80' },
// { name: 'last_5_errors', label: 'Last 5-Run Errors', width: '80' }
],

View File

@@ -119,7 +119,7 @@
:width="col.width">
</el-table-column>
</template>
<el-table-column :label="$t('Action')" align="left" width="120" fixed="right">
<el-table-column :label="$t('Action')" align="left" fixed="right">
<template slot-scope="scope">
<el-tooltip :content="$t('View')" placement="top">
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>