From 7fe2c9a42535fedada07e4efd9eed3cc73aec9f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 10 Oct 2019 17:59:15 +0800 Subject: [PATCH 01/24] =?UTF-8?q?fix=20=E7=88=AC=E8=99=AB=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E6=8C=89=E9=92=AE=E6=97=A0=E6=B3=95=E7=82=B9=E5=87=BB?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/spider/SpiderList.vue | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 743aabbe..8c97339a 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -424,13 +424,18 @@ export default { this.dialogVisible = true }, isShowRun (row) { - if (this.isCustomized(row)) { - // customized spider - return !!row.cmd + if (row.cmd) { + return true } else { - // configurable spider - return !!row.fields + return false } + // if (this.isCustomized(row)) { + // // customized spider + // return !!row.cmd + // } else { + // // configurable spider + // return !!row.fields + // } }, isCustomized (row) { return row.type === 'customized' From 46d89c8cce2d417b089c8e80bb8f68e9059975e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 10 Oct 2019 19:56:04 +0800 Subject: [PATCH 02/24] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/msg_handler/handler.go | 2 ++ backend/services/msg_handler/msg_log.go | 3 +++ backend/services/node.go | 20 +++++++++++++------- backend/services/task.go | 2 +- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index 848e0c5d..b8b8e231 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -3,6 +3,7 @@ package msg_handler import ( "crawlab/constants" "crawlab/entity" + "github.com/apex/log" ) type Handler interface { @@ -10,6 +11,7 @@ type Handler interface { } func GetMsgHandler(msg entity.NodeMessage) Handler { + log.Infof("received msg , type is : %s", msg.Type) if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { // 日志相关 return &Log{ diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go index 37080bd6..b865f4e3 100644 --- a/backend/services/msg_handler/msg_log.go +++ b/backend/services/msg_handler/msg_log.go @@ -40,8 +40,11 @@ func (g *Log) get() error { } // 发布消息给主节点 if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { + log.Errorf("pub log to master node error: %s", err.Error()) + debug.PrintStack() return err } + log.Infof(msgSd.Log) return nil } diff --git a/backend/services/node.go b/backend/services/node.go index 53af8d32..7fc134c5 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -110,13 +110,15 @@ func handleNodeInfo(key string, data Data) { if err := c.Find(bson.M{"key": key}).One(&node); err != nil { // 数据库不存在该节点 node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, + UpdateTs: time.Now(), + UpdateTsUnix: time.Now().Unix(), } if err := node.Add(); err != nil { log.Errorf(err.Error()) @@ -125,6 +127,8 @@ func handleNodeInfo(key string, data Data) { } else { // 数据库存在该节点 node.Status = constants.StatusOnline + node.UpdateTs = time.Now() + node.UpdateTsUnix = time.Now().Unix() if err := node.Save(); err != nil { log.Errorf(err.Error()) return @@ -201,6 +205,8 @@ func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 msg := utils.GetMessage(message) if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil { + log.Errorf("msg handler error: %s", err.Error()) + debug.PrintStack() return err } return nil diff --git a/backend/services/task.go b/backend/services/task.go index ce62a95e..f515f48d 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -148,12 +148,12 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e debug.PrintStack() } t.Status = constants.StatusCancelled + t.Error = "user kill the process ..." } else { // 保存任务 t.Status = constants.StatusFinished } t.FinishTs = time.Now() - t.Error = "user kill the process ..." if err := t.Save(); err != nil { log.Infof("save task error: %s", err.Error()) debug.PrintStack() From 973251a0fbe7a2184ac0da09e0404a17c736aee7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 21:57:25 +0800 Subject: [PATCH 03/24] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/node.go | 6 ++++-- backend/model/schedule.go | 19 ++++++++++++++----- backend/model/spider.go | 11 ++++++++--- backend/model/task.go | 13 ++++--------- backend/routes/node.go | 10 +++++----- backend/services/node.go | 4 +++- 6 files changed, 38 insertions(+), 25 deletions(-) diff --git a/backend/model/node.go b/backend/model/node.go index 7af93dbe..1a1ebce5 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -157,10 +157,12 @@ func GetNodeList(filter interface{}) ([]Node, error) { } func GetNode(id bson.ObjectId) (Node, error) { + var node Node + if id.Hex() == "" { + return node, nil + } s, c := database.GetCol("nodes") defer s.Close() - - var node Node if err := c.FindId(id).One(&node); err != nil { if err != mgo.ErrNotFound { log.Errorf(err.Error()) diff --git a/backend/model/schedule.go b/backend/model/schedule.go index 6415e22b..8ec065fb 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -38,6 +38,12 @@ func (sch *Schedule) Save() error { return nil } +func (sch *Schedule) Delete() error { + s, c := database.GetCol("schedules") + defer s.Close() + return c.RemoveId(sch.Id) +} + func GetScheduleList(filter interface{}) ([]Schedule, error) { s, c := database.GetCol("schedules") defer s.Close() @@ -47,11 +53,12 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { return schedules, err } - for i, schedule := range schedules { + var schs []Schedule + for _, schedule := range schedules { // 获取节点名称 if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) { // 选择所有节点 - schedules[i].NodeName = "All Nodes" + schedule.NodeName = "All Nodes" } else { // 选择单一节点 node, err := GetNode(schedule.NodeId) @@ -59,7 +66,7 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { log.Errorf(err.Error()) continue } - schedules[i].NodeName = node.Name + schedule.NodeName = node.Name } // 获取爬虫名称 @@ -67,11 +74,13 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { if err != nil { log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error()) debug.PrintStack() + _ = schedule.Delete() continue } - schedules[i].SpiderName = spider.Name + schedule.SpiderName = spider.Name + schs = append(schs, schedule) } - return schedules, nil + return schs, nil } func GetSchedule(id bson.ObjectId) (Schedule, error) { diff --git a/backend/model/spider.go b/backend/model/spider.go index 1f88acff..efd93c3d 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -98,6 +98,12 @@ func (spider *Spider) GetLastTask() (Task, error) { return tasks[0], nil } +func (spider *Spider) Delete() error { + s, c := database.GetCol("spiders") + defer s.Close() + return c.RemoveId(spider.Id) +} + // 爬虫列表 func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, error) { s, c := database.GetCol("spiders") @@ -256,15 +262,14 @@ func GetSpiderTypes() ([]*entity.SpiderType, error) { s, c := database.GetCol("spiders") defer s.Close() - group := bson.M{ "$group": bson.M{ - "_id": "$type", + "_id": "$type", "count": bson.M{"$sum": 1}, }, } var types []*entity.SpiderType - if err := c.Pipe([]bson.M{ group}).All(&types); err != nil { + if err := c.Pipe([]bson.M{group}).All(&types); err != nil { log.Errorf("get spider types error: %s", err.Error()) debug.PrintStack() return nil, err diff --git a/backend/model/task.go b/backend/model/task.go index f568b7fe..df046ecc 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -4,7 +4,6 @@ import ( "crawlab/constants" "crawlab/database" "github.com/apex/log" - "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "runtime/debug" "time" @@ -118,20 +117,16 @@ func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Tas for i, task := range tasks { // 获取爬虫名称 spider, err := task.GetSpider() - if err == mgo.ErrNotFound { - // do nothing - } else if err != nil { - return tasks, err + if spider.Id.Hex() == "" || err != nil { + _ = spider.Delete() } else { tasks[i].SpiderName = spider.DisplayName } // 获取节点名称 node, err := task.GetNode() - if err == mgo.ErrNotFound { - // do nothing - } else if err != nil { - return tasks, err + if node.Id.Hex() == "" || err != nil { + _ = task.Delete() } else { tasks[i].NodeName = node.Name } diff --git a/backend/routes/node.go b/backend/routes/node.go index f86c152d..7d030773 100644 --- a/backend/routes/node.go +++ b/backend/routes/node.go @@ -15,9 +15,9 @@ func GetNodeList(c *gin.Context) { return } - for i, node := range nodes { - nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex()) - } + //for i, node := range nodes { + // nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex()) + //} c.JSON(http.StatusOK, Response{ Status: "ok", @@ -109,11 +109,11 @@ func GetSystemInfo(c *gin.Context) { }) } -func DeleteNode(c *gin.Context) { +func DeleteNode(c *gin.Context) { id := c.Param("id") node, err := model.GetNode(bson.ObjectIdHex(id)) if err != nil { - HandleError(http.StatusInternalServerError, c ,err) + HandleError(http.StatusInternalServerError, c, err) return } err = node.Delete() diff --git a/backend/services/node.go b/backend/services/node.go index 7fc134c5..144cdbd8 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -88,6 +88,8 @@ func UpdateNodeStatus() { handleNodeInfo(key, data) } + // 重新获取list + list, _ = database.RedisClient.HKeys("nodes") // 重置不在redis的key为offline model.ResetNodeStatusToOffline(list) } @@ -225,7 +227,7 @@ func InitNodeService() error { } // 首次更新节点数据(注册到Redis) - UpdateNodeData() + // UpdateNodeData() // 获取当前节点 node, err := model.GetCurrentNode() From 311f72da19094e3fa05ab4af49812f58843d8d93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 21:57:59 +0800 Subject: [PATCH 04/24] =?UTF-8?q?fix=20=E7=9B=B4=E6=8E=A5=E7=82=B9?= =?UTF-8?q?=E5=87=BB=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=89=BE=E5=88=B0=E7=88=AC=E8=99=AB=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/schedule/ScheduleList.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index c44d46e2..743a186e 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -269,7 +269,7 @@ export default { }, created () { this.$store.dispatch('schedule/getScheduleList') - // this.$store.dispatch('spider/getSpiderList') + this.$store.dispatch('spider/getSpiderList') this.$store.dispatch('node/getNodeList') } } From 7dae91ab50a99901e03a72d52c673167ae4267de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 22:48:06 +0800 Subject: [PATCH 05/24] fix --- backend/model/schedule.go | 9 +++++++++ backend/routes/schedule.go | 6 +++--- backend/services/node.go | 14 +++++++------- backend/services/schedule.go | 18 +++++++++++++++--- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/backend/model/schedule.go b/backend/model/schedule.go index 8ec065fb..951cb043 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -16,6 +16,7 @@ type Schedule struct { Description string `json:"description" bson:"description"` SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"` NodeId bson.ObjectId `json:"node_id" bson:"node_id"` + NodeKey string `json:"node_key" bson:"node_key"` Cron string `json:"cron" bson:"cron"` EntryId cron.EntryID `json:"entry_id" bson:"entry_id"` Param string `json:"param" bson:"param"` @@ -113,9 +114,17 @@ func AddSchedule(item Schedule) error { s, c := database.GetCol("schedules") defer s.Close() + node, err := GetNode(item.NodeId) + if err != nil { + log.Errorf("get node error: %s", err.Error()) + debug.PrintStack() + return nil + } + item.Id = bson.NewObjectId() item.CreateTs = time.Now() item.UpdateTs = time.Now() + item.NodeKey = node.Key if err := c.Insert(&item); err != nil { debug.PrintStack() diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index b447abb5..4ca245b3 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -81,9 +81,9 @@ func PutSchedule(c *gin.Context) { } // 如果node_id为空,则置为空ObjectId - if item.NodeId == "" { - item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - } + //if item.NodeId == "" { + // item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + //} // 更新数据库 if err := model.AddSchedule(item); err != nil { diff --git a/backend/services/node.go b/backend/services/node.go index 144cdbd8..04cbc0ef 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -100,13 +100,13 @@ func handleNodeInfo(key string, data Data) { defer s.Close() // 同个key可能因为并发,被注册多次 - var nodes []model.Node - _ = c.Find(bson.M{"key": key}).All(&nodes) - if nodes != nil && len(nodes) > 1 { - for _, node := range nodes { - _ = c.RemoveId(node.Id) - } - } + //var nodes []model.Node + //_ = c.Find(bson.M{"key": key}).All(&nodes) + //if nodes != nil && len(nodes) > 1 { + // for _, node := range nodes { + // _ = c.RemoveId(node.Id) + // } + //} var node model.Node if err := c.Find(bson.M{"key": key}).One(&node); err != nil { diff --git a/backend/services/schedule.go b/backend/services/schedule.go index 58cdf628..f011f02a 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -17,7 +17,19 @@ type Scheduler struct { func AddTask(s model.Schedule) func() { return func() { - nodeId := s.NodeId + node, err := model.GetNodeByKey(s.NodeKey) + if err != nil || node.Id.Hex() == "" { + log.Errorf("get node by key error: %s", err.Error()) + debug.PrintStack() + return + } + + spider := model.GetSpiderByName(s.SpiderName) + if spider == nil || spider.Id.Hex() == "" { + log.Errorf("get spider by name error: %s", err.Error()) + debug.PrintStack() + return + } // 生成任务ID id := uuid.NewV4() @@ -25,8 +37,8 @@ func AddTask(s model.Schedule) func() { // 生成任务模型 t := model.Task{ Id: id.String(), - SpiderId: s.SpiderId, - NodeId: nodeId, + SpiderId: spider.Id, + NodeId: node.Id, Status: constants.StatusPending, Param: s.Param, } From 8eef98e082c9dcdd7423797cfcbb70cc100aa277 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 22:48:22 +0800 Subject: [PATCH 06/24] fix --- frontend/src/views/schedule/ScheduleList.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 743a186e..d16a6f69 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -16,7 +16,7 @@ - + Date: Fri, 11 Oct 2019 23:22:25 +0800 Subject: [PATCH 07/24] =?UTF-8?q?fix=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 11 +++++++++-- backend/services/task.go | 3 +++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index 485cb7dd..81140c0a 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "runtime/debug" + "time" ) // 任务日志频道映射 @@ -45,8 +46,14 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { // 生成频道,等待获取log ch := TaskLogChanMap.ChanBlocked(task.Id) - // 此处阻塞,等待结果 - logStr = <-ch + select { + case logStr = <-ch: + log.Infof("get remote log") + break + case <-time.After(5 * time.Second): + logStr = "get remote log timeout" + break + } return logStr, nil } diff --git a/backend/services/task.go b/backend/services/task.go index f515f48d..12f0330e 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -285,6 +285,9 @@ func ExecuteTask(id int) { // 节点队列任务 var msg string msg, err = database.RedisClient.LPop(queueCur) + if msg != "" { + log.Infof("queue cur: %s", msg) + } if err != nil { if msg == "" { // 节点队列没有任务,获取公共队列任务 From d853948718b440a0f42a5c5dbf9f7e57df191459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 23:23:04 +0800 Subject: [PATCH 08/24] =?UTF-8?q?fix=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/schedule/ScheduleList.vue | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index d16a6f69..4d283966 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -132,6 +132,7 @@ From 5bd30d8046ef5bafb483223a5a66f1f8531ed036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Fri, 11 Oct 2019 23:53:07 +0800 Subject: [PATCH 09/24] =?UTF-8?q?fix=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/schedule.go | 7 +++++++ backend/routes/schedule.go | 7 +++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/backend/model/schedule.go b/backend/model/schedule.go index 951cb043..bcd051e3 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -104,6 +104,13 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error { return err } + node, err := GetNode(item.NodeId) + if err != nil { + log.Errorf("get node error: %s", err.Error()) + debug.PrintStack() + return nil + } + item.NodeKey = node.Key if err := item.Save(); err != nil { return err } diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index 4ca245b3..24df0c0f 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -1,7 +1,6 @@ package routes import ( - "crawlab/constants" "crawlab/model" "crawlab/services" "github.com/gin-gonic/gin" @@ -49,9 +48,9 @@ func PostSchedule(c *gin.Context) { newItem.Id = bson.ObjectIdHex(id) // 如果node_id为空,则置为空ObjectId - if newItem.NodeId == "" { - newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - } + //if newItem.NodeId == "" { + // newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + //} // 更新数据库 if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil { From 9d8b0fd13767f60b4bf69ba69005b5df3acf4ebf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 12 Oct 2019 06:28:53 +0800 Subject: [PATCH 10/24] =?UTF-8?q?=E4=BF=AE=E6=94=B9Dockerfile?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 893cf6fe..52c668e9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ ADD ./frontend /app WORKDIR /app # install frontend +RUN npm config set unsafe-perm true RUN npm install -g yarn && yarn install RUN npm run build:prod @@ -56,4 +57,4 @@ EXPOSE 8080 EXPOSE 8000 # start backend -CMD ["/bin/sh", "/app/docker_init.sh"] \ No newline at end of file +CMD ["/bin/sh", "/app/docker_init.sh"] From decb662c12361e4c6fc0290c8c885f319d1c7293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 12 Oct 2019 06:29:52 +0800 Subject: [PATCH 11/24] =?UTF-8?q?=E4=BF=AE=E6=94=B9Dockerfile?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 52c668e9..0809a0ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ WORKDIR /app # install frontend RUN npm config set unsafe-perm true -RUN npm install -g yarn && yarn install +RUN npm install -g yarn && yarn install --registry=https://registry.npm.taobao.org RUN npm run build:prod From 6af06efc17685a9e232e8c2b5fd819ec7d2d1674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 12 Oct 2019 07:16:11 +0800 Subject: [PATCH 12/24] =?UTF-8?q?fix=20worker=E6=97=A0=E6=B3=95=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=88=B0=E5=BD=93=E5=89=8DNode=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/node.go b/backend/services/node.go index 04cbc0ef..d3409ed2 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -227,7 +227,7 @@ func InitNodeService() error { } // 首次更新节点数据(注册到Redis) - // UpdateNodeData() + UpdateNodeData() // 获取当前节点 node, err := model.GetCurrentNode() From d4c152f93a15d60994c4617fe4f4d9a6ba77ba80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Tue, 15 Oct 2019 17:23:03 +0800 Subject: [PATCH 13/24] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/node.go | 14 ++-- backend/services/task.go | 173 ++++++++++++++++++++++++--------------- 2 files changed, 113 insertions(+), 74 deletions(-) diff --git a/backend/services/node.go b/backend/services/node.go index d3409ed2..8b1f998a 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -100,13 +100,13 @@ func handleNodeInfo(key string, data Data) { defer s.Close() // 同个key可能因为并发,被注册多次 - //var nodes []model.Node - //_ = c.Find(bson.M{"key": key}).All(&nodes) - //if nodes != nil && len(nodes) > 1 { - // for _, node := range nodes { - // _ = c.RemoveId(node.Id) - // } - //} + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) + } + } var node model.Node if err := c.Find(bson.M{"key": key}).One(&node); err != nil { diff --git a/backend/services/task.go b/backend/services/task.go index 12f0330e..b79dbe7a 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -100,10 +100,104 @@ func AssignTask(task model.Task) error { return nil } +// 设置环境变量 +func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { + // 默认环境变量 + cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+taskId) + cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) + cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") + cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") + + //任务环境变量 + for _, env := range envs { + cmd.Env = append(cmd.Env, env.Name+"="+env.Value) + } + + // TODO 全局环境变量 + return cmd +} + +func SetLogConfig(cmd *exec.Cmd, path string) error { + fLog, err := os.Create(path) + if err != nil { + log.Errorf("create task log file error: %s", path) + debug.PrintStack() + return err + } + defer fLog.Close() + cmd.Stdout = fLog + cmd.Stderr = fLog + return nil +} + +func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { + // 传入信号,此处阻塞 + signal := <-ch + log.Infof("process received signal: %s", signal) + + if signal == constants.TaskCancel && cmd.Process != nil { + // 取消进程 + if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + log.Errorf("process kill error: %s", err.Error()) + debug.PrintStack() + + t.Error = "kill process error: " + err.Error() + t.Status = constants.StatusError + } else { + t.Error = "user kill the process ..." + t.Status = constants.StatusCancelled + } + } else { + // 保存任务 + t.Status = constants.StatusFinished + } + + t.FinishTs = time.Now() + _ = t.Save() +} + +func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { + if err := cmd.Start(); err != nil { + log.Errorf("start spider error:{}", err.Error()) + debug.PrintStack() + + t.Error = "start task error: " + err.Error() + t.Status = constants.StatusError + t.FinishTs = time.Now() + _ = t.Save() + return err + } + return nil +} + +func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { + if err := cmd.Wait(); err != nil { + log.Errorf("wait process finish error: %s", err.Error()) + debug.PrintStack() + + if exitError, ok := err.(*exec.ExitError); ok { + exitCode := exitError.ExitCode() + log.Errorf("exit error, exit code: %d", exitCode) + + // 非kill 的错误类型 + if exitCode != -1 { + // 非手动kill保存为错误状态 + t.Error = err.Error() + t.FinishTs = time.Now() + t.Status = constants.StatusError + _ = t.Save() + } + } + + return err + } + return nil +} + // 执行shell命令 func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { - log.Infof("cwd: " + cwd) - log.Infof("cmd: " + cmdStr) + log.Infof("cwd: %s", cwd) + log.Infof("cmd: %s", cmdStr) // 生成执行命令 var cmd *exec.Cmd @@ -116,84 +210,29 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e // 工作目录 cmd.Dir = cwd - // 指定stdout, stderr日志位置 - fLog, err := os.Create(t.LogPath) - if err != nil { - HandleTaskError(t, err) + // 日志配置 + if err := SetLogConfig(cmd, t.LogPath); err != nil { return err } - defer fLog.Close() - cmd.Stdout = fLog - cmd.Stderr = fLog - // 添加默认环境变量 - cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id) - cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col) - - // 添加任务环境变量 - for _, env := range s.Envs { - cmd.Env = append(cmd.Env, env.Name+"="+env.Value) - } + // 环境变量配置 + cmd = SetEnv(cmd, s.Envs, t.Id, s.Col) // 起一个goroutine来监控进程 ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go func() { - // 传入信号,此处阻塞 - signal := <-ch - log.Infof("cancel process signal: %s", signal) - if signal == constants.TaskCancel && cmd.Process != nil { - // 取消进程 - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - log.Errorf("process kill error: %s", err.Error()) - debug.PrintStack() - } - t.Status = constants.StatusCancelled - t.Error = "user kill the process ..." - } else { - // 保存任务 - t.Status = constants.StatusFinished - } - t.FinishTs = time.Now() - if err := t.Save(); err != nil { - log.Infof("save task error: %s", err.Error()) - debug.PrintStack() - return - } - }() - // 在选择所有节点执行的时候,实际就是随机一个节点执行的, + go FinishOrCancelTask(ch, cmd, t) + + // kill的时候,可以kill所有的子进程 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - // 异步启动进程 - if err := cmd.Start(); err != nil { - log.Errorf("start spider error:{}", err.Error()) - debug.PrintStack() + // 启动进程 + if err := StartTaskProcess(cmd, t); err != nil { return err } - // 保存pid到task - t.Pid = cmd.Process.Pid - if err := t.Save(); err != nil { - log.Errorf("save task pid error: %s", err.Error()) - debug.PrintStack() - return err - } // 同步等待进程完成 - if err := cmd.Wait(); err != nil { - log.Errorf("wait process finish error: %s", err.Error()) - debug.PrintStack() - if exitError, ok := err.(*exec.ExitError); ok { - exitCode := exitError.ExitCode() - log.Errorf("exit error, exit code: %d", exitCode) - // 非kill 的错误类型 - if exitCode != -1 { - // 非手动kill保存为错误状态 - t.Error = err.Error() - t.FinishTs = time.Now() - t.Status = constants.StatusError - _ = t.Save() - } - } + if err := WaitTaskProcess(cmd, t); err != nil { return err } ch <- constants.TaskFinish From f92d2d426643e873a894b2357b060df2c2c7ff76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 09:45:50 +0800 Subject: [PATCH 14/24] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/task.go | 49 +++++++++++++--------------------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/backend/services/task.go b/backend/services/task.go index b79dbe7a..50b902cb 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -311,70 +311,53 @@ func ExecuteTask(id int) { // 获取当前节点 node, err := model.GetCurrentNode() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task get current node error: %s", err.Error()) + debug.PrintStack() return } - // 公共队列 - queuePub := "tasks:public" - // 节点队列 queueCur := "tasks:node:" + node.Id.Hex() - // 节点队列任务 var msg string - msg, err = database.RedisClient.LPop(queueCur) - if msg != "" { - log.Infof("queue cur: %s", msg) - } - if err != nil { - if msg == "" { - // 节点队列没有任务,获取公共队列任务 - msg, err = database.RedisClient.LPop(queuePub) - if err != nil { - if msg == "" { - // 公共队列没有任务 - log.Debugf(GetWorkerPrefix(id) + "没有任务...") - return - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return - } - } - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return + if msg, err = database.RedisClient.LPop(queueCur); err != nil { + log.Errorf("get current node task error: %s", err.Error()) + // 节点队列没有任务,获取公共队列任务 + queuePub := "tasks:public" + if msg, err = database.RedisClient.LPop(queuePub); err != nil { + log.Errorf("get public task error: %s", err.Error()) } } + if msg == "" { + return + } + // 反序列化 tMsg := TaskMessage{} if err := json.Unmarshal([]byte(msg), &tMsg); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() + log.Errorf("json string to struct error: %s", err.Error()) return } // 获取任务 t, err := model.GetTask(tMsg.Id) if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get task error: %s", err.Error()) return } // 获取爬虫 spider, err := t.GetSpider() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get spider error: %s", err.Error()) return } // 创建日志目录 fileDir, err := MakeLogDir(t) if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, make log dir error: %s", err.Error()) return } From 467796b55a5b84bb299fbad5b4e9d49a1269e0bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 10:11:00 +0800 Subject: [PATCH 15/24] =?UTF-8?q?fix=20=E4=B8=80=E4=BA=9B=E4=B8=9C?= =?UTF-8?q?=E8=A5=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/task.go | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/backend/services/task.go b/backend/services/task.go index 50b902cb..78c11f17 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -124,7 +124,6 @@ func SetLogConfig(cmd *exec.Cmd, path string) error { debug.PrintStack() return err } - defer fLog.Close() cmd.Stdout = fLog cmd.Stderr = fLog return nil @@ -247,6 +246,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) { // 如果日志目录不存在,生成该目录 if !utils.Exists(fileDir) { if err := os.MkdirAll(fileDir, 0777); err != nil { + log.Errorf("execute task, make log dir error: %s", err.Error()) debug.PrintStack() return "", err } @@ -321,11 +321,9 @@ func ExecuteTask(id int) { // 节点队列任务 var msg string if msg, err = database.RedisClient.LPop(queueCur); err != nil { - log.Errorf("get current node task error: %s", err.Error()) // 节点队列没有任务,获取公共队列任务 queuePub := "tasks:public" if msg, err = database.RedisClient.LPop(queuePub); err != nil { - log.Errorf("get public task error: %s", err.Error()) } } @@ -355,24 +353,13 @@ func ExecuteTask(id int) { } // 创建日志目录 - fileDir, err := MakeLogDir(t) - if err != nil { - log.Errorf("execute task, make log dir error: %s", err.Error()) + var fileDir string + if fileDir, err = MakeLogDir(t); err != nil { return } - // 获取日志文件路径 t.LogPath = GetLogFilePaths(fileDir) - // 创建日志目录文件夹 - fileStdoutDir := filepath.Dir(t.LogPath) - if !utils.Exists(fileStdoutDir) { - if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - return - } - } - // 工作目录 cwd := filepath.Join( viper.GetString("spider.path"), From 3fc97e78d8cd5fd4c3b41c18dd61873b14572c59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 11:26:15 +0800 Subject: [PATCH 16/24] =?UTF-8?q?fix=20=E6=97=B6=E5=8C=BA=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/task.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/services/task.go b/backend/services/task.go index 78c11f17..5bcb54e8 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -107,6 +107,7 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exe cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") + cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai") //任务环境变量 for _, env := range envs { @@ -203,6 +204,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e if runtime.GOOS == constants.Windows { cmd = exec.Command("cmd", "/C", cmdStr) } else { + cmd = exec.Command("") cmd = exec.Command("sh", "-c", cmdStr) } From b3b2efd6112c32b51a83d0bc921447d6f29e69e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 15:27:12 +0800 Subject: [PATCH 17/24] =?UTF-8?q?fix=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=8F=82=E6=95=B0=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/node.go | 14 +++++++++----- backend/model/schedule.go | 31 ++++++++++++++++++++++++------- backend/routes/schedule.go | 22 ++++++++++++---------- backend/services/schedule.go | 17 ++++++++++++++++- 4 files changed, 61 insertions(+), 23 deletions(-) diff --git a/backend/model/node.go b/backend/model/node.go index 1a1ebce5..2beb9e1c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -4,6 +4,7 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/services/register" + "errors" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -158,16 +159,19 @@ func GetNodeList(filter interface{}) ([]Node, error) { func GetNode(id bson.ObjectId) (Node, error) { var node Node + if id.Hex() == "" { - return node, nil + log.Infof("id is empty") + debug.PrintStack() + return node, errors.New("id is empty") } + s, c := database.GetCol("nodes") defer s.Close() + if err := c.FindId(id).One(&node); err != nil { - if err != mgo.ErrNotFound { - log.Errorf(err.Error()) - debug.PrintStack() - } + log.Errorf(err.Error()) + debug.PrintStack() return node, err } return node, nil diff --git a/backend/model/schedule.go b/backend/model/schedule.go index bcd051e3..36799ac3 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -45,6 +45,27 @@ func (sch *Schedule) Delete() error { return c.RemoveId(sch.Id) } +func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) { + sch.syncNodeId(node) + sch.syncSpiderId(spider) +} + +func (sch *Schedule) syncNodeId(node Node) { + if node.Id.Hex() == sch.NodeId.Hex() { + return + } + sch.NodeId = node.Id + _ = sch.Save() +} + +func (sch *Schedule) syncSpiderId(spider Spider) { + if spider.Id.Hex() == sch.SpiderId.Hex() { + return + } + sch.SpiderId = spider.Id + _ = sch.Save() +} + func GetScheduleList(filter interface{}) ([]Schedule, error) { s, c := database.GetCol("schedules") defer s.Close() @@ -103,13 +124,11 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error { if err := c.FindId(id).One(&result); err != nil { return err } - node, err := GetNode(item.NodeId) if err != nil { - log.Errorf("get node error: %s", err.Error()) - debug.PrintStack() - return nil + return err } + item.NodeKey = node.Key if err := item.Save(); err != nil { return err @@ -123,9 +142,7 @@ func AddSchedule(item Schedule) error { node, err := GetNode(item.NodeId) if err != nil { - log.Errorf("get node error: %s", err.Error()) - debug.PrintStack() - return nil + return err } item.Id = bson.NewObjectId() diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index 24df0c0f..73b75323 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -45,13 +45,14 @@ func PostSchedule(c *gin.Context) { HandleError(http.StatusBadRequest, c, err) return } + + // 验证cron表达式 + if err := services.ParserCron(newItem.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return + } + newItem.Id = bson.ObjectIdHex(id) - - // 如果node_id为空,则置为空ObjectId - //if newItem.NodeId == "" { - // newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - //} - // 更新数据库 if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil { HandleError(http.StatusInternalServerError, c, err) @@ -79,10 +80,11 @@ func PutSchedule(c *gin.Context) { return } - // 如果node_id为空,则置为空ObjectId - //if item.NodeId == "" { - // item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - //} + // 验证cron表达式 + if err := services.ParserCron(item.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return + } // 更新数据库 if err := model.AddSchedule(item); err != nil { diff --git a/backend/services/schedule.go b/backend/services/schedule.go index f011f02a..d4c1635b 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -5,7 +5,7 @@ import ( "crawlab/lib/cron" "crawlab/model" "github.com/apex/log" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" "runtime/debug" ) @@ -31,6 +31,9 @@ func AddTask(s model.Schedule) func() { return } + // 同步ID到定时任务 + s.SyncNodeIdAndSpiderId(node, *spider) + // 生成任务ID id := uuid.NewV4() @@ -119,6 +122,18 @@ func (s *Scheduler) RemoveAll() { } } +// 验证cron表达式是否正确 +func ParserCron(spec string) error { + parser := cron.NewParser( + cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, + ) + + if _, err := parser.Parse(spec); err != nil { + return err + } + return nil +} + func (s *Scheduler) Update() error { // 删除所有定时任务 s.RemoveAll() From 2e3ec18d676bd2d1ea4a0aac1553dd3d9e034049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 15:27:35 +0800 Subject: [PATCH 18/24] =?UTF-8?q?fix=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/schedule/ScheduleList.vue | 101 ++++++++++--------- 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 4d283966..b170c9ed 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -14,7 +14,7 @@ - + - - - + + + + + + + + + + + - {{$t('schedules.add_cron')}} + + {{$t('Cancel')}} {{$t('Submit')}} @@ -76,9 +78,9 @@ - - - + + + @@ -131,7 +133,7 @@ diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 8c97339a..eb1e548f 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -429,13 +429,6 @@ export default { } else { return false } - // if (this.isCustomized(row)) { - // // customized spider - // return !!row.cmd - // } else { - // // configurable spider - // return !!row.fields - // } }, isCustomized (row) { return row.type === 'customized' From 4ab4892471965d6342d30385578ca60dc51f8ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 15:39:25 +0800 Subject: [PATCH 20/24] =?UTF-8?q?fix=20=E7=8E=AF=E5=A2=83=E5=8F=98?= =?UTF-8?q?=E9=87=8F=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/task.go b/backend/services/task.go index 5bcb54e8..03038613 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -103,7 +103,7 @@ func AssignTask(task model.Task) error { // 设置环境变量 func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { // 默认环境变量 - cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+taskId) + cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId) cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") From 3d6fb33d3c7729cb9c3da0bcf36b419828283af9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 16:00:01 +0800 Subject: [PATCH 21/24] =?UTF-8?q?fix=20=E6=97=A5=E5=BF=97=E4=B8=8D?= =?UTF-8?q?=E4=BC=9A=E8=87=AA=E5=8A=A8=E5=88=B7=E6=96=B0=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/task/TaskDetail.vue | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index b4ec0652..c1bb4ff0 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -35,6 +35,7 @@ import { import TaskOverview from '../../components/Overview/TaskOverview' import GeneralTableView from '../../components/TableView/GeneralTableView' import LogView from '../../components/ScrollView/LogView' +import request from '../../api/request' export default { name: 'TaskDetail', @@ -46,12 +47,12 @@ export default { data () { return { activeTabName: 'overview', - handle: undefined + handle: undefined, + taskLog: '' } }, computed: { ...mapState('task', [ - 'taskLog', 'taskResultsData', 'taskResultsTotalCount' ]), @@ -97,18 +98,22 @@ export default { downloadCSV () { this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id) this.$st.sendEv('任务详情-结果', '下载CSV') + }, + getTaskLog () { + if (this.$route.params.id) { + request.get(`/tasks/${this.$route.params.id}/log`).then(response => { + this.taskLog = response.data.data + }) + } } }, async created () { - await this.$store.dispatch('task/getTaskData', this.$route.params.id) - this.$store.dispatch('task/getTaskLog', this.$route.params.id) + this.$store.dispatch('task/getTaskData', this.$route.params.id) this.$store.dispatch('task/getTaskResults', this.$route.params.id) - if (this.taskForm && ['running'].includes(this.taskForm.status)) { - this.handle = setInterval(() => { - this.$store.dispatch('task/getTaskLog', this.$route.params.id) - }, 5000) - } + this.handle = setInterval(() => { + this.getTaskLog() + }, 5000) }, destroyed () { clearInterval(this.handle) From c028c4d1f3b13c3cbb0550cba42a487032b09f55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 16:00:36 +0800 Subject: [PATCH 22/24] =?UTF-8?q?fix=20=E6=97=A5=E5=BF=97=E4=B8=8D?= =?UTF-8?q?=E4=BC=9A=E8=87=AA=E5=8A=A8=E5=88=B7=E6=96=B0=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/task/TaskDetail.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index c1bb4ff0..5ba4ab4d 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -107,7 +107,7 @@ export default { } } }, - async created () { + created () { this.$store.dispatch('task/getTaskData', this.$route.params.id) this.$store.dispatch('task/getTaskResults', this.$route.params.id) From f9882cf0f516d99543ce3538272626795af2b4ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 16 Oct 2019 16:29:54 +0800 Subject: [PATCH 23/24] =?UTF-8?q?fix=20=E6=97=A5=E5=BF=97=E8=8E=B7?= =?UTF-8?q?=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/task/TaskDetail.vue | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index 5ba4ab4d..d61394e8 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -111,6 +111,7 @@ export default { this.$store.dispatch('task/getTaskData', this.$route.params.id) this.$store.dispatch('task/getTaskResults', this.$route.params.id) + this.getTaskLog() this.handle = setInterval(() => { this.getTaskLog() }, 5000) From 418d728825d949ab7480bb2723d6b41a68f56f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 17 Oct 2019 17:54:19 +0800 Subject: [PATCH 24/24] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/log.go | 2 +- backend/services/log.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/model/log.go b/backend/model/log.go index ae6973b1..fc2cc79d 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -23,7 +23,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } defer f.Close() - const bufLen = 2 * 1024 * 1024 + const bufLen = 1 * 1024 * 1024 logBuf := make([]byte, bufLen) off := int64(0) diff --git a/backend/services/log.go b/backend/services/log.go index 81140c0a..47280fe5 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -50,7 +50,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { case logStr = <-ch: log.Infof("get remote log") break - case <-time.After(5 * time.Second): + case <-time.After(30 * time.Second): logStr = "get remote log timeout" break }