diff --git a/Jenkinsfile b/Jenkinsfile index 16220039..b7441bea 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -48,8 +48,11 @@ pipeline { sh """ # 重启docker compose cd ./jenkins/${ENV:GIT_BRANCH} - docker-compose stop | true - docker-compose up -d + docker-compose stop master | true + docker-compose rm -f master | true + docker-compose stop worker | true + docker-compose rm -f worker | true + docker-compose up -d | true """ } } @@ -57,7 +60,7 @@ pipeline { steps { echo 'Cleanup...' sh """ - docker rmi `docker images | grep '' | grep -v IMAGE | awk '{ print \$3 }' | xargs` + docker rmi -f `docker images | grep '' | grep -v IMAGE | awk '{ print \$3 }' | xargs` """ } } diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 4570e7b4..b100535f 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -1,9 +1,11 @@ package database import ( + "errors" "fmt" "github.com/apex/log" "github.com/gomodule/redigo/redis" + "time" "unsafe" ) @@ -23,22 +25,38 @@ func (c *Subscriber) Connect() { c.client = redis.PubSubConn{Conn: conn} c.cbMap = make(map[string]SubscribeCallback) - go func() { + //retry connect redis 5 times, or panic + index := 0 + go func(i int) { for { log.Debug("wait...") switch res := c.client.Receive().(type) { case redis.Message: + i = 0 channel := (*string)(unsafe.Pointer(&res.Channel)) message := (*string)(unsafe.Pointer(&res.Data)) c.cbMap[*channel](*channel, *message) case redis.Subscription: fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) case error: - log.Error("error handle...") + log.Error("error handle redis connection...") + + time.Sleep(2 * time.Second) + if i > 5 { + panic(errors.New("redis connection failed too many times, panic")) + } + con, err := GetRedisConn() + if err != nil { + log.Error("redis dial failed") + continue + } + c.client = redis.PubSubConn{Conn: con} + i += 1 + continue } } - }() + }(index) } diff --git a/backend/mock/node.go b/backend/mock/node.go index 878dbcfa..4857a96d 100644 --- a/backend/mock/node.go +++ b/backend/mock/node.go @@ -42,7 +42,7 @@ var NodeList = []model.Node{ var TaskList = []model.Task{ { Id: "1234", - SpiderId: bson.ObjectId("xx429e6c19f7abede924fee2"), + SpiderId: bson.ObjectId("5d429e6c19f7abede924fee2"), StartTs: time.Now(), FinishTs: time.Now(), Status: "进行中", @@ -61,7 +61,7 @@ var TaskList = []model.Task{ }, { Id: "5678", - SpiderId: bson.ObjectId("xx429e6c19f7abede924fddf"), + SpiderId: bson.ObjectId("5d429e6c19f7abede924fee2"), StartTs: time.Now(), FinishTs: time.Now(), Status: "进行中", diff --git a/backend/mock/node_test.go b/backend/mock/node_test.go index 0cfd77ed..cc2f94e5 100644 --- a/backend/mock/node_test.go +++ b/backend/mock/node_test.go @@ -35,6 +35,12 @@ func init() { app.PUT("/schedules", PutSchedule) // 创建定时任务 app.POST("/schedules/:id", PostSchedule) // 修改定时任务 app.DELETE("/schedules/:id", DeleteSchedule) // 删除定时任务 + app.GET("/tasks", GetTaskList) // 任务列表 + app.GET("/tasks/:id", GetTask) // 任务详情 + app.PUT("/tasks", PutTask) // 派发任务 + app.DELETE("/tasks/:id", DeleteTask) // 删除任务 + app.GET("/tasks/:id/results",GetTaskResults) // 任务结果 + app.GET("/tasks/:id/results/download", DownloadTaskResultsCsv) // 下载任务结果 } //mock test, test data in ./mock diff --git a/backend/mock/schedule_test.go b/backend/mock/schedule_test.go index d26a08d8..c24631b2 100644 --- a/backend/mock/schedule_test.go +++ b/backend/mock/schedule_test.go @@ -10,7 +10,6 @@ import ( "strings" "testing" "time" - "ucloudBilling/ucloud/log" ) func TestGetScheduleList(t *testing.T) { @@ -58,7 +57,6 @@ func TestDeleteSchedule(t *testing.T) { app.ServeHTTP(w, req) err := json.Unmarshal([]byte(w.Body.String()), &resp) - log.Info(w.Body.String()) if err != nil { t.Fatal("Unmarshal resp failed") } @@ -89,7 +87,6 @@ func TestPostSchedule(t *testing.T) { var resp Response var mongoId = "5d429e6c19f7abede924fee2" body,_ := json.Marshal(newItem) - log.Info(strings.NewReader(string(body))) w := httptest.NewRecorder() req,_ := http.NewRequest("POST", "/schedules/"+mongoId,strings.NewReader(string(body))) app.ServeHTTP(w, req) @@ -125,7 +122,6 @@ func TestPutSchedule(t *testing.T) { var resp Response body,_ := json.Marshal(newItem) - log.Info(strings.NewReader(string(body))) w := httptest.NewRecorder() req,_ := http.NewRequest("PUT", "/schedules",strings.NewReader(string(body))) app.ServeHTTP(w, req) diff --git a/backend/mock/task.go b/backend/mock/task.go index c4807247..84dece09 100644 --- a/backend/mock/task.go +++ b/backend/mock/task.go @@ -1 +1,224 @@ -package mock \ No newline at end of file +package mock + +import ( + "bytes" + "crawlab/constants" + "crawlab/model" + "crawlab/utils" + "encoding/csv" + "fmt" + "github.com/gin-gonic/gin" + "github.com/globalsign/mgo/bson" + "github.com/satori/go.uuid" + "net/http" +) + +type TaskListRequestData struct { + PageNum int `form:"page_num"` + PageSize int `form:"page_size"` + NodeId string `form:"node_id"` + SpiderId string `form:"spider_id"` +} + +type TaskResultsRequestData struct { + PageNum int `form:"page_num"` + PageSize int `form:"page_size"` +} + +func GetTaskList(c *gin.Context) { + // 绑定数据 + data := TaskListRequestData{} + + if err := c.ShouldBindQuery(&data); err != nil { + HandleError(http.StatusBadRequest, c, err) + return + } + if data.PageNum == 0 { + data.PageNum = 1 + } + if data.PageSize == 0 { + data.PageNum = 10 + } + + // 过滤条件 + query := bson.M{} + if data.NodeId != "" { + query["node_id"] = bson.ObjectIdHex(data.NodeId) + } + if data.SpiderId != "" { + query["spider_id"] = bson.ObjectIdHex(data.SpiderId) + } + + // 获取任务列表 + tasks := TaskList + + // 获取总任务数 + total := len(TaskList) + + c.JSON(http.StatusOK, ListResponse{ + Status: "ok", + Message: "success", + Total: total, + Data: tasks, + }) +} + +func GetTask(c *gin.Context) { + id := c.Param("id") + + var result model.Task + for _, task := range TaskList { + if task.Id == id { + result = task + } + } + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: result, + }) +} + +func PutTask(c *gin.Context) { + // 生成任务ID,generate task ID + id := uuid.NewV4() + + // 绑定数据 + var t model.Task + if err := c.ShouldBindJSON(&t); err != nil { + HandleError(http.StatusBadRequest, c, err) + return + } + t.Id = id.String() + t.Status = constants.StatusPending + + // 如果没有传入node_id,则置为null + if t.NodeId.Hex() == "" { + t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + } + + // 将任务存入数据库,put the task into database + fmt.Println("put the task into database") + + // 加入任务队列, put the task into task queue + fmt.Println("put the task into task queue") + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} + +func DeleteTask(c *gin.Context) { + id := c.Param("id") + + for _, task := range TaskList { + if task.Id == id { + fmt.Println("delete the task") + } + } + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} + +func GetTaskResults(c *gin.Context) { + id := c.Param("id") + + // 绑定数据 + data := TaskResultsRequestData{} + if err := c.ShouldBindQuery(&data); err != nil { + HandleError(http.StatusBadRequest, c, err) + return + } + + // 获取任务 + var task model.Task + for _, ta := range TaskList { + if ta.Id == id { + task = ta + } + } + + fmt.Println(task) + // 获取结果 + var results interface{} + total := len(TaskList) + + c.JSON(http.StatusOK, ListResponse{ + Status: "ok", + Message: "success", + Data: results, + Total: total, + }) +} + +func DownloadTaskResultsCsv(c *gin.Context) { + id := c.Param("id") + + // 获取任务 + var task model.Task + for _, ta := range TaskList { + if ta.Id == id { + task = ta + } + } + fmt.Println(task) + + // 获取结果 + var results []interface { + } + + // 字段列表 + var columns []string + if len(results) == 0 { + columns = []string{} + } else { + item := results[0].(bson.M) + for key := range item { + columns = append(columns, key) + } + } + + // 缓冲 + bytesBuffer := &bytes.Buffer{} + + // 写入UTF-8 BOM,避免使用Microsoft Excel打开乱码 + bytesBuffer.Write([]byte("\xEF\xBB\xBF")) + + writer := csv.NewWriter(bytesBuffer) + + // 写入表头 + if err := writer.Write(columns); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + // 写入内容 + for _, result := range results { + // 将result转换为[]string + item := result.(bson.M) + var values []string + for _, col := range columns { + value := utils.InterfaceToString(item[col]) + values = append(values, value) + } + + // 写入数据 + if err := writer.Write(values); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + + // 此时才会将缓冲区数据写入 + writer.Flush() + + // 设置下载的文件名 + c.Writer.Header().Set("Content-Disposition", "attachment;filename=data.csv") + + // 设置文件类型以及输出数据 + c.Data(http.StatusOK, "text/csv", bytesBuffer.Bytes()) +} diff --git a/backend/mock/task_test.go b/backend/mock/task_test.go new file mode 100644 index 00000000..103ed643 --- /dev/null +++ b/backend/mock/task_test.go @@ -0,0 +1,138 @@ +package mock + +import ( + "crawlab/model" + "encoding/json" + "github.com/globalsign/mgo/bson" + . "github.com/smartystreets/goconvey/convey" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestGetTaskList(t *testing.T) { + //var teskListRequestFrom = TaskListRequestData{ + // PageNum: 2, + // PageSize: 10, + // NodeId: "434221grfsf", + // SpiderId: "fdfewqrftea", + //} + + var resp ListResponse + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/tasks?PageNum=2&PageSize=10&NodeId=342dfsff&SpiderId=f8dsf", nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("Unmarshal resp failed") + } + + Convey("Test API GetNodeList", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + So(resp.Total, ShouldEqual, 2) + }) + }) +} + +func TestGetTask(t *testing.T) { + var resp Response + var taskId = "1234" + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/tasks/"+taskId, nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("Unmarshal resp failed") + } + Convey("Test API GetTask", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestPutTask(t *testing.T) { + var newItem = model.Task{ + Id: "1234", + SpiderId: bson.ObjectIdHex("5d429e6c19f7abede924fee2"), + StartTs: time.Now(), + FinishTs: time.Now(), + Status: "online", + NodeId: bson.ObjectIdHex("5d429e6c19f7abede924fee2"), + LogPath: "./log", + Cmd: "scrapy crawl test", + Error: "", + ResultCount: 0, + WaitDuration: 10.0, + RuntimeDuration: 10, + TotalDuration: 20, + SpiderName: "test", + NodeName: "test", + CreateTs: time.Now(), + UpdateTs: time.Now(), + } + + var resp Response + body, _ := json.Marshal(&newItem) + w := httptest.NewRecorder() + req, _ := http.NewRequest("PUT", "/tasks", strings.NewReader(string(body))) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API PutTask", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestDeleteTask(t *testing.T) { + taskId := "1234" + var resp Response + w := httptest.NewRecorder() + req, _ := http.NewRequest("DELETE", "/tasks/"+taskId, nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("unmarshal resp failed") + } + Convey("Test API DeleteTask", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + }) + }) +} + +func TestGetTaskResults(t *testing.T) { + //var teskListResultFrom = TaskResultsRequestData{ + // PageNum: 2, + // PageSize: 1, + //} + taskId := "1234" + + var resp ListResponse + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/tasks/"+taskId+"/results?PageNum=2&PageSize=1", nil) + app.ServeHTTP(w, req) + err := json.Unmarshal([]byte(w.Body.String()), &resp) + if err != nil { + t.Fatal("Unmarshal resp failed") + } + + Convey("Test API GetNodeList", t, func() { + Convey("Test response status", func() { + So(resp.Status, ShouldEqual, "ok") + So(resp.Message, ShouldEqual, "success") + So(resp.Total, ShouldEqual, 2) + }) + }) +} diff --git a/frontend/src/assets/logo.svg b/frontend/src/assets/logo.svg new file mode 100644 index 00000000..b0e23910 --- /dev/null +++ b/frontend/src/assets/logo.svg @@ -0,0 +1,14 @@ + + + + + + + + + + diff --git a/frontend/src/components/Environment/EnvironmentList.vue b/frontend/src/components/Environment/EnvironmentList.vue index dbc2c9bb..e7c89a51 100644 --- a/frontend/src/components/Environment/EnvironmentList.vue +++ b/frontend/src/components/Environment/EnvironmentList.vue @@ -2,8 +2,8 @@
- {{$t('Add Environment Variables')}} - {{$t('Save')}} + {{$t('Add Environment Variables')}} + {{$t('Save')}}
diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index 45560c60..661e4757 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -47,8 +47,8 @@ - {{$t('Run')}} - {{$t('Save')}} + {{$t('Run')}} + {{$t('Save')}}
diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index c8573bd4..b1de3b47 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -214,6 +214,7 @@ export default { // 下拉框 User: '用户', Logout: '退出登录', + Documentation: '文档', // 选择 'Yes': '是', diff --git a/frontend/src/views/layout/components/Navbar.vue b/frontend/src/views/layout/components/Navbar.vue index 976d98d9..f60c0051 100644 --- a/frontend/src/views/layout/components/Navbar.vue +++ b/frontend/src/views/layout/components/Navbar.vue @@ -30,7 +30,7 @@ - 文档 + {{$t('Documentation')}} diff --git a/frontend/src/views/login/index.vue b/frontend/src/views/login/index.vue index 195ae1de..a21c0f42 100644 --- a/frontend/src/views/login/index.vue +++ b/frontend/src/views/login/index.vue @@ -4,7 +4,7 @@