From ce8fb8670089aea2e6939dd3aee55fd28e4021f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 24 Aug 2019 15:11:52 +0800 Subject: [PATCH 01/46] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/node.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/services/node.go b/backend/services/node.go index 1fa2370c..09b49dbf 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -124,6 +124,7 @@ func IsMaster() bool { return viper.GetString("server.master") == Yes } +// 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点 // 该ID的节点是否为主节点 func IsMasterNode(id string) bool { curNode, _ := GetCurrentNode() From 1507e6724b26985dbffafdd5fbea9fe9b37bc342 Mon Sep 17 00:00:00 2001 From: hantmac Date: Mon, 26 Aug 2019 10:38:40 +0800 Subject: [PATCH 02/46] hot fix:fix redis lose connection problem --- backend/database/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index b100535f..01e52fa1 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -46,12 +46,12 @@ func (c *Subscriber) Connect() { panic(errors.New("redis connection failed too many times, panic")) } con, err := GetRedisConn() + i += 1 if err != nil { log.Error("redis dial failed") continue } c.client = redis.PubSubConn{Conn: con} - i += 1 continue } From 007f10b83bb924e31b71219cd9aa9a1baae1ce24 Mon Sep 17 00:00:00 2001 From: hantmac Date: Mon, 26 Aug 2019 17:44:01 +0800 Subject: [PATCH 03/46] bug fix:fix permission bug when create spiders dir caused by umask --- backend/services/spider.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/services/spider.go b/backend/services/spider.go index f4f856e6..4d32594b 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -39,6 +39,8 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { + mask := syscall.Umask(0) // 改为 0000 八进制 + defer syscall.Umask(mask) // 改为原来的 umask if err := os.MkdirAll(srcPath, 0666); err != nil { debug.PrintStack() return []model.Spider{}, err From 5e707adad4451591a89ad49f5e36003f754fbfca Mon Sep 17 00:00:00 2001 From: hantmac Date: Mon, 26 Aug 2019 21:10:55 +0800 Subject: [PATCH 04/46] add unit test for routes/spider.go --- backend/mock/spider.go | 179 +++++++++++++++++++++++++++++++++++- backend/mock/spider_test.go | 137 +++++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 1 deletion(-) create mode 100644 backend/mock/spider_test.go diff --git a/backend/mock/spider.go b/backend/mock/spider.go index c4807247..ef3e6104 100644 --- a/backend/mock/spider.go +++ b/backend/mock/spider.go @@ -1 +1,178 @@ -package mock \ No newline at end of file +package mock + +import ( + "crawlab/model" + "github.com/apex/log" + "github.com/gin-gonic/gin" + "github.com/globalsign/mgo/bson" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "time" +) + +var SpiderList = []model.Spider{ + { + Id: bson.ObjectId("5d429e6c19f7abede924fee2"), + Name: "For test", + DisplayName: "test", + Type: "test", + Col: "test", + Site: "www.baidu.com", + Envs: nil, + Src: "../app/spiders", + Cmd: "scrapy crawl test", + LastRunTs: time.Now(), + CreateTs: time.Now(), + UpdateTs: time.Now(), + }, +} + +func GetSpiderList(c *gin.Context) { + + // mock get spider list from database + results := SpiderList + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: results, + }) +} + +func GetSpider(c *gin.Context) { + id := c.Param("id") + var result model.Spider + + if !bson.IsObjectIdHex(id) { + HandleErrorF(http.StatusBadRequest, c, "invalid id") + } + + for _, spider := range SpiderList { + if spider.Id == bson.ObjectId(id) { + result = spider + } + } + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: result, + }) +} + +func PostSpider(c *gin.Context) { + id := c.Param("id") + + if !bson.IsObjectIdHex(id) { + HandleErrorF(http.StatusBadRequest, c, "invalid id") + } + + var item model.Spider + if err := c.ShouldBindJSON(&item); err != nil { + HandleError(http.StatusBadRequest, c, err) + return + } + + log.Info("modify the item") + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} +func GetSpiderDir(c *gin.Context) { + // 爬虫ID + id := c.Param("id") + + // 目录相对路径 + path := c.Query("path") + var spi model.Spider + + // 获取爬虫 + for _, spider := range SpiderList { + if spider.Id == bson.ObjectId(id) { + spi = spider + } + } + + // 获取目录下文件列表 + f, err := ioutil.ReadDir(filepath.Join(spi.Src, path)) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + // 遍历文件列表 + var fileList []model.File + for _, file := range f { + fileList = append(fileList, model.File{ + Name: file.Name(), + IsDir: file.IsDir(), + Size: file.Size(), + Path: filepath.Join(path, file.Name()), + }) + } + + // 返回结果 + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: fileList, + }) +} + +func GetSpiderTasks(c *gin.Context) { + id := c.Param("id") + + var spider model.Spider + for _, spi := range SpiderList { + if spi.Id == bson.ObjectId(id) { + spider = spi + } + } + + var tasks model.Task + for _, task := range TaskList { + if task.SpiderId == spider.Id { + tasks = task + } + } + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: tasks, + }) +} + +func DeleteSpider(c *gin.Context) { + id := c.Param("id") + + if !bson.IsObjectIdHex(id) { + HandleErrorF(http.StatusBadRequest, c, "invalid id") + return + } + + // 获取该爬虫,get this spider + var spider model.Spider + for _, spi := range SpiderList { + if spi.Id == bson.ObjectId(id) { + spider = spi + } + } + + // 删除爬虫文件目录,delete the spider dir + if err := os.RemoveAll(spider.Src); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + // 从数据库中删除该爬虫,delete this spider from database + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} diff --git a/backend/mock/spider_test.go b/backend/mock/spider_test.go new file mode 100644 index 00000000..87634ff7 --- /dev/null +++ b/backend/mock/spider_test.go @@ -0,0 +1,137 @@ +package mock + +import ( + "crawlab/model" + "encoding/json" + "github.com/globalsign/mgo/bson" + . "github.com/smartystreets/goconvey/convey" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestGetSpiderList(t *testing.T) { + var resp Response + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/spiders", nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp faild") + } + Convey("Test API GetSpiderList", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestGetSpider(t *testing.T) { + var resp Response + var spiderId = "5d429e6c19f7abede924fee2" + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/spiders/"+spiderId, nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API GetSpider", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestPostSpider(t *testing.T) { + var spider = model.Spider{ + Id: bson.ObjectIdHex("5d429e6c19f7abede924fee2"), + Name: "For test", + DisplayName: "test", + Type: "test", + Col: "test", + Site: "www.baidu.com", + Envs: nil, + Src: "/app/spider", + Cmd: "scrapy crawl test", + LastRunTs: time.Now(), + CreateTs: time.Now(), + UpdateTs: time.Now(), + } + var resp Response + var spiderId = "5d429e6c19f7abede924fee2" + w := httptest.NewRecorder() + body, _ := json.Marshal(spider) + req, _ := http.NewRequest("POST", "/spiders/"+spiderId, strings.NewReader(string(body))) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API PostSpider", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) + +} + +func TestGetSpiderDir(t *testing.T) { + var spiderId = "5d429e6c19f7abede924fee2" + var resp Response + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/spiders/"+spiderId+"/dir", nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API GetSpiderDir", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) + +} + +func TestGetSpiderTasks(t *testing.T) { + var spiderId = "5d429e6c19f7abede924fee2" + var resp Response + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/spiders/"+spiderId+"/tasks", nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API GetSpiderTasks", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestDeleteSpider(t *testing.T) { + var spiderId = "5d429e6c19f7abede924fee2" + var resp Response + w := httptest.NewRecorder() + req, _ := http.NewRequest("DELETE", "/spiders/"+spiderId, nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API DeleteSpider", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} \ No newline at end of file From ba0471355b993fe2176ef035cbdd6170a3cdc5f4 Mon Sep 17 00:00:00 2001 From: yaziming Date: Mon, 26 Aug 2019 23:13:26 +0800 Subject: [PATCH 05/46] JSON -> AbortWithStatusJSON --- backend/routes/utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/routes/utils.go b/backend/routes/utils.go index 14c5853e..2b45c5f1 100644 --- a/backend/routes/utils.go +++ b/backend/routes/utils.go @@ -7,7 +7,7 @@ import ( func HandleError(statusCode int, c *gin.Context, err error) { debug.PrintStack() - c.JSON(statusCode, Response{ + c.AbortWithStatusJSON(statusCode, Response{ Status: "ok", Message: "error", Error: err.Error(), @@ -16,7 +16,7 @@ func HandleError(statusCode int, c *gin.Context, err error) { func HandleErrorF(statusCode int, c *gin.Context, err string) { debug.PrintStack() - c.JSON(statusCode, Response{ + c.AbortWithStatusJSON(statusCode, Response{ Status: "ok", Message: "error", Error: err, From c7e137a6aa04f627db0a5c98e51a09c793c3b23e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Tue, 27 Aug 2019 09:39:16 +0800 Subject: [PATCH 06/46] =?UTF-8?q?=E5=88=A0=E9=99=A4=E7=88=AC=E8=99=AB?= =?UTF-8?q?=E9=A1=BA=E5=B8=A6=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/spider.go | 11 +++++++++-- backend/routes/utils.go | 2 ++ backend/services/spider.go | 2 +- backend/services/task.go | 1 + 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/backend/model/spider.go b/backend/model/spider.go index c4c94edf..65782fe8 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -92,8 +92,6 @@ func (spider *Spider) GetLastTask() (Task, error) { return tasks[0], nil } - - func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { s, c := database.GetCol("spiders") defer s.Close() @@ -165,6 +163,15 @@ func RemoveSpider(id bson.ObjectId) error { return err } + // gf上的文件 + s, gf := database.GetGridFs("files") + defer s.Close() + + if err := gf.RemoveId(result.FileId); err != nil { + log.Error("remove file error, id:" + result.FileId.Hex()) + return err + } + return nil } diff --git a/backend/routes/utils.go b/backend/routes/utils.go index 14c5853e..8044ffc9 100644 --- a/backend/routes/utils.go +++ b/backend/routes/utils.go @@ -1,11 +1,13 @@ package routes import ( + "github.com/apex/log" "github.com/gin-gonic/gin" "runtime/debug" ) func HandleError(statusCode int, c *gin.Context, err error) { + log.Errorf("handle error:" + err.Error()) debug.PrintStack() c.JSON(statusCode, Response{ Status: "ok", diff --git a/backend/services/spider.go b/backend/services/spider.go index f4f856e6..5c5c6e6e 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -320,7 +320,7 @@ func OnFileUpload(channel string, msgStr string) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf(err.Error()) + log.Errorf("open file id" + msg.FileId + ", error: " + err.Error()) debug.PrintStack() return } diff --git a/backend/services/task.go b/backend/services/task.go index 8c0ff8a1..8c3c3407 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -472,6 +472,7 @@ func CancelTask(id string) (err error) { } func HandleTaskError(t model.Task, err error) { + log.Error("handle task error:" + err.Error()) t.Status = constants.StatusError t.Error = err.Error() t.FinishTs = time.Now() From 36eaad64a5c8c48db9d9294f804949c87c98c48a Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 27 Aug 2019 14:21:39 +0800 Subject: [PATCH 07/46] bug fix: fix test failed for config.go --- backend/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/config/config_test.go b/backend/config/config_test.go index ee966877..0068e6ad 100644 --- a/backend/config/config_test.go +++ b/backend/config/config_test.go @@ -7,7 +7,7 @@ import ( func TestInitConfig(t *testing.T) { Convey("Test InitConfig func", t, func() { - x := InitConfig("") + x := InitConfig("../conf/config.yml") Convey("The value should be nil", func() { So(x, ShouldEqual, nil) From cae11e37960e26d68c7876662e9357df12e0d509 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 28 Aug 2019 16:00:09 +0800 Subject: [PATCH 08/46] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=93=E5=8D=B0?= =?UTF-8?q?=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index 5c5c6e6e..ce416a79 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -295,6 +295,7 @@ func PublishSpider(spider model.Spider) (err error) { return } channel := "files:upload" + log.Info("publish files.upload event, file id:" + msg.FileId) if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -306,6 +307,7 @@ func PublishSpider(spider model.Spider) (err error) { // 上传爬虫回调 func OnFileUpload(channel string, msgStr string) { + log.Info("received files.upload event, msgStr:" + msgStr) s, gf := database.GetGridFs("files") defer s.Close() @@ -320,7 +322,7 @@ func OnFileUpload(channel string, msgStr string) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf("open file id" + msg.FileId + ", error: " + err.Error()) + log.Errorf("open file id: " + msg.FileId + ", error: " + err.Error()) debug.PrintStack() return } From 1b29a27ab0b163f8a3e0abac4c3fafd64f25cd3d Mon Sep 17 00:00:00 2001 From: hantmac Date: Fri, 30 Aug 2019 17:58:58 +0800 Subject: [PATCH 09/46] bug fix: hot fix out of memory problem caused by log read --- backend/services/log.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index d59e463e..b3eec028 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -7,7 +7,7 @@ import ( "crawlab/utils" "encoding/json" "github.com/apex/log" - "io/ioutil" + "os" "runtime/debug" ) @@ -16,13 +16,28 @@ var TaskLogChanMap = utils.NewChanMap() // 获取本地日志 func GetLocalLog(logPath string) (fileBytes []byte, err error) { - fileBytes, err = ioutil.ReadFile(logPath) + + f, err := os.Open(logPath) if err != nil { - log.Errorf(err.Error()) + log.Error(err.Error()) debug.PrintStack() - return fileBytes, err + return nil, err } - return fileBytes, nil + fi, err := f.Stat() + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + defer f.Close() + logBuf := make([]byte, 2048) + n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf))) + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + } + logBuf = logBuf[:n] + return logBuf, nil } // 获取远端日志 From 782e5694e6c9a3e991d7345c7226f4c7dada7a2b Mon Sep 17 00:00:00 2001 From: hantmac Date: Fri, 30 Aug 2019 18:01:22 +0800 Subject: [PATCH 10/46] bug fix: hot fix out of memory problem caused by log read --- backend/services/log.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/services/log.go b/backend/services/log.go index b3eec028..9e96be03 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -35,6 +35,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { if err != nil { log.Error(err.Error()) debug.PrintStack() + return nil, err } logBuf = logBuf[:n] return logBuf, nil From 369ef02fc71d2a0d63f54efdd0fcbf4b1a976fd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 10:08:37 +0800 Subject: [PATCH 11/46] fix #184 --- frontend/src/components/InfoView/TaskInfoView.vue | 6 +++--- frontend/src/views/task/TaskDetail.vue | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue index bfe6419a..e902e959 100644 --- a/frontend/src/components/InfoView/TaskInfoView.vue +++ b/frontend/src/components/InfoView/TaskInfoView.vue @@ -86,15 +86,15 @@ export default { return dayjs(str).format('YYYY-MM-DD HH:mm:ss') }, getWaitDuration (row) { - if (row.start_ts.match('^0001')) return 'NA' + if (!row.start_ts || row.start_ts.match('^0001')) return 'NA' return dayjs(row.start_ts).diff(row.create_ts, 'second') }, getRuntimeDuration (row) { - if (row.finish_ts.match('^0001')) return 'NA' + if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.start_ts, 'second') }, getTotalDuration (row) { - if (row.finish_ts.match('^0001')) return 'NA' + if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.create_ts, 'second') } } diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index a1edb497..89309ea8 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -99,12 +99,12 @@ export default { this.$st.sendEv('任务详情-结果', '下载CSV') } }, - created () { - this.$store.dispatch('task/getTaskData', this.$route.params.id) + async created () { + await this.$store.dispatch('task/getTaskData', this.$route.params.id) this.$store.dispatch('task/getTaskLog', this.$route.params.id) this.$store.dispatch('task/getTaskResults', this.$route.params.id) - if (['running'].includes(this.taskForm.status)) { + if (this.taskForm && ['running'].includes(this.taskForm.status)) { this.handle = setInterval(() => { this.$store.dispatch('task/getTaskLog', this.$route.params.id) }, 5000) From 8b50e12681f214d7941c2d1e1b95e02e47889028 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 31 Aug 2019 11:28:16 +0800 Subject: [PATCH 12/46] add option for config.yml to config log delete periodically --- backend/conf/config.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/conf/config.yml b/backend/conf/config.yml index f1042ca6..3805762a 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -15,6 +15,8 @@ redis: log: level: info path: "/var/logs/crawlab" + isDeletePeriodically: "Y" + deleteFrequency: "@hourly" server: host: 0.0.0.0 port: 8000 From 5720f3f277a5daeb069c8e85203792e59a2d5def Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 31 Aug 2019 11:29:24 +0800 Subject: [PATCH 13/46] Add delete log files periodically --- backend/services/log.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/backend/services/log.go b/backend/services/log.go index 9e96be03..c6e4f090 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -3,11 +3,15 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/lib/cron" "crawlab/model" "crawlab/utils" "encoding/json" "github.com/apex/log" + "github.com/spf13/viper" + "io/ioutil" "os" + "path/filepath" "runtime/debug" ) @@ -71,3 +75,36 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { return logStr, nil } + +func DeleteLogPeriodically() { + logDir := viper.GetString("log.path") + if !utils.Exists(logDir) { + log.Error("Can Not Set Delete Logs Periodically,No Log Dir") + return + } + rd, err := ioutil.ReadDir(logDir) + if err != nil { + log.Error("Read Log Dir Failed") + return + } + + for _, fi := range rd { + if fi.IsDir() { + log.Info(filepath.Join(logDir, fi.Name())) + os.RemoveAll(filepath.Join(logDir, fi.Name())) + log.Info("Delete Log File Success") + } + } + +} + +func InitDeleteLogPeriodically() error { + c := cron.New(cron.WithSeconds()) + if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil { + return err + } + + c.Start() + return nil + +} From f4a5dfa50309c58ba6bb12715b872da4cac32ba1 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 31 Aug 2019 11:30:01 +0800 Subject: [PATCH 14/46] Init delete log files periodically --- backend/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/backend/main.go b/backend/main.go index 489a17ce..f8442c1d 100644 --- a/backend/main.go +++ b/backend/main.go @@ -29,6 +29,15 @@ func main() { } log.Info("初始化日志设置成功") + if viper.GetString("log.isDeletePeriodically") == "Y" { + err := services.InitDeleteLogPeriodically() + if err != nil { + log.Error("Init DeletePeriodically Failed") + panic(err) + } + log.Info("初始化定期清理日志配置成功") + } + // 初始化Mongodb数据库 if err := database.InitMongo(); err != nil { log.Error("init mongodb error:" + err.Error()) From 39a2a3d63566b5140c3208b3149337c25a9ea5a2 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sat, 31 Aug 2019 11:30:41 +0800 Subject: [PATCH 15/46] Add unit test for log.go --- backend/services/log_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 backend/services/log_test.go diff --git a/backend/services/log_test.go b/backend/services/log_test.go new file mode 100644 index 00000000..0a52747c --- /dev/null +++ b/backend/services/log_test.go @@ -0,0 +1,22 @@ +package services + +import ( + "crawlab/config" + "github.com/apex/log" + . "github.com/smartystreets/goconvey/convey" + "github.com/spf13/viper" + "testing" +) + +func TestDeleteLogPeriodically(t *testing.T) { + Convey("Test DeleteLogPeriodically", t, func() { + if err := config.InitConfig("../conf/config.yml"); err != nil { + log.Error("init config error:" + err.Error()) + panic(err) + } + log.Info("初始化配置成功") + logDir := viper.GetString("log.path") + log.Info(logDir) + DeleteLogPeriodically() + }) +} From 771cb762770ecc221e64a98d737aeaf0e5cd6a0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 12:04:12 +0800 Subject: [PATCH 16/46] =?UTF-8?q?fix=20=E5=89=8D=E7=AB=AF=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E5=8F=B0=E6=8A=A5=E9=94=99=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?=20\=20fix=20=E6=97=A0=E6=B3=95=E6=89=93=E5=8D=B0=E4=B8=AD?= =?UTF-8?q?=E6=96=87=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 4 ++++ docker/Dockerfile.master.alpine | 2 +- docker/Dockerfile.worker.alpine | 2 +- frontend/src/components/InfoView/NodeInfoView.vue | 2 +- frontend/src/views/layout/components/Navbar.vue | 1 + 5 files changed, 8 insertions(+), 3 deletions(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index 1397e335..ad0c0ae5 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -20,6 +20,7 @@ import ( "path/filepath" "runtime/debug" "strings" + "syscall" ) type SpiderFileData struct { @@ -133,6 +134,8 @@ func ZipSpider(spider model.Spider) (filePath string, err error) { // 如果源文件夹不存在,抛错 if !utils.Exists(spider.Src) { debug.PrintStack() + // 删除该爬虫,否则会一直报错 + _ = model.RemoveSpider(spider.Id) return "", errors.New("source path does not exist") } @@ -173,6 +176,7 @@ func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid // 如果存在FileId删除GridFS上的老文件 if !utils.IsObjectIdNull(spider.FileId) { if err = gf.RemoveId(spider.FileId); err != nil { + log.Error("remove gf file:" + err.Error()) debug.PrintStack() } } diff --git a/docker/Dockerfile.master.alpine b/docker/Dockerfile.master.alpine index 6979861b..b9dbb742 100644 --- a/docker/Dockerfile.master.alpine +++ b/docker/Dockerfile.master.alpine @@ -75,7 +75,7 @@ RUN sed -i 's/#rc_sys=""/rc_sys="lxc"/g' /etc/rc.conf && \ # working directory WORKDIR /app/backend - +ENV PYTHONIOENCODING utf-8 # frontend port EXPOSE 8080 diff --git a/docker/Dockerfile.worker.alpine b/docker/Dockerfile.worker.alpine index e7a66776..388125a2 100644 --- a/docker/Dockerfile.worker.alpine +++ b/docker/Dockerfile.worker.alpine @@ -35,7 +35,7 @@ RUN apk del .build-deps # working directory WORKDIR /app/backend - +ENV PYTHONIOENCODING utf-8 # backend port EXPOSE 8000 diff --git a/frontend/src/components/InfoView/NodeInfoView.vue b/frontend/src/components/InfoView/NodeInfoView.vue index 8e350448..e6ffb58a 100644 --- a/frontend/src/components/InfoView/NodeInfoView.vue +++ b/frontend/src/components/InfoView/NodeInfoView.vue @@ -22,7 +22,7 @@ - {{$t('Save')}} + {{$t('Save')}} diff --git a/frontend/src/views/layout/components/Navbar.vue b/frontend/src/views/layout/components/Navbar.vue index f60c0051..3b30c049 100644 --- a/frontend/src/views/layout/components/Navbar.vue +++ b/frontend/src/views/layout/components/Navbar.vue @@ -32,6 +32,7 @@ {{$t('Documentation')}} + From 57dddc84ed5fc9d34657c16567793c852cf2a423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 12:42:53 +0800 Subject: [PATCH 17/46] =?UTF-8?q?FIX=20#178=20FIX=20=E5=BD=93=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=96=87=E4=BB=B6=E5=B0=8F=E4=BA=8E2048=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99=E6=97=A0=E6=B3=95=E6=AD=A3=E5=B8=B8=E6=8B=89?= =?UTF-8?q?=E5=8F=96=E6=97=A5=E5=BF=97=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 9 +++++++-- backend/services/node.go | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index c6e4f090..a248c176 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -35,8 +35,13 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } defer f.Close() logBuf := make([]byte, 2048) - n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf))) - if err != nil { + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + // 到文件结尾会有EOF的报错 + if err.Error() != "EOF" && err != nil { log.Error(err.Error()) debug.PrintStack() return nil, err diff --git a/backend/services/node.go b/backend/services/node.go index 09b49dbf..083fdc3d 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -97,7 +97,7 @@ func GetCurrentNode() (model.Node, error) { Key: key, Id: bson.NewObjectId(), Ip: ip, - Name: key, + Name: ip, Mac: mac, IsMaster: true, } @@ -205,7 +205,7 @@ func UpdateNodeStatus() { // 数据库不存在该节点 node = model.Node{ Key: key, - Name: key, + Name: data.Ip, Ip: data.Ip, Port: "8000", Mac: data.Mac, From 4944883f957f959879787d7591ba7926aa697fd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:49:34 +0800 Subject: [PATCH 18/46] =?UTF-8?q?fix=20=E8=8A=82=E7=82=B9=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/node.go | 47 +++++++++++++++ backend/model/node_test.go | 50 ++++++++++++++++ backend/services/node.go | 116 ++++++++++++++----------------------- backend/services/spider.go | 4 +- 4 files changed, 143 insertions(+), 74 deletions(-) create mode 100644 backend/model/node_test.go diff --git a/backend/model/node.go b/backend/model/node.go index 61c20473..6211115c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -1,7 +1,9 @@ package model import ( + "crawlab/constants" "crawlab/database" + "crawlab/services/register" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { + log.Error("get node list error: " + err.Error()) debug.PrintStack() return results, err } @@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) { return count, nil } + +// 节点基本信息 +func GetNodeBaseInfo() (ip string, mac string, key string, error error) { + ip, err := register.GetRegister().GetIp() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + mac, err = register.GetRegister().GetMac() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + key, err = register.GetRegister().GetKey() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + return ip, mac, key, nil +} + +// 根据redis的key值,重置node节点为offline +func ResetNodeStatusToOffline(list []string) { + nodes, _ := GetNodeList(nil) + for _, node := range nodes { + hasNode := false + for _, key := range list { + if key == node.Key { + hasNode = true + break + } + } + if !hasNode || node.Status == "" { + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } + continue + } + } +} diff --git a/backend/model/node_test.go b/backend/model/node_test.go new file mode 100644 index 00000000..ba3f4aaa --- /dev/null +++ b/backend/model/node_test.go @@ -0,0 +1,50 @@ +package model + +import ( + "crawlab/config" + "crawlab/constants" + "crawlab/database" + "github.com/apex/log" + . "github.com/smartystreets/goconvey/convey" + "runtime/debug" + "testing" +) + +func TestAddNode(t *testing.T) { + Convey("Test AddNode", t, func() { + if err := config.InitConfig("../conf/config.yml"); err != nil { + log.Error("init config error:" + err.Error()) + panic(err) + } + log.Info("初始化配置成功") + + // 初始化Mongodb数据库 + if err := database.InitMongo(); err != nil { + log.Error("init mongodb error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Mongodb数据库成功") + + // 初始化Redis数据库 + if err := database.InitRedis(); err != nil { + log.Error("init redis error:" + err.Error()) + debug.PrintStack() + panic(err) + } + + var node = Node{ + Key: "c4:b3:01:bd:b5:e7", + Name: "10.27.238.101", + Ip: "10.27.238.101", + Port: "8000", + Mac: "c4:b3:01:bd:b5:e7", + Status: constants.StatusOnline, + IsMaster: true, + } + if err := node.Add(); err != nil { + log.Error("add node error:" + err.Error()) + panic(err) + } + }) +} diff --git a/backend/services/node.go b/backend/services/node.go index 083fdc3d..3ed84149 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) { if err != nil { // 如果为主节点,表示为第一次注册,插入节点信息 if IsMaster() { - // 获取本机IP地址 - ip, err := register.GetRegister().GetIp() + // 获取本机信息 + ip, mac, key, err := model.GetNodeBaseInfo() if err != nil { debug.PrintStack() - return model.Node{}, err - } - - mac, err := register.GetRegister().GetMac() - if err != nil { - debug.PrintStack() - return model.Node{}, err - } - - key, err := register.GetRegister().GetKey() - if err != nil { - debug.PrintStack() - return model.Node{}, err + return node, err } // 生成节点 @@ -179,70 +167,56 @@ func UpdateNodeStatus() { log.Errorf(err.Error()) return } - // 在MongoDB中该节点设置状态为离线 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + keys, _ := database.RedisClient.HKeys("nodes") + model.ResetNodeStatusToOffline(keys) continue } - // 更新节点信息到数据库 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, - } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else { - // 数据库存在该节点 - node.Status = constants.StatusOnline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + // 处理node信息 + handleNodeInfo(key, data) + } + + // 重置不在redis的key为offline + model.ResetNodeStatusToOffline(list) +} + +func handleNodeInfo(key string, data Data) { + // 更新节点信息到数据库 + s, c := database.GetCol("nodes") + defer s.Close() + + // 同个key可能因为并发,被注册多次 + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) } } - // 遍历数据库中的节点列表 - nodes, err := model.GetNodeList(nil) - for _, node := range nodes { - hasNode := false - for _, key := range list { - if key == node.Key { - hasNode = true - break - } + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + // 数据库不存在该节点 + node = model.Node{ + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, } - if !hasNode { - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - continue + if err := node.Add(); err != nil { + log.Errorf(err.Error()) + return + } + } else { + // 数据库存在该节点 + node.Status = constants.StatusOnline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return } } } diff --git a/backend/services/spider.go b/backend/services/spider.go index ad0c0ae5..a3242849 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -40,7 +40,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 + mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask if err := os.MkdirAll(srcPath, 0666); err != nil { debug.PrintStack() @@ -301,7 +301,6 @@ func PublishSpider(spider model.Spider) (err error) { return } channel := "files:upload" - log.Info("publish files.upload event, file id:" + msg.FileId) if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -313,7 +312,6 @@ func PublishSpider(spider model.Spider) (err error) { // 上传爬虫回调 func OnFileUpload(channel string, msgStr string) { - log.Info("received files.upload event, msgStr:" + msgStr) s, gf := database.GetGridFs("files") defer s.Close() From cbd549255e7ceb0874574f4310789ecb69a4c9ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:50:01 +0800 Subject: [PATCH 19/46] =?UTF-8?q?fix=20=E5=89=8D=E7=AB=AF=E6=8A=A5?= =?UTF-8?q?=E9=94=99=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/store/modules/node.js | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js index 266beb3e..5e21a222 100644 --- a/frontend/src/store/modules/node.js +++ b/frontend/src/store/modules/node.js @@ -25,15 +25,7 @@ const mutations = { const { id, systemInfo } = payload for (let i = 0; i < state.nodeList.length; i++) { if (state.nodeList[i]._id === id) { - // Vue.set(state.nodeList[i], 'systemInfo', {}) state.nodeList[i].systemInfo = systemInfo - // for (const key in systemInfo) { - // if (systemInfo.hasOwnProperty(key)) { - // console.log(key) - // state.nodeList[i].systemInfo[key] = systemInfo[key] - // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key]) - // } - // } break } } @@ -76,10 +68,12 @@ const actions = { getTaskList ({ state, commit }, id) { return request.get(`/nodes/${id}/tasks`) .then(response => { - commit('task/SET_TASK_LIST', - response.data.data.map(d => d) - .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), - { root: true }) + if (response.data.data) { + commit('task/SET_TASK_LIST', + response.data.data.map(d => d) + .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), + { root: true }) + } }) }, getNodeSystemInfo ({ state, commit }, id) { From b61445acb0d01bf25c454762e0658a811eeff259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:57:14 +0800 Subject: [PATCH 20/46] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/node.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/services/node.go b/backend/services/node.go index 3ed84149..977ff0ef 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -165,11 +165,7 @@ func UpdateNodeStatus() { // 在Redis中删除该节点 if err := database.RedisClient.HDel("nodes", data.Key); err != nil { log.Errorf(err.Error()) - return } - // 在MongoDB中该节点设置状态为离线 - keys, _ := database.RedisClient.HKeys("nodes") - model.ResetNodeStatusToOffline(keys) continue } From b3cdb231acf08172edb8d08b22501d13233e8d5d Mon Sep 17 00:00:00 2001 From: yaziming Date: Sat, 31 Aug 2019 14:57:09 +0800 Subject: [PATCH 21/46] =?UTF-8?q?Backend:=20=20=20=20=201.[improve]=20=20?= =?UTF-8?q?=20=20=20=20=20=20=E4=BD=BF=E7=94=A8gin=E6=8F=90=E4=BE=9B?= =?UTF-8?q?=E7=9A=84RouteGroup=E5=8A=9F=E8=83=BD=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E4=B8=AD=E9=97=B4=E4=BB=B6=E9=80=BB=E8=BE=91=20=20=20=20=202.[?= =?UTF-8?q?break=20change]=20=20=20=20=20=20=20=20=E7=A7=BB=E9=99=A4Author?= =?UTF-8?q?ization=20Middleware=E4=B8=AD=E5=AF=B9=E7=99=BB=E5=BD=95?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E4=B8=8B=E8=BD=BD=E7=89=B9=E6=AE=8A=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=80=BB=E8=BE=91=20=20=20=20=203.[unsafe=20problem]?= =?UTF-8?q?=20=20=20=20=20=20=20=20=E4=B8=8B=E8=BD=BD=E4=BB=BB=E5=8A=A1csv?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=99=BB=E5=BD=95=E9=AA=8C=E8=AF=81=20Fronte?= =?UTF-8?q?nd:=20=20=20=20=201.=20=E6=9B=B4=E6=94=B9csv=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=20=20=20=20=202.=20task=20list=E9=A1=B5?= =?UTF-8?q?=E9=9D=A2table=20section=20=E7=A7=BB=E9=99=A4=E5=9B=BA=E5=AE=9A?= =?UTF-8?q?width=E8=AE=BE=E7=BD=AE,=E9=87=87=E7=94=A8=E8=87=AA=E9=80=82?= =?UTF-8?q?=E5=BA=94,=E9=98=B2=E6=AD=A2=E5=A4=A7=E5=B1=8F=E7=A9=BA?= =?UTF-8?q?=E7=99=BD=E6=96=AD=E8=A3=82=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/main.go | 99 ++++++++++++++------------ backend/middlewares/auth.go | 12 ++-- frontend/src/api/request.js | 5 +- frontend/src/store/modules/task.js | 16 +++++ frontend/src/views/task/TaskDetail.vue | 2 +- frontend/src/views/task/TaskList.vue | 2 +- 6 files changed, 80 insertions(+), 56 deletions(-) diff --git a/backend/main.go b/backend/main.go index 489a17ce..31b5d64b 100644 --- a/backend/main.go +++ b/backend/main.go @@ -90,53 +90,60 @@ func main() { if services.IsMaster() { // 中间件 app.Use(middlewares.CORSMiddleware()) - app.Use(middlewares.AuthorizationMiddleware()) + //app.Use(middlewares.AuthorizationMiddleware()) + anonymousGroup := app.Group("/") + { + anonymousGroup.POST("/login", routes.Login) // 用户登录 + anonymousGroup.PUT("/users", routes.PutUser) // 添加用户 + + } + authGroup := app.Group("/", middlewares.AuthorizationMiddleware()) + { + // 路由 + // 节点 + authGroup.GET("/nodes", routes.GetNodeList) // 节点列表 + authGroup.GET("/nodes/:id", routes.GetNode) // 节点详情 + authGroup.POST("/nodes/:id", routes.PostNode) // 修改节点 + authGroup.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表 + authGroup.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表 + authGroup.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点 + // 爬虫 + authGroup.GET("/spiders", routes.GetSpiderList) // 爬虫列表 + authGroup.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 + authGroup.POST("/spiders", routes.PutSpider) // 上传爬虫 + authGroup.POST("/spiders/:id", routes.PostSpider) // 修改爬虫 + authGroup.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫 + authGroup.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫 + authGroup.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表 + authGroup.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取 + authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入 + authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录 + authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据 + // 任务 + authGroup.GET("/tasks", routes.GetTaskList) // 任务列表 + authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情 + authGroup.PUT("/tasks", routes.PutTask) // 派发任务 + authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 + authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 + authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 + authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 + authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 + // 定时任务 + authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表 + authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 + authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务 + authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 + authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 + // 统计数据 + authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据 + // 用户 + authGroup.GET("/users", routes.GetUserList) // 用户列表 + authGroup.GET("/users/:id", routes.GetUser) // 用户详情 + authGroup.POST("/users/:id", routes.PostUser) // 更改用户 + authGroup.DELETE("/users/:id", routes.DeleteUser) // 删除用户 + authGroup.GET("/me", routes.GetMe) // 获取自己账户 + } - // 路由 - // 节点 - app.GET("/nodes", routes.GetNodeList) // 节点列表 - app.GET("/nodes/:id", routes.GetNode) // 节点详情 - app.POST("/nodes/:id", routes.PostNode) // 修改节点 - app.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表 - app.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表 - app.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点 - // 爬虫 - app.GET("/spiders", routes.GetSpiderList) // 爬虫列表 - app.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 - app.POST("/spiders", routes.PutSpider) // 上传爬虫 - app.POST("/spiders/:id", routes.PostSpider) // 修改爬虫 - app.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫 - app.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫 - app.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表 - app.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取 - app.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入 - app.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录 - app.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据 - // 任务 - app.GET("/tasks", routes.GetTaskList) // 任务列表 - app.GET("/tasks/:id", routes.GetTask) // 任务详情 - app.PUT("/tasks", routes.PutTask) // 派发任务 - app.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 - app.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 - app.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 - app.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 - app.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 - // 定时任务 - app.GET("/schedules", routes.GetScheduleList) // 定时任务列表 - app.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 - app.PUT("/schedules", routes.PutSchedule) // 创建定时任务 - app.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 - app.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 - // 统计数据 - app.GET("/stats/home", routes.GetHomeStats) // 首页统计数据 - // 用户 - app.GET("/users", routes.GetUserList) // 用户列表 - app.GET("/users/:id", routes.GetUser) // 用户详情 - app.PUT("/users", routes.PutUser) // 添加用户 - app.POST("/users/:id", routes.PostUser) // 更改用户 - app.DELETE("/users/:id", routes.DeleteUser) // 删除用户 - app.POST("/login", routes.Login) // 用户登录 - app.GET("/me", routes.GetMe) // 获取自己账户 } // 路由ping diff --git a/backend/middlewares/auth.go b/backend/middlewares/auth.go index 977fea78..5298beea 100644 --- a/backend/middlewares/auth.go +++ b/backend/middlewares/auth.go @@ -12,12 +12,12 @@ import ( func AuthorizationMiddleware() gin.HandlerFunc { return func(c *gin.Context) { // 如果为登录或注册,不用校验 - if c.Request.URL.Path == "/login" || - (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") || - strings.HasSuffix(c.Request.URL.Path, "download") { - c.Next() - return - } + //if c.Request.URL.Path == "/login" || + // (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") || + // strings.HasSuffix(c.Request.URL.Path, "download") { + // c.Next() + // return + //} // 获取token string tokenStr := c.GetHeader("Authorization") diff --git a/frontend/src/api/request.js b/frontend/src/api/request.js index 38734c46..5b612719 100644 --- a/frontend/src/api/request.js +++ b/frontend/src/api/request.js @@ -3,7 +3,7 @@ import router from '../router' let baseUrl = process.env.VUE_APP_BASE_URL ? process.env.VUE_APP_BASE_URL : 'http://localhost:8000' -const request = (method, path, params, data) => { +const request = (method, path, params, data, others = {}) => { return new Promise((resolve, reject) => { const url = baseUrl + path const headers = { @@ -14,7 +14,8 @@ const request = (method, path, params, data) => { url, params, data, - headers + headers, + ...others }) .then(resolve) .catch(error => { diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 545a169b..1d7e6c09 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -120,6 +120,22 @@ const actions = { commit('SET_TASK_RESULTS_TOTAL_COUNT', response.data.total) }) }, + async getTaskResultExcel ({ state, commit }, id) { + const { data } = await request.request('GET', '/tasks/' + id + '/results/download', {}, { + responseType: 'blob' // important + }) + const downloadUrl = window.URL.createObjectURL(new Blob([data])) + + const link = document.createElement('a') + + link.href = downloadUrl + + link.setAttribute('download', 'data.csv') // any other extension + + document.body.appendChild(link) + link.click() + link.remove() + }, cancelTask ({ state, dispatch }, id) { return request.post(`/tasks/${id}/cancel`) .then(() => { diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index a1edb497..2361c0c2 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -95,7 +95,7 @@ export default { this.$store.dispatch('task/getTaskResults', this.$route.params.id) }, downloadCSV () { - window.location.href = this.$request.baseUrl + '/tasks/' + this.$route.params.id + '/results/download' + this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id) this.$st.sendEv('任务详情-结果', '下载CSV') } }, diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index e15aa661..9efcaa92 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -45,7 +45,7 @@ :label="$t(col.label)" :sortable="col.sortable" :align="col.align" - :width="col.width"> + > From 5e6aec2393303a5a7411f1b0828e0e331f176831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 15:03:57 +0800 Subject: [PATCH 22/46] =?UTF-8?q?fix=20=E6=97=A0=E6=B3=95=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=97=A5=E5=BF=97=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 2 +- backend/services/node.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index a248c176..a0ba0311 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -41,7 +41,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } n, err := f.ReadAt(logBuf, off) // 到文件结尾会有EOF的报错 - if err.Error() != "EOF" && err != nil { + if err != nil && err.Error() != "EOF" { log.Error(err.Error()) debug.PrintStack() return nil, err diff --git a/backend/services/node.go b/backend/services/node.go index 977ff0ef..124f5bba 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -308,8 +308,10 @@ func WorkerNodeCallback(channel string, msgStr string) { log.Errorf(err.Error()) debug.PrintStack() msgSd.Error = err.Error() + msgSd.Log = err.Error() + } else { + msgSd.Log = string(logStr) } - msgSd.Log = string(logStr) // 序列化 msgSdBytes, err := json.Marshal(&msgSd) From 8115f0649683081e9d3cfadfa5ef48720ca0ea2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 15:59:43 +0800 Subject: [PATCH 23/46] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/spider.go | 6 ++++-- backend/services/node.go | 3 ++- frontend/src/views/task/TaskList.vue | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/backend/model/spider.go b/backend/model/spider.go index 65782fe8..d44f651b 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -23,13 +23,14 @@ type Spider struct { Col string `json:"col"` // 结果储存位置 Site string `json:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 - + Remark string `json:"remark"` // 备注 // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 // 前端展示 - LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 + LastStatus string `json:"last_status"` // 最后执行状态 // TODO: 可配置爬虫 //Fields []interface{} `json:"fields"` @@ -115,6 +116,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 赋值 spiders[i].LastRunTs = task.CreateTs + spiders[i].LastStatus = task.Status } return spiders, nil diff --git a/backend/services/node.go b/backend/services/node.go index 124f5bba..9685a1bb 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -304,6 +304,7 @@ func WorkerNodeCallback(channel string, msgStr string) { // 获取本地日志 logStr, err := GetLocalLog(msg.LogPath) + log.Info(string(logStr)) if err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -322,7 +323,7 @@ func WorkerNodeCallback(channel string, msgStr string) { } // 发布消息给主节点 - fmt.Println(msgSd) + log.Info("publish get log msg to master") if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { log.Errorf(err.Error()) return diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 9cbceb20..a3ffbeea 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -119,7 +119,7 @@ :width="col.width"> - + + + + Date: Sat, 31 Aug 2019 16:27:01 +0800 Subject: [PATCH 26/46] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/package.json b/frontend/package.json index 139297d3..e3bc84f8 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -6,6 +6,7 @@ "serve": "vue-cli-service serve --ip=0.0.0.0", "serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0", "config": "vue ui", + "build:dev": "vue-cli-service build --mode development", "build:prod": "vue-cli-service build --mode production", "lint": "vue-cli-service lint", "test:unit": "vue-cli-service test:unit" From e423ff564cc74674b3c31a48797ea27fb3e10654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 17:04:49 +0800 Subject: [PATCH 27/46] =?UTF-8?q?fix=20=E7=88=AC=E8=99=AB=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=E6=97=A0=E6=B3=95=E6=89=93=E5=BC=80=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 2 +- backend/services/task.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index a3242849..f526e11d 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -42,7 +42,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { if !utils.Exists(srcPath) { mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0666); err != nil { + if err := os.MkdirAll(srcPath, 0766); err != nil { debug.PrintStack() return []model.Spider{}, err } diff --git a/backend/services/task.go b/backend/services/task.go index 8c3c3407..1b0a5676 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -408,9 +408,12 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = string(logBytes) if err != nil { log.Errorf(err.Error()) - return "", err + logStr = string(err.Error()) + // return "", err + } else { + logStr = string(logBytes) } - logStr = string(logBytes) + } else { // 若不为主节点,获取远端日志 logStr, err = GetRemoteLog(task) From b9ed17695015895c7108c702326f914805081409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 17:56:42 +0800 Subject: [PATCH 28/46] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index f526e11d..c3f63139 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -31,6 +31,7 @@ type SpiderFileData struct { type SpiderUploadMessage struct { FileId string FileName string + SpiderId string } // 从项目目录中获取爬虫列表 @@ -295,6 +296,7 @@ func PublishSpider(spider model.Spider) (err error) { msg := SpiderUploadMessage{ FileId: fid.Hex(), FileName: fileName, + SpiderId: spider.Id.Hex(), } msgStr, err := json.Marshal(msg) if err != nil { @@ -326,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) { // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf("open file id: " + msg.FileId + ", error: " + err.Error()) + log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) debug.PrintStack() return } From 81f6cf021fe72e6fb1ed83675a20e7bcede0ec90 Mon Sep 17 00:00:00 2001 From: yaziming Date: Sat, 31 Aug 2019 21:22:47 +0800 Subject: [PATCH 29/46] =?UTF-8?q?Backend:=20=20=20=20improve=20=20=20=20?= =?UTF-8?q?=20=20-=20AuthMiddleware=20=E6=B3=A8=E5=85=A5=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E7=9A=84=E4=BF=A1=E6=81=AF=20=20=20=20=20=20?= =?UTF-8?q?-=20=E5=A2=9E=E5=8A=A0Context=E6=9C=8D=E5=8A=A1=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=BF=AB=E6=8D=B7=E8=8E=B7=E5=8F=96=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E7=99=BB=E5=BD=95=E8=80=85=E4=BF=A1=E6=81=AF=20=20=20=20=20=20?= =?UTF-8?q?-=20=E9=87=8D=E6=9E=84Login/GetMe=E6=8E=A5=E5=8F=A3=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E9=81=BF=E5=85=8D=E9=87=8D=E5=A4=8D=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E6=9F=A5=E8=AF=A2=20=20=20=20=20=20-=20?= =?UTF-8?q?=E8=A7=84=E8=8C=83=E5=8C=96error=E4=BF=A1=E6=81=AF=E5=A3=B0?= =?UTF-8?q?=E6=98=8E(=E5=90=91=E4=B8=8B=E5=85=BC=E5=AE=B9,=E6=97=A7?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8F=AF=E9=80=90=E6=B8=90=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E8=A7=84=E8=8C=83=E5=8C=96)=20=20=20=20=20=20-=20=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E9=83=A8=E5=88=86=E4=B8=8D=E7=AC=A6=E5=90=88=E8=A7=84?= =?UTF-8?q?=E8=8C=83=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/constants/context.go | 5 ++ backend/constants/errors.go | 8 ++++ backend/errors/errors.go | 43 +++++++++++++++++ backend/middlewares/auth.go | 1 + backend/mock/schedule.go | 2 +- backend/routes/user.go | 27 +++++------ backend/services/context/context.go | 73 +++++++++++++++++++++++++++++ backend/services/spider.go | 3 +- backend/services/user.go | 40 +++++++++------- 9 files changed, 168 insertions(+), 34 deletions(-) create mode 100644 backend/constants/context.go create mode 100644 backend/constants/errors.go create mode 100644 backend/errors/errors.go create mode 100644 backend/services/context/context.go diff --git a/backend/constants/context.go b/backend/constants/context.go new file mode 100644 index 00000000..0759b54b --- /dev/null +++ b/backend/constants/context.go @@ -0,0 +1,5 @@ +package constants + +const ( + ContextUser = "currentUser" +) diff --git a/backend/constants/errors.go b/backend/constants/errors.go new file mode 100644 index 00000000..a6175319 --- /dev/null +++ b/backend/constants/errors.go @@ -0,0 +1,8 @@ +package constants + +import "crawlab/errors" + +var ( + //users + ErrorUserNotFound = errors.NewBusinessError(10001, "user not found.") +) diff --git a/backend/errors/errors.go b/backend/errors/errors.go new file mode 100644 index 00000000..0110808b --- /dev/null +++ b/backend/errors/errors.go @@ -0,0 +1,43 @@ +package errors + +import "fmt" + +type Scope int + +const ( + ScopeSystem Scope = 1 + ScopeBusiness Scope = 2 +) + +type OPError struct { + Message string + Code int + Scope Scope +} + +func (O OPError) Error() string { + var scope string + switch O.Scope { + case ScopeSystem: + scope = "system" + break + case ScopeBusiness: + scope = "business" + } + return fmt.Sprintf("%s : %d -> %s.", scope, O.Code, O.Message) +} + +func NewSystemOPError(code int, message string) *OPError { + return &OPError{ + Message: message, + Code: code, + Scope: ScopeSystem, + } +} +func NewBusinessError(code int, message string) *OPError { + return &OPError{ + Message: message, + Code: code, + Scope: ScopeBusiness, + } +} diff --git a/backend/middlewares/auth.go b/backend/middlewares/auth.go index 5298beea..07249e82 100644 --- a/backend/middlewares/auth.go +++ b/backend/middlewares/auth.go @@ -46,6 +46,7 @@ func AuthorizationMiddleware() gin.HandlerFunc { return } } + c.Set(constants.ContextUser, &user) // 校验成功 c.Next() diff --git a/backend/mock/schedule.go b/backend/mock/schedule.go index ae982ca6..702e8754 100644 --- a/backend/mock/schedule.go +++ b/backend/mock/schedule.go @@ -113,7 +113,7 @@ func PutSchedule(c *gin.Context) { func DeleteSchedule(c *gin.Context) { id := bson.ObjectIdHex("5d429e6c19f7abede924fee2") for _, sch := range scheduleList { - if sch.Id == bson.ObjectId(id) { + if sch.Id == id { fmt.Println("delete a schedule") } } diff --git a/backend/routes/user.go b/backend/routes/user.go index a3d5a431..a6d44cae 100644 --- a/backend/routes/user.go +++ b/backend/routes/user.go @@ -4,6 +4,7 @@ import ( "crawlab/constants" "crawlab/model" "crawlab/services" + "crawlab/services/context" "crawlab/utils" "github.com/gin-gonic/gin" "github.com/globalsign/mgo/bson" @@ -171,7 +172,7 @@ func Login(c *gin.Context) { } // 获取token - tokenStr, err := services.GetToken(user.Username) + tokenStr, err := services.MakeToken(&user) if err != nil { HandleError(http.StatusUnauthorized, c, errors.New("not authorized")) return @@ -185,20 +186,16 @@ func Login(c *gin.Context) { } func GetMe(c *gin.Context) { - // 获取token string - tokenStr := c.GetHeader("Authorization") - - // 校验token - user, err := services.CheckToken(tokenStr) - if err != nil { - HandleError(http.StatusUnauthorized, c, errors.New("not authorized")) + ctx := context.WithGinContext(c) + user := ctx.User() + if user == nil { + ctx.FailedWithError(constants.ErrorUserNotFound, http.StatusUnauthorized) return } - user.Password = "" - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: user, - }) + ctx.Success(struct { + *model.User + Password string `json:"password,omitempty"` + }{ + User: user, + }, nil) } diff --git a/backend/services/context/context.go b/backend/services/context/context.go new file mode 100644 index 00000000..d5d2b6ad --- /dev/null +++ b/backend/services/context/context.go @@ -0,0 +1,73 @@ +package context + +import ( + "crawlab/constants" + "crawlab/errors" + "crawlab/model" + "fmt" + "github.com/apex/log" + "github.com/gin-gonic/gin" + errors2 "github.com/pkg/errors" + "net/http" + "runtime/debug" +) + +type Context struct { + *gin.Context +} + +func (c *Context) User() *model.User { + userIfe, exists := c.Get(constants.ContextUser) + if !exists { + return nil + } + user, ok := userIfe.(*model.User) + if !ok { + return nil + } + return user +} +func (c *Context) Success(data interface{}, meta interface{}) { + if meta == nil { + meta = gin.H{} + } + if data == nil { + data = gin.H{} + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "message": "success", + "data": data, + "error": "", + }) +} +func (c *Context) FailedWithError(err error, httpCode ...int) { + + var code = 200 + if len(httpCode) > 0 { + code = httpCode[0] + } + log.Errorf("handle error:" + err.Error()) + debug.PrintStack() + switch errors2.Cause(err).(type) { + case errors.OPError: + c.AbortWithStatusJSON(code, gin.H{ + "status": "ok", + "message": "error", + "error": err.Error(), + }) + break + default: + fmt.Println("deprecated....") + c.AbortWithStatusJSON(code, gin.H{ + "status": "ok", + "message": "error", + "error": err.Error(), + }) + } + +} + +func WithGinContext(context *gin.Context) *Context { + return &Context{Context: context} +} diff --git a/backend/services/spider.go b/backend/services/spider.go index c3f63139..47c1fa33 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -230,7 +230,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre for { switch nr, err := f.Read(s[:]); true { case nr < 0: - fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error()) + _, _ = fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error()) debug.PrintStack() case nr == 0: // EOF return nil @@ -238,7 +238,6 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre handle(s[0:nr], fileCreate) } } - return nil } // 发布所有爬虫 diff --git a/backend/services/user.go b/backend/services/user.go index fb688fd1..4811f767 100644 --- a/backend/services/user.go +++ b/backend/services/user.go @@ -5,11 +5,9 @@ import ( "crawlab/model" "crawlab/utils" "errors" - "github.com/apex/log" "github.com/dgrijalva/jwt-go" "github.com/globalsign/mgo/bson" "github.com/spf13/viper" - "runtime/debug" "time" ) @@ -24,28 +22,38 @@ func InitUserService() error { } return nil } - -func GetToken(username string) (tokenStr string, err error) { - user, err := model.GetUserByUsername(username) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - +func MakeToken(user *model.User) (tokenStr string, err error) { token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ "id": user.Id, "username": user.Username, "nbf": time.Now().Unix(), }) - tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret"))) - if err != nil { - return - } - return + return token.SignedString([]byte(viper.GetString("server.secret"))) + } +//func GetToken(username string) (tokenStr string, err error) { +// user, err := model.GetUserByUsername(username) +// if err != nil { +// log.Errorf(err.Error()) +// debug.PrintStack() +// return +// } +// +// token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ +// "id": user.Id, +// "username": user.Username, +// "nbf": time.Now().Unix(), +// }) +// +// tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret"))) +// if err != nil { +// return +// } +// return +//} + func SecretFunc() jwt.Keyfunc { return func(token *jwt.Token) (interface{}, error) { return []byte(viper.GetString("server.secret")), nil From b766be577acb6d0874f38a6f1f24997890c3a0f8 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sun, 1 Sep 2019 00:06:26 +0800 Subject: [PATCH 30/46] bug fix:fix read log file oom --- backend/services/log.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/backend/services/log.go b/backend/services/log.go index c6e4f090..2e6f25ed 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -34,9 +34,16 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { return nil, err } defer f.Close() - logBuf := make([]byte, 2048) - n, err := f.ReadAt(logBuf, fi.Size()-int64(len(logBuf))) - if err != nil { + const bufLen = 2048 + logBuf := make([]byte, bufLen) + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + + //到文件结尾会有EOF标识 + if err != nil && err.Error() != "EOF" { log.Error(err.Error()) debug.PrintStack() return nil, err From 1de80f7779099057607fdda33ee953c678009848 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sun, 1 Sep 2019 00:07:18 +0800 Subject: [PATCH 31/46] add unit test for GetLocalLog --- backend/services/log_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/backend/services/log_test.go b/backend/services/log_test.go index 0a52747c..a0b049c5 100644 --- a/backend/services/log_test.go +++ b/backend/services/log_test.go @@ -2,9 +2,11 @@ package services import ( "crawlab/config" + "fmt" "github.com/apex/log" . "github.com/smartystreets/goconvey/convey" "github.com/spf13/viper" + "os" "testing" ) @@ -20,3 +22,29 @@ func TestDeleteLogPeriodically(t *testing.T) { DeleteLogPeriodically() }) } + +func TestGetLocalLog(t *testing.T) { + //create a log file for test + logPath := "../logs/crawlab/test.log" + f, err := os.Create(logPath) + defer f.Close() + if err != nil { + fmt.Println(err.Error()) + + } else { + _, err = f.Write([]byte("This is for test")) + } + + Convey("Test GetLocalLog", t, func() { + Convey("Test response", func() { + logStr, err := GetLocalLog(logPath) + log.Info(string(logStr)) + fmt.Println(err) + So(err, ShouldEqual, nil) + + }) + }) + //delete the test log file + os.Remove(logPath) + +} From 1c0b86428cf87e143f832408d4d1982954328b1f Mon Sep 17 00:00:00 2001 From: yaziming Date: Sun, 1 Sep 2019 00:36:59 +0800 Subject: [PATCH 32/46] 1. Fix Some I18n Warning. 2. Fix Navigation Duplicated Error When Current Route Path is `/login`. 3. Fix Parent Level Named Route Warning. 4. Change `request` function From Promise Syntax To Await/Async. --- frontend/src/api/request.js | 46 +++++++++++++++----- frontend/src/i18n/zh.js | 19 +++++++- frontend/src/router/index.js | 6 --- frontend/src/views/node/NodeList.vue | 2 +- frontend/src/views/schedule/ScheduleList.vue | 12 ++--- 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/frontend/src/api/request.js b/frontend/src/api/request.js index 5b612719..22707159 100644 --- a/frontend/src/api/request.js +++ b/frontend/src/api/request.js @@ -3,13 +3,13 @@ import router from '../router' let baseUrl = process.env.VUE_APP_BASE_URL ? process.env.VUE_APP_BASE_URL : 'http://localhost:8000' -const request = (method, path, params, data, others = {}) => { - return new Promise((resolve, reject) => { +const request = async (method, path, params, data, others = {}) => { + try { const url = baseUrl + path const headers = { 'Authorization': window.localStorage.getItem('token') } - axios({ + const response = await axios({ method, url, params, @@ -17,15 +17,37 @@ const request = (method, path, params, data, others = {}) => { headers, ...others }) - .then(resolve) - .catch(error => { - console.log(error) - if (error.response.status === 401) { - router.push('/login') - } - reject(error) - }) - }) + // console.log(response) + return response + } catch (e) { + if (e.response.status === 401 && router.currentRoute.path !== '/login') { + router.push('/login') + } + await Promise.reject(e) + } + + // return new Promise((resolve, reject) => { + // const url = baseUrl + path + // const headers = { + // 'Authorization': window.localStorage.getItem('token') + // } + // axios({ + // method, + // url, + // params, + // data, + // headers, + // ...others + // }) + // .then(resolve) + // .catch(error => { + // console.log(error) + // if (error.response.status === 401) { + // router.push('/login') + // } + // reject(error) + // }) + // }) } const get = (path, params) => { diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index 58317ec3..d3c8243f 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -247,7 +247,7 @@ export default { 'username already exists': '用户名已存在', 'Deleted successfully': '成功删除', 'Saved successfully': '成功保存', - + 'English': 'English', // 登录 'Sign in': '登录', 'Sign-in': '登录', @@ -266,5 +266,20 @@ export default { 'admin': '管理用户', 'Role': '角色', 'Edit User': '更改用户', - 'Users': '用户' + 'Users': '用户', + tagsView: { + closeOthers: '关闭其他', + close: '关闭', + refresh: '刷新', + closeAll: '关闭所有' + }, + nodeList: { + type: '节点类型' + }, + schedules: { + cron: 'Cron', + add_cron: '生成Cron', + // Cron Format: [second] [minute] [hour] [day of month] [month] [day of week] + cron_format: 'Cron 格式: [秒] [分] [小时] [日] [月] [周]' + } } diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 9a238d08..84c96cd3 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -46,7 +46,6 @@ export const constantRouterMap = [ ] }, { - name: 'Node', path: '/nodes', component: Layout, meta: { @@ -76,7 +75,6 @@ export const constantRouterMap = [ ] }, { - name: 'Spider', path: '/spiders', component: Layout, meta: { @@ -106,7 +104,6 @@ export const constantRouterMap = [ ] }, { - name: 'Task', path: '/tasks', component: Layout, meta: { @@ -136,7 +133,6 @@ export const constantRouterMap = [ ] }, { - name: 'Schedule', path: '/schedules', component: Layout, meta: { @@ -157,7 +153,6 @@ export const constantRouterMap = [ ] }, { - name: 'Site', path: '/sites', component: Layout, hidden: true, @@ -178,7 +173,6 @@ export const constantRouterMap = [ ] }, { - name: 'User', path: '/users', component: Layout, meta: { diff --git a/frontend/src/views/node/NodeList.vue b/frontend/src/views/node/NodeList.vue index 641009f3..9ea51502 100644 --- a/frontend/src/views/node/NodeList.vue +++ b/frontend/src/views/node/NodeList.vue @@ -163,7 +163,7 @@ export default { columns: [ { name: 'name', label: 'Name', width: '220' }, { name: 'ip', label: 'IP', width: '160' }, - { name: 'type', label: 'Type', width: '120' }, + { name: 'type', label: 'nodeList.type', width: '120' }, // { name: 'port', label: 'Port', width: '80' }, { name: 'status', label: 'Status', width: '120' }, { name: 'description', label: 'Description', width: 'auto' } diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 28ca4961..477302b8 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -38,21 +38,21 @@ - + + :placeholder="$t('schedules.cron')"> - {{$t('生成Cron')}} + {{$t('schedules.add_cron')}} Date: Sun, 1 Sep 2019 10:13:53 +0800 Subject: [PATCH 33/46] Backend: upgrade gin framework depend validator version : v8 -> v9 --- backend/go.mod | 4 ++ backend/go.sum | 8 ++++ backend/lib/validate_bridge/validator.go | 54 ++++++++++++++++++++++++ backend/main.go | 3 ++ 4 files changed, 69 insertions(+) create mode 100644 backend/lib/validate_bridge/validator.go diff --git a/backend/go.mod b/backend/go.mod index 5a575910..428c2fd3 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,9 +8,13 @@ require ( github.com/fsnotify/fsnotify v1.4.7 github.com/gin-gonic/gin v1.4.0 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 + github.com/go-playground/locales v0.12.1 // indirect + github.com/go-playground/universal-translator v0.16.0 // indirect github.com/gomodule/redigo v2.0.0+incompatible + github.com/leodido/go-urn v1.1.0 // indirect github.com/pkg/errors v0.8.1 github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 github.com/spf13/viper v1.4.0 + gopkg.in/go-playground/validator.v9 v9.29.1 ) diff --git a/backend/go.sum b/backend/go.sum index 910e18be..cc056d70 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -39,6 +39,10 @@ github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= +github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= +github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= +github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -77,6 +81,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= +github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= @@ -202,6 +208,8 @@ gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXa gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ= gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= +gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= +gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/backend/lib/validate_bridge/validator.go b/backend/lib/validate_bridge/validator.go new file mode 100644 index 00000000..509dc475 --- /dev/null +++ b/backend/lib/validate_bridge/validator.go @@ -0,0 +1,54 @@ +package validate_bridge + +import ( + "reflect" + "sync" + + "github.com/gin-gonic/gin/binding" + "gopkg.in/go-playground/validator.v9" +) + +type DefaultValidator struct { + once sync.Once + validate *validator.Validate +} + +var _ binding.StructValidator = &DefaultValidator{validate: validator.New()} + +func (v *DefaultValidator) ValidateStruct(obj interface{}) error { + if kindOfData(obj) == reflect.Struct { + + v.lazyinit() + + if err := v.validate.Struct(obj); err != nil { + return err + } + } + + return nil +} + +func (v *DefaultValidator) Engine() interface{} { + v.lazyinit() + return v.validate +} + +func (v *DefaultValidator) lazyinit() { + v.once.Do(func() { + v.validate = validator.New() + v.validate.SetTagName("binding") + + // add any custom validations etc. here + }) +} + +func kindOfData(data interface{}) reflect.Kind { + + value := reflect.ValueOf(data) + valueType := value.Kind() + + if valueType == reflect.Ptr { + valueType = value.Elem().Kind() + } + return valueType +} diff --git a/backend/main.go b/backend/main.go index 896bc8f9..bf98674e 100644 --- a/backend/main.go +++ b/backend/main.go @@ -3,16 +3,19 @@ package main import ( "crawlab/config" "crawlab/database" + "crawlab/lib/validate_bridge" "crawlab/middlewares" "crawlab/routes" "crawlab/services" "github.com/apex/log" "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" "github.com/spf13/viper" "runtime/debug" ) func main() { + binding.Validator = new(validate_bridge.DefaultValidator) app := gin.Default() // 初始化配置 From a9346a093400fa919725bf3b8dc8cc5372ec4bbc Mon Sep 17 00:00:00 2001 From: yaziming Date: Sun, 1 Sep 2019 17:18:08 +0800 Subject: [PATCH 34/46] backend: 1. Mongo dial add 5 seconds connection timeout. 2. Redis uses connection pool mode. 3. Redis pool new connection have 10 seconds write timeout and read timeout and connection timeout. --- backend/database/mongo.go | 3 +- backend/database/pubsub.go | 145 +++++++++++++++++++------------------ backend/database/redis.go | 75 +++++++++---------- backend/services/log.go | 2 +- backend/services/node.go | 59 ++++++++------- backend/services/spider.go | 36 ++++----- backend/services/system.go | 2 +- backend/services/task.go | 2 +- backend/utils/file.go | 2 +- 9 files changed, 166 insertions(+), 160 deletions(-) diff --git a/backend/database/mongo.go b/backend/database/mongo.go index 6b155791..1c2d6433 100644 --- a/backend/database/mongo.go +++ b/backend/database/mongo.go @@ -3,6 +3,7 @@ package database import ( "github.com/globalsign/mgo" "github.com/spf13/viper" + "time" ) var Session *mgo.Session @@ -44,7 +45,7 @@ func InitMongo() error { } else { uri = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoHost + ":" + mongoPort + "/" + mongoDb + "?authSource=" + mongoAuth } - sess, err := mgo.Dial(uri) + sess, err := mgo.DialWithTimeout(uri, time.Second*5) if err != nil { return err } diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 01e52fa1..152dc9a3 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -1,90 +1,97 @@ package database import ( - "errors" + "context" "fmt" "github.com/apex/log" "github.com/gomodule/redigo/redis" + errors2 "github.com/pkg/errors" "time" - "unsafe" ) -type SubscribeCallback func(channel, message string) +type ConsumeFunc func(message redis.Message) error -type Subscriber struct { - client redis.PubSubConn - cbMap map[string]SubscribeCallback -} - -func (c *Subscriber) Connect() { - conn, err := GetRedisConn() - if err != nil { - log.Fatalf("redis dial failed.") - } - - c.client = redis.PubSubConn{Conn: conn} - c.cbMap = make(map[string]SubscribeCallback) - - //retry connect redis 5 times, or panic - index := 0 - go func(i int) { - for { - log.Debug("wait...") - switch res := c.client.Receive().(type) { - case redis.Message: - i = 0 - channel := (*string)(unsafe.Pointer(&res.Channel)) - message := (*string)(unsafe.Pointer(&res.Data)) - c.cbMap[*channel](*channel, *message) - case redis.Subscription: - fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) - case error: - log.Error("error handle redis connection...") - - time.Sleep(2 * time.Second) - if i > 5 { - panic(errors.New("redis connection failed too many times, panic")) - } - con, err := GetRedisConn() - i += 1 - if err != nil { - log.Error("redis dial failed") - continue - } - c.client = redis.PubSubConn{Conn: con} - - continue - } - } - }(index) - -} - -func (c *Subscriber) Close() { - err := c.client.Close() +func (r *Redis) Close() { + err := r.pool.Close() if err != nil { log.Errorf("redis close error.") } } +func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { + psc := redis.PubSubConn{Conn: r.pool.Get()} + if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil { + return err + } + done := make(chan error, 1) + tick := time.NewTicker(time.Second * 3) + defer tick.Stop() + go func() { + defer func() { _ = psc.Close() }() + for { + switch msg := psc.Receive().(type) { + case error: + done <- fmt.Errorf("redis pubsub receive err: %v", msg) + return + case redis.Message: + fmt.Println(msg) + if err := consume(msg); err != nil { + fmt.Printf("redis pubsub consume message err: %v", err) + continue + } + case redis.Subscription: + fmt.Println(msg) + // + //if msg.Count == 0 { + // // all channels are unsubscribed + // return + //} + } -func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) { - err := c.client.Subscribe(channel) - if err != nil { - log.Fatalf("redis Subscribe error.") + } + }() + // start a new goroutine to receive message + for { + select { + case <-ctx.Done(): + if err := psc.Unsubscribe(); err != nil { + fmt.Printf("redis pubsub unsubscribe err: %v", err) + } + return nil + case <-tick.C: + //fmt.Printf("ping message \n") + if err := psc.Ping(""); err != nil { + done <- err + } + case err := <-done: + close(done) + return err + } } - c.cbMap[channel.(string)] = cb } +func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { + index := 0 + go func() { + for { + err := r.subscribe(ctx, consume, channel...) + fmt.Println(err) -func Publish(channel string, msg string) error { - c, err := GetRedisConn() - if err != nil { - return err - } - - if _, err := c.Do("PUBLISH", channel, msg); err != nil { - return err - } - + if err == nil { + break + } + time.Sleep(5 * time.Second) + index += 1 + fmt.Printf("try reconnect %d times \n", index) + } + }() return nil } +func (r *Redis) Publish(channel, message string) (n int, err error) { + conn := r.pool.Get() + defer func() { _ = conn.Close() }() + n, err = redis.Int(conn.Do("PUBLISH", channel, message)) + if err != nil { + return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message) + } + return +} diff --git a/backend/database/redis.go b/backend/database/redis.go index ffebf776..d159cccd 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -1,24 +1,24 @@ package database import ( + "fmt" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" + "time" ) -var RedisClient = Redis{} - -type ConsumeFunc func(channel string, message []byte) error +var RedisClient *Redis type Redis struct { + pool *redis.Pool } +func NewRedisClient() *Redis { + return &Redis{pool: NewRedisPool()} +} func (r *Redis) RPush(collection string, value interface{}) error { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return err - } + c := r.pool.Get() defer c.Close() if _, err := c.Do("RPUSH", collection, value); err != nil { @@ -29,11 +29,7 @@ func (r *Redis) RPush(collection string, value interface{}) error { } func (r *Redis) LPop(collection string) (string, error) { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return "", err - } + c := r.pool.Get() defer c.Close() value, err2 := redis.String(c.Do("LPOP", collection)) @@ -44,11 +40,7 @@ func (r *Redis) LPop(collection string) (string, error) { } func (r *Redis) HSet(collection string, key string, value string) error { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return err - } + c := r.pool.Get() defer c.Close() if _, err := c.Do("HSET", collection, key, value); err != nil { @@ -59,11 +51,7 @@ func (r *Redis) HSet(collection string, key string, value string) error { } func (r *Redis) HGet(collection string, key string) (string, error) { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return "", err - } + c := r.pool.Get() defer c.Close() value, err2 := redis.String(c.Do("HGET", collection, key)) @@ -74,11 +62,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) { } func (r *Redis) HDel(collection string, key string) error { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return err - } + c := r.pool.Get() defer c.Close() if _, err := c.Do("HDEL", collection, key); err != nil { @@ -88,11 +72,7 @@ func (r *Redis) HDel(collection string, key string) error { } func (r *Redis) HKeys(collection string) ([]string, error) { - c, err := GetRedisConn() - if err != nil { - debug.PrintStack() - return []string{}, err - } + c := r.pool.Get() defer c.Close() value, err2 := redis.Strings(c.Do("HKeys", collection)) @@ -102,7 +82,7 @@ func (r *Redis) HKeys(collection string) ([]string, error) { return value, nil } -func GetRedisConn() (redis.Conn, error) { +func NewRedisPool() *redis.Pool { var address = viper.GetString("redis.address") var port = viper.GetString("redis.port") var database = viper.GetString("redis.database") @@ -114,14 +94,31 @@ func GetRedisConn() (redis.Conn, error) { } else { url = "redis://x:" + password + "@" + address + ":" + port + "/" + database } - c, err := redis.DialURL(url) - if err != nil { - debug.PrintStack() - return c, err + fmt.Println(url) + return &redis.Pool{ + Dial: func() (conn redis.Conn, e error) { + return redis.DialURL(url, + redis.DialConnectTimeout(time.Second*10), + redis.DialReadTimeout(time.Second*10), + redis.DialWriteTimeout(time.Second*10), + ) + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Minute { + return nil + } + _, err := c.Do("PING") + return err + }, + MaxIdle: 10, + MaxActive: 0, + IdleTimeout: 300 * time.Second, + Wait: false, + MaxConnLifetime: 0, } - return c, nil } func InitRedis() error { + RedisClient = NewRedisClient() return nil } diff --git a/backend/services/log.go b/backend/services/log.go index a83926f2..da1d72e1 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -71,7 +71,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { // 发布获取日志消息 channel := "nodes:" + task.NodeId.Hex() - if err := database.Publish(channel, string(msgBytes)); err != nil { + if _, err := database.RedisClient.Publish(channel, string(msgBytes)); err != nil { log.Errorf(err.Error()) return "", err } diff --git a/backend/services/node.go b/backend/services/node.go index 9685a1bb..eb24f759 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -1,6 +1,7 @@ package services import ( + "context" "crawlab/constants" "crawlab/database" "crawlab/lib/cron" @@ -10,6 +11,7 @@ import ( "fmt" "github.com/apex/log" "github.com/globalsign/mgo/bson" + "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" "time" @@ -258,13 +260,12 @@ func UpdateNodeData() { } } -func MasterNodeCallback(channel string, msgStr string) { +func MasterNodeCallback(message redis.Message) (err error) { // 反序列化 var msg NodeMessage - if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return + if err := json.Unmarshal(message.Data, &msg); err != nil { + + return err } if msg.Type == constants.MsgTypeGetLog { @@ -281,16 +282,15 @@ func MasterNodeCallback(channel string, msgStr string) { sysInfoBytes, _ := json.Marshal(&msg.SysInfo) ch <- string(sysInfoBytes) } + return nil } -func WorkerNodeCallback(channel string, msgStr string) { +func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 msg := NodeMessage{} - fmt.Println(msgStr) - if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return + if err := json.Unmarshal(message.Data, &msg); err != nil { + + return err } if msg.Type == constants.MsgTypeGetLog { @@ -317,16 +317,14 @@ func WorkerNodeCallback(channel string, msgStr string) { // 序列化 msgSdBytes, err := json.Marshal(&msgSd) if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return + return err } // 发布消息给主节点 log.Info("publish get log msg to master") - if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { - log.Errorf(err.Error()) - return + if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil { + + return err } } else if msg.Type == constants.MsgTypeCancelTask { // 取消任务 @@ -336,8 +334,7 @@ func WorkerNodeCallback(channel string, msgStr string) { // 获取环境信息 sysInfo, err := GetLocalSystemInfo() if err != nil { - log.Errorf(err.Error()) - return + return err } msgSd := NodeMessage{ Type: constants.MsgTypeGetSystemInfo, @@ -348,14 +345,14 @@ func WorkerNodeCallback(channel string, msgStr string) { if err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } - fmt.Println(msgSd) - if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { + if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil { log.Errorf(err.Error()) - return + return err } } + return } // 初始化节点服务 @@ -373,25 +370,27 @@ func InitNodeService() error { // 首次更新节点数据(注册到Redis) UpdateNodeData() - // 消息订阅 - var sub database.Subscriber - sub.Connect() - // 获取当前节点 node, err := GetCurrentNode() if err != nil { log.Errorf(err.Error()) return err } - + ctx := context.Background() if IsMaster() { // 如果为主节点,订阅主节点通信频道 channel := "nodes:master" - sub.Subscribe(channel, MasterNodeCallback) + err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel) + if err != nil { + return err + } } else { // 若为工作节点,订阅单独指定通信频道 channel := "nodes:" + node.Id.Hex() - sub.Subscribe(channel, WorkerNodeCallback) + err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel) + if err != nil { + return err + } } // 如果为主节点,每30秒刷新所有节点信息 diff --git a/backend/services/spider.go b/backend/services/spider.go index 47c1fa33..61a561a9 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -1,6 +1,7 @@ package services import ( + "context" "crawlab/constants" "crawlab/database" "crawlab/lib/cron" @@ -11,6 +12,7 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + "github.com/gomodule/redigo/redis" "github.com/pkg/errors" "github.com/satori/go.uuid" "github.com/spf13/viper" @@ -142,10 +144,7 @@ func ZipSpider(spider model.Spider) (filePath string, err error) { // 临时文件路径 randomId := uuid.NewV4() - if err != nil { - debug.PrintStack() - return "", err - } + filePath = filepath.Join( viper.GetString("other.tmppath"), randomId.String()+".zip", @@ -302,7 +301,7 @@ func PublishSpider(spider model.Spider) (err error) { return } channel := "files:upload" - if err = database.Publish(channel, string(msgStr)); err != nil { + if _, err = database.RedisClient.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() return @@ -312,16 +311,16 @@ func PublishSpider(spider model.Spider) (err error) { } // 上传爬虫回调 -func OnFileUpload(channel string, msgStr string) { +func OnFileUpload(message redis.Message) (err error) { s, gf := database.GetGridFs("files") defer s.Close() // 反序列化消息 var msg SpiderUploadMessage - if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { + if err := json.Unmarshal(message.Data, &msg); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } // 从GridFS获取该文件 @@ -329,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) { if err != nil { log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) debug.PrintStack() - return + return err } defer f.Close() @@ -342,7 +341,7 @@ func OnFileUpload(channel string, msgStr string) { if err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } defer tmpFile.Close() @@ -350,7 +349,7 @@ func OnFileUpload(channel string, msgStr string) { if _, err := io.Copy(tmpFile, f); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } // 解压缩临时文件到目标文件夹 @@ -361,22 +360,23 @@ func OnFileUpload(channel string, msgStr string) { if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } // 关闭临时文件 if err := tmpFile.Close(); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } // 删除临时文件 if err := os.Remove(tmpFilePath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return + return err } + return nil } // 启动爬虫服务 @@ -401,9 +401,11 @@ func InitSpiderService() error { // 订阅文件上传 channel := "files:upload" - var sub database.Subscriber - sub.Connect() - sub.Subscribe(channel, OnFileUpload) + + //sub.Connect() + ctx := context.Background() + return database.RedisClient.Subscribe(ctx, OnFileUpload, channel) + } // 启动定时任务 diff --git a/backend/services/system.go b/backend/services/system.go index 5f50dec9..ff177aa0 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -112,7 +112,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 序列化 msgBytes, _ := json.Marshal(&msg) - if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil { + if _, err := database.RedisClient.Publish("nodes:"+id, string(msgBytes)); err != nil { return model.SystemInfo{}, err } diff --git a/backend/services/task.go b/backend/services/task.go index 1b0a5676..6ba6b257 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -466,7 +466,7 @@ func CancelTask(id string) (err error) { } // 发布消息 - if err := database.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil { + if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil { return err } } diff --git a/backend/utils/file.go b/backend/utils/file.go index 9a4300a1..dda73c13 100644 --- a/backend/utils/file.go +++ b/backend/utils/file.go @@ -179,11 +179,11 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error { } } else { header, err := zip.FileInfoHeader(info) - header.Name = prefix + "/" + header.Name if err != nil { debug.PrintStack() return err } + header.Name = prefix + "/" + header.Name writer, err := zw.CreateHeader(header) if err != nil { debug.PrintStack() From bc39f678904b7df4d8e3078392666c4393b1985b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 09:40:24 +0800 Subject: [PATCH 35/46] =?UTF-8?q?fix=20=E7=88=AC=E8=99=AB=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E5=9C=A8=E5=B0=8F=E5=B1=8F=E5=B9=95=E4=B8=8B=E9=94=99?= =?UTF-8?q?=E4=BD=8D=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/spider/SpiderList.vue | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 0380a6b0..74d1da2b 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -275,15 +275,15 @@ export default { }, // tableData, columns: [ - { name: 'name', label: 'Name', width: '180', align: 'left' }, + { name: 'name', label: 'Name', width: '160', align: 'left' }, // { name: 'site_name', label: 'Site', width: '140', align: 'left' }, { name: 'type', label: 'Spider Type', width: '120' }, // { name: 'cmd', label: 'Command Line', width: '200' }, { name: 'last_status', label: 'Last Status', width: '120' }, - { name: 'last_run_ts', label: 'Last Run', width: '160' }, - { name: 'create_ts', label: 'Create Time', width: '160' }, - { name: 'update_ts', label: 'Update Time', width: '160' }, - { name: 'remark', label: 'Remark', width: '160' } + { name: 'last_run_ts', label: 'Last Run', width: '140' }, + { name: 'create_ts', label: 'Create Time', width: '140' }, + { name: 'update_ts', label: 'Update Time', width: '140' }, + { name: 'remark', label: 'Remark', width: '140' } // { name: 'last_7d_tasks', label: 'Last 7-Day Tasks', width: '80' }, // { name: 'last_5_errors', label: 'Last 5-Run Errors', width: '80' } ], From 033ca493aaa68674a46fc098d545ff9d7ae114b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 10:40:16 +0800 Subject: [PATCH 36/46] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8E=92=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/spider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/model/spider.go b/backend/model/spider.go index c1ef7b6e..2ba4c8c5 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -99,7 +99,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 获取爬虫列表 spiders := []Spider{} - if err := c.Find(filter).Skip(skip).Limit(limit).Sort("name asc").All(&spiders); err != nil { + if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil { debug.PrintStack() return spiders, err } From f9fdf5b8d804e850e21946cdb2df827be3059d5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 16:45:56 +0800 Subject: [PATCH 37/46] =?UTF-8?q?fix=20=E6=89=93=E5=8D=B0=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E5=8F=AA=E6=9C=892KB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/log.go b/backend/services/log.go index a83926f2..a7935015 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -35,7 +35,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { } defer f.Close() - const bufLen = 2048 + const bufLen = 2 * 1024 * 1024 logBuf := make([]byte, bufLen) off := int64(0) From 93dd3b714a7383aefaefb55b1a3e5a4cc38b1d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 17:37:48 +0800 Subject: [PATCH 38/46] =?UTF-8?q?fix=20=E5=A6=82=E6=9E=9C=E4=BB=8Edir?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E7=88=AC=E8=99=AB=E4=B8=BA=E7=A9=BA=EF=BC=8C?= =?UTF-8?q?=E5=88=99=E7=A7=BB=E9=99=A4=E6=89=80=E6=9C=89=E7=9A=84=E7=88=AC?= =?UTF-8?q?=E8=99=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/spider.go | 18 ++++++++++++++++++ backend/services/spider.go | 20 +++++++++++++------- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/backend/model/spider.go b/backend/model/spider.go index 2ba4c8c5..e0e5f836 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -177,6 +177,24 @@ func RemoveSpider(id bson.ObjectId) error { return nil } +func RemoveAllSpider() error { + s, c := database.GetCol("spiders") + defer s.Close() + + spiders := []Spider{} + err := c.Find(nil).All(&spiders) + if err != nil { + log.Error("get all spiders error:" + err.Error()) + return err + } + for _, spider := range spiders { + if err := RemoveSpider(spider.Id); err != nil { + log.Error("remove spider error:" + err.Error()) + } + } + return nil +} + func GetSpiderCount() (int, error) { s, c := database.GetCol("spiders") defer s.Close() diff --git a/backend/services/spider.go b/backend/services/spider.go index 47c1fa33..e75b0418 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -89,16 +89,25 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 将爬虫保存到数据库 func SaveSpiders(spiders []model.Spider) error { - // 遍历爬虫列表 + s, c := database.GetCol("spiders") + defer s.Close() + + if len(spiders) == 0 { + err := model.RemoveAllSpider() + if err != nil { + log.Error("remove all spider error:" + err.Error()) + return err + } + log.Info("get spider from dir is empty,removed all spider") + return nil + } + // 如果该爬虫不存在于数据库,则保存爬虫到数据库 for _, spider := range spiders { // 忽略非自定义爬虫 if spider.Type != constants.Customized { continue } - // 如果该爬虫不存在于数据库,则保存爬虫到数据库 - s, c := database.GetCol("spiders") - defer s.Close() var spider_ *model.Spider if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil { // 不存在 @@ -106,11 +115,8 @@ func SaveSpiders(spiders []model.Spider) error { debug.PrintStack() return err } - } else { - // 存在 } } - return nil } From 8b237ddc2ab065ba6c8ebed2d318575143c0b453 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 18:04:47 +0800 Subject: [PATCH 39/46] =?UTF-8?q?fix=20=E6=97=A0=E6=B3=95=E6=AD=A3?= =?UTF-8?q?=E5=B8=B8=E5=88=A0=E9=99=A4=E6=9C=89=E9=97=AE=E9=A2=98=E7=9A=84?= =?UTF-8?q?=E7=88=AC=E8=99=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index e75b0418..a4f16bc8 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -259,8 +259,8 @@ func PublishAllSpiders() error { for _, spider := range spiders { // 发布爬虫 if err := PublishSpider(spider); err != nil { - log.Errorf(err.Error()) - return err + log.Errorf("publish spider error:" + err.Error()) + // return err } } From b5084c964b3119da6b4ee24e5317832c8fbd0a21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 18:14:34 +0800 Subject: [PATCH 40/46] =?UTF-8?q?=E8=BF=98=E5=8E=9F=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index a4f16bc8..1796e32a 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -362,7 +362,7 @@ func OnFileUpload(channel string, msgStr string) { // 解压缩临时文件到目标文件夹 dstPath := filepath.Join( viper.GetString("spider.path"), - //strings.Replace(msg.FileName, ".zip", "", -1), + strings.Replace(msg.FileName, ".zip", "", -1), ) if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) From 10c3488cafa113ac48526f21b7545ad77584726c Mon Sep 17 00:00:00 2001 From: yaziming Date: Mon, 2 Sep 2019 18:32:47 +0800 Subject: [PATCH 41/46] break loop when subscribe count is 0 --- backend/database/pubsub.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 152dc9a3..c5fdbda9 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -40,11 +40,11 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s } case redis.Subscription: fmt.Println(msg) - // - //if msg.Count == 0 { - // // all channels are unsubscribed - // return - //} + + if msg.Count == 0 { + // all channels are unsubscribed + return + } } } @@ -54,9 +54,9 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s select { case <-ctx.Done(): if err := psc.Unsubscribe(); err != nil { - fmt.Printf("redis pubsub unsubscribe err: %v", err) + fmt.Printf("redis pubsub unsubscribe err: %v \n", err) } - return nil + done <- nil case <-tick.C: //fmt.Printf("ping message \n") if err := psc.Ping(""); err != nil { From 352fac4096ea302935694cc9f3717db1cbd4f4a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 2 Sep 2019 20:49:00 +0800 Subject: [PATCH 42/46] =?UTF-8?q?fix=20=E5=88=A0=E9=99=A4spider=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E5=88=A0=E9=99=A4task=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/task.go | 17 ++++++++++++++++- backend/routes/spider.go | 6 ++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/backend/model/task.go b/backend/model/task.go index 8ae782b5..968055a6 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -190,6 +190,21 @@ func RemoveTask(id string) error { return nil } +func RemoveTaskBySpiderId(id string) error { + tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") + if err != nil { + log.Error("get tasks error:" + err.Error()) + } + + for _, task := range tasks { + if err := RemoveTask(task.Id); err != nil { + log.Error("remove task error:" + err.Error()) + continue + } + } + return nil +} + func GetTaskCount(query interface{}) (int, error) { s, c := database.GetCol("tasks") defer s.Close() @@ -207,7 +222,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) { defer s.Close() // 起始日期 - startDate := time.Now().Add(- 30 * 24 * time.Hour) + startDate := time.Now().Add(-30 * 24 * time.Hour) endDate := time.Now() // query diff --git a/backend/routes/spider.go b/backend/routes/spider.go index dceb2651..f1a3c9e5 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -229,6 +229,12 @@ func DeleteSpider(c *gin.Context) { return } + // 删除爬虫对应的task任务 + if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + c.JSON(http.StatusOK, Response{ Status: "ok", Message: "success", From 361134f8ddea8ad692a0d7f5f9a3f2c082e897f1 Mon Sep 17 00:00:00 2001 From: yaziming Date: Mon, 2 Sep 2019 22:57:29 +0800 Subject: [PATCH 43/46] Remove redundant fmt.Println --- backend/database/redis.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/database/redis.go b/backend/database/redis.go index d159cccd..ede229a2 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -1,7 +1,6 @@ package database import ( - "fmt" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" @@ -94,7 +93,6 @@ func NewRedisPool() *redis.Pool { } else { url = "redis://x:" + password + "@" + address + ":" + port + "/" + database } - fmt.Println(url) return &redis.Pool{ Dial: func() (conn redis.Conn, e error) { return redis.DialURL(url, From cd78e6c745edcc0e2d0544c818dc0e92f5db964b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Tue, 3 Sep 2019 09:06:04 +0800 Subject: [PATCH 44/46] =?UTF-8?q?=E8=BF=98=E5=8E=9F=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/services/spider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/spider.go b/backend/services/spider.go index fca2b200..87b4a1d5 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -361,7 +361,7 @@ func OnFileUpload(message redis.Message) (err error) { // 解压缩临时文件到目标文件夹 dstPath := filepath.Join( viper.GetString("spider.path"), - strings.Replace(msg.FileName, ".zip", "", -1), + // strings.Replace(msg.FileName, ".zip", "", -1), ) if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) From cb3d7263b2ec3844a5e729efffa10d227fbbc460 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 3 Sep 2019 12:47:18 +0800 Subject: [PATCH 45/46] bug fix:fix go test ./.. bug --- backend/mock/node_test.go | 6 ++++ backend/utils/file_test.go | 71 +++++++++++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/backend/mock/node_test.go b/backend/mock/node_test.go index cc2f94e5..eca321ad 100644 --- a/backend/mock/node_test.go +++ b/backend/mock/node_test.go @@ -41,6 +41,12 @@ func init() { app.DELETE("/tasks/:id", DeleteTask) // 删除任务 app.GET("/tasks/:id/results",GetTaskResults) // 任务结果 app.GET("/tasks/:id/results/download", DownloadTaskResultsCsv) // 下载任务结果 + app.GET("/spiders", GetSpiderList) // 爬虫列表 + app.GET("/spiders/:id", GetSpider) // 爬虫详情 + app.POST("/spiders/:id", PostSpider) // 修改爬虫 + app.DELETE("/spiders/:id",DeleteSpider) // 删除爬虫 + app.GET("/spiders/:id/tasks",GetSpiderTasks) // 爬虫任务列表 + app.GET("/spiders/:id/dir",GetSpiderDir) // 爬虫目录 } //mock test, test data in ./mock diff --git a/backend/utils/file_test.go b/backend/utils/file_test.go index 484366f5..64f2df6d 100644 --- a/backend/utils/file_test.go +++ b/backend/utils/file_test.go @@ -1,8 +1,12 @@ package utils import ( + "archive/zip" . "github.com/smartystreets/goconvey/convey" + "io" + "log" "os" + "runtime/debug" "testing" ) @@ -38,9 +42,13 @@ func TestIsDir(t *testing.T) { } func TestCompress(t *testing.T) { - var pathString = "../utils" + err := os.Mkdir("testCompress", os.ModePerm) + if err != nil { + t.Error("create testCompress failed") + } + var pathString = "testCompress" var files []*os.File - var disPath = "../utils/test" + var disPath = "testCompress" file, err := os.Open(pathString) if err != nil { t.Error("open source path failed") @@ -52,15 +60,60 @@ func TestCompress(t *testing.T) { So(er, ShouldEqual, nil) }) }) + os.RemoveAll("testCompress") } - -// 测试之前需存在有效的test(.zip)文件 -func TestDeCompress(t *testing.T) { - var tmpFilePath = "./test" - tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) +func Zip(zipFile string, fileList []string) error { + // 创建 zip 包文件 + fw, err := os.Create(zipFile) if err != nil { - t.Fatal("open zip file failed") + log.Fatal() + } + defer fw.Close() + + // 实例化新的 zip.Writer + zw := zip.NewWriter(fw) + defer func() { + // 检测一下是否成功关闭 + if err := zw.Close(); err != nil { + log.Fatalln(err) + } + }() + + for _, fileName := range fileList { + fr, err := os.Open(fileName) + if err != nil { + return err + } + fi, err := fr.Stat() + if err != nil { + return err + } + // 写入文件的头信息 + fh, err := zip.FileInfoHeader(fi) + w, err := zw.CreateHeader(fh) + if err != nil { + return err + } + // 写入文件内容 + _, err = io.Copy(w, fr) + if err != nil { + return err + } + } + return nil +} + +func TestDeCompress(t *testing.T) { + err := os.Mkdir("testDeCompress", os.ModePerm) + err = Zip("demo.zip", []string{}) + if err != nil { + t.Error("create zip file failed") + } + tmpFile, err := os.OpenFile("demo.zip", os.O_RDONLY, 0777) + if err != nil { + debug.PrintStack() + t.Error("open demo.zip failed") } var dstPath = "./testDeCompress" Convey("Test DeCopmress func", t, func() { @@ -68,5 +121,7 @@ func TestDeCompress(t *testing.T) { err := DeCompress(tmpFile, dstPath) So(err, ShouldEqual, nil) }) + os.RemoveAll("testDeCompress") + os.Remove("demo.zip") } From 71c83251e5186ef5f6801411b4f63113bb22f6e4 Mon Sep 17 00:00:00 2001 From: yaziming Date: Tue, 3 Sep 2019 01:16:36 +0800 Subject: [PATCH 46/46] feat(backend): add some feat for error message system errors support custom http code and support return invalid validated error --- backend/constants/errors.go | 9 +++- backend/errors/errors.go | 40 +++++++++++------ backend/services/context/context.go | 67 ++++++++++++++++++++--------- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/backend/constants/errors.go b/backend/constants/errors.go index a6175319..a273cb75 100644 --- a/backend/constants/errors.go +++ b/backend/constants/errors.go @@ -1,8 +1,13 @@ package constants -import "crawlab/errors" +import ( + "crawlab/errors" + "net/http" +) var ( + ErrorMongoError = errors.NewSystemOPError(1001, "system error:[mongo]%s", http.StatusInternalServerError) //users - ErrorUserNotFound = errors.NewBusinessError(10001, "user not found.") + ErrorUserNotFound = errors.NewBusinessError(10001, "user not found.", http.StatusUnauthorized) + ErrorUsernameOrPasswordInvalid = errors.NewBusinessError(11001, "username or password invalid", http.StatusUnauthorized) ) diff --git a/backend/errors/errors.go b/backend/errors/errors.go index 0110808b..f191cd3e 100644 --- a/backend/errors/errors.go +++ b/backend/errors/errors.go @@ -1,6 +1,9 @@ package errors -import "fmt" +import ( + "fmt" + "net/http" +) type Scope int @@ -10,9 +13,10 @@ const ( ) type OPError struct { - Message string - Code int - Scope Scope + HttpCode int + Message string + Code int + Scope Scope } func (O OPError) Error() string { @@ -24,20 +28,28 @@ func (O OPError) Error() string { case ScopeBusiness: scope = "business" } - return fmt.Sprintf("%s : %d -> %s.", scope, O.Code, O.Message) + return fmt.Sprintf("%s error: [%d]%s.", scope, O.Code, O.Message) } -func NewSystemOPError(code int, message string) *OPError { +func NewSystemOPError(code int, message string, httpCodes ...int) *OPError { + httpCode := http.StatusOK + if len(httpCodes) > 0 { + httpCode = httpCodes[0] + } + return NewOpError(code, message, ScopeSystem, httpCode) +} +func NewOpError(code int, message string, scope Scope, httpCode int) *OPError { return &OPError{ - Message: message, - Code: code, - Scope: ScopeSystem, + Message: message, + Code: code, + Scope: scope, + HttpCode: httpCode, } } -func NewBusinessError(code int, message string) *OPError { - return &OPError{ - Message: message, - Code: code, - Scope: ScopeBusiness, +func NewBusinessError(code int, message string, httpCodes ...int) *OPError { + httpCode := http.StatusOK + if len(httpCodes) > 0 { + httpCode = httpCodes[0] } + return NewOpError(code, message, ScopeBusiness, httpCode) } diff --git a/backend/services/context/context.go b/backend/services/context/context.go index d5d2b6ad..ce8eb72e 100644 --- a/backend/services/context/context.go +++ b/backend/services/context/context.go @@ -8,6 +8,7 @@ import ( "github.com/apex/log" "github.com/gin-gonic/gin" errors2 "github.com/pkg/errors" + "gopkg.in/go-playground/validator.v9" "net/http" "runtime/debug" ) @@ -27,9 +28,12 @@ func (c *Context) User() *model.User { } return user } -func (c *Context) Success(data interface{}, meta interface{}) { - if meta == nil { +func (c *Context) Success(data interface{}, metas ...interface{}) { + var meta interface{} + if len(metas) == 0 { meta = gin.H{} + } else { + meta = metas[0] } if data == nil { data = gin.H{} @@ -38,33 +42,56 @@ func (c *Context) Success(data interface{}, meta interface{}) { "status": "ok", "message": "success", "data": data, + "meta": meta, "error": "", }) } +func (c *Context) Failed(err error, variables ...interface{}) { + c.failed(err, http.StatusOK, variables...) +} +func (c *Context) failed(err error, httpCode int, variables ...interface{}) { + errStr := err.Error() + if len(variables) > 0 { + errStr = fmt.Sprintf(errStr, variables...) + } + log.Errorf("handle error:" + errStr) + debug.PrintStack() + causeError := errors2.Cause(err) + switch causeError.(type) { + case errors.OPError: + opError := causeError.(errors.OPError) + + c.AbortWithStatusJSON(opError.HttpCode, gin.H{ + "status": "ok", + "message": "error", + "error": errStr, + }) + break + case validator.ValidationErrors: + validatorErrors := causeError.(validator.ValidationErrors) + //firstError := validatorErrors[0].(validator.FieldError) + c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ + "status": "ok", + "message": "error", + "error": validatorErrors.Error(), + }) + break + default: + fmt.Println("deprecated....") + c.AbortWithStatusJSON(httpCode, gin.H{ + "status": "ok", + "message": "error", + "error": errStr, + }) + } +} func (c *Context) FailedWithError(err error, httpCode ...int) { var code = 200 if len(httpCode) > 0 { code = httpCode[0] } - log.Errorf("handle error:" + err.Error()) - debug.PrintStack() - switch errors2.Cause(err).(type) { - case errors.OPError: - c.AbortWithStatusJSON(code, gin.H{ - "status": "ok", - "message": "error", - "error": err.Error(), - }) - break - default: - fmt.Println("deprecated....") - c.AbortWithStatusJSON(code, gin.H{ - "status": "ok", - "message": "error", - "error": err.Error(), - }) - } + c.failed(err, code) }