diff --git a/core/constants/node.go b/core/constants/node.go index 1736a413..5d6480e0 100644 --- a/core/constants/node.go +++ b/core/constants/node.go @@ -1,6 +1,6 @@ package constants const ( - NodeStatusOnline = "on" - NodeStatusOffline = "off" + NodeStatusOnline = "online" + NodeStatusOffline = "offline" ) diff --git a/core/controllers/base.go b/core/controllers/base.go index 1fb5a91a..fcfc931f 100644 --- a/core/controllers/base.go +++ b/core/controllers/base.go @@ -1,16 +1,17 @@ package controllers import ( + "encoding/json" "net/http" "time" - "github.com/loopfz/gadgeto/tonic" - "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/mongo" "github.com/gin-gonic/gin" + "github.com/go-playground/validator/v10" "github.com/juju/errors" + "github.com/loopfz/gadgeto/tonic" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" @@ -141,6 +142,11 @@ func (ctr *BaseController[T]) PutById(c *gin.Context, params *PutByIdParams[T]) m.SetId(id) } + // Validate + if err := validator.New().Struct(params.Data); err != nil { + return GetErrorResponse[T](errors.BadRequestf("invalid data: %v", err)) + } + if err := ctr.modelSvc.ReplaceById(id, params.Data); err != nil { return GetErrorResponse[T](err) } @@ -153,6 +159,45 @@ func (ctr *BaseController[T]) PutById(c *gin.Context, params *PutByIdParams[T]) return GetDataResponse(*result) } +type PatchByIdParams[T any] struct { + Id string `path:"id" description:"The ID of the item to update" format:"objectid" pattern:"^[0-9a-fA-F]{24}$"` + Data T `json:"data" description:"The data to update" validate:"required"` +} + +func (ctr *BaseController[T]) PatchById(c *gin.Context, params *PatchByIdParams[T]) (response *Response[T], err error) { + id, err := primitive.ObjectIDFromHex(params.Id) + if err != nil { + return GetErrorResponse[T](errors.BadRequestf("invalid id format: %v", err)) + } + + u := GetUserFromContext(c) + + // Convert the data to bson.M + dataJSON, _ := json.Marshal(params.Data) + var update bson.M + if err := json.Unmarshal(dataJSON, &update); err != nil { + return GetErrorResponse[T](errors.BadRequestf("invalid data: %v", err)) + } + + // Remove _id field if present to prevent immutable field error + delete(update, "_id") + + // Add updated_by and updated_at + update["updated_by"] = u.Id + update["updated_at"] = time.Now() + + if err := ctr.modelSvc.UpdateById(id, bson.M{"$set": update}); err != nil { + return GetErrorResponse[T](err) + } + + result, err := ctr.modelSvc.GetById(id) + if err != nil { + return GetErrorResponse[T](err) + } + + return GetDataResponse(*result) +} + type PatchParams struct { Ids []string `json:"ids" description:"The IDs of the items to update" validate:"required" items.type:"string" items.format:"objectid" items.pattern:"^[0-9a-fA-F]{24}$"` Update bson.M `json:"update" description:"The update object" validate:"required"` @@ -178,10 +223,10 @@ func (ctr *BaseController[T]) PatchList(c *gin.Context, params *PatchParams) (re }, } - // Add updated_by and updated_ts to the update object + // Add updated_by and updated_at to the update object updateObj := params.Update updateObj["updated_by"] = u.Id - updateObj["updated_ts"] = time.Now() + updateObj["updated_at"] = time.Now() // update if err := ctr.modelSvc.UpdateMany(query, bson.M{"$set": updateObj}); err != nil { diff --git a/core/controllers/base_test.go b/core/controllers/base_test.go index 671ed9db..9ad8f6d5 100644 --- a/core/controllers/base_test.go +++ b/core/controllers/base_test.go @@ -12,9 +12,8 @@ import ( "testing" "time" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/controllers" + "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/middlewares" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" @@ -413,7 +412,7 @@ func TestBaseController_PatchList(t *testing.T) { // Verify updated_by is set to the current user's ID assert.Equal(t, userId, result.UpdatedBy) - // Verify updated_ts is set to a recent timestamp + // Verify updated_at is set to a recent timestamp assert.GreaterOrEqual(t, result.UpdatedAt.UnixMilli(), beforeUpdate.UnixMilli()) assert.LessOrEqual(t, result.UpdatedAt.UnixMilli(), afterUpdate.UnixMilli()) } @@ -502,3 +501,248 @@ func TestBaseController_DeleteList(t *testing.T) { // Check the response assert.Equal(t, http.StatusBadRequest, w.Code) } + +func TestBaseController_PatchById(t *testing.T) { + SetupTestDB() + defer CleanupTestDB() + + // Insert a test document + id, err := service.NewModelService[TestModel]().InsertOne(TestModel{Name: "test"}) + assert.NoError(t, err) + + // Initialize the controller + ctr := controllers.NewController[TestModel]() + + // Set up the router + router := SetupRouter() + router.Use(middlewares.AuthorizationMiddleware()) + router.PATCH("/testmodels/:id", nil, tonic.Handler(ctr.PatchById, 200)) + + // Test case 1: Successful patch + t.Run("test_patch_success", func(t *testing.T) { + // Create update data + updateData := TestModel{ + Name: "patched", + } + + requestBody := controllers.PatchByIdParams[TestModel]{ + Id: id.Hex(), + Data: updateData, + } + + jsonValue, _ := json.Marshal(requestBody) + req, _ := http.NewRequest("PATCH", "/testmodels/"+id.Hex(), bytes.NewBuffer(jsonValue)) + req.Header.Set("Authorization", TestToken) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + // Get the user ID + userId := TestUserId + + // Record time before the update + beforeUpdate := time.Now() + + time.Sleep(time.Millisecond) // Ensure time difference + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusOK, w.Code) + + // Check if the document was updated in the database + result, err := service.NewModelService[TestModel]().GetById(id) + assert.NoError(t, err) + assert.Equal(t, "patched", result.Name) + assert.Equal(t, userId, result.UpdatedBy) + assert.True(t, result.UpdatedAt.After(beforeUpdate)) + }) + + // Test case 2: Invalid ID + t.Run("test_patch_invalid_id", func(t *testing.T) { + updateData := TestModel{ + Name: "patched", + } + + requestBody := controllers.PatchByIdParams[TestModel]{ + Id: "invalid-id", + Data: updateData, + } + + jsonValue, _ := json.Marshal(requestBody) + req, _ := http.NewRequest("PATCH", "/testmodels/invalid-id", bytes.NewBuffer(jsonValue)) + req.Header.Set("Authorization", TestToken) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusBadRequest, w.Code) + }) +} + +func TestBaseController_Post_Validation(t *testing.T) { + SetupTestDB() + defer CleanupTestDB() + + // Initialize the controller + ctr := controllers.NewController[TestModel]() + + // Set up the router + router := SetupRouter() + router.Use(middlewares.AuthorizationMiddleware()) + router.POST("/testmodels", nil, tonic.Handler(ctr.Post, 200)) + + // Test case: Empty data + t.Run("test_post_empty_data", func(t *testing.T) { + requestBody := controllers.PostParams[TestModel]{ + Data: TestModel{}, + } + jsonValue, _ := json.Marshal(requestBody) + req, _ := http.NewRequest("POST", "/testmodels", bytes.NewBuffer(jsonValue)) + req.Header.Set("Authorization", TestToken) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusOK, w.Code) + + var response controllers.Response[TestModel] + err := json.Unmarshal(w.Body.Bytes(), &response) + assert.NoError(t, err) + assert.NotEmpty(t, response.Data.Id) + assert.Equal(t, TestUserId, response.Data.CreatedBy) + assert.Equal(t, TestUserId, response.Data.UpdatedBy) + }) +} + +func TestBaseController_GetList_Sort(t *testing.T) { + SetupTestDB() + defer CleanupTestDB() + + // Insert test documents + modelSvc := service.NewModelService[TestModel]() + for i := 0; i < 3; i++ { + _, err := modelSvc.InsertOne(TestModel{Name: fmt.Sprintf("test%d", i)}) + assert.NoError(t, err) + } + + // Initialize the controller + ctr := controllers.NewController[TestModel]() + + // Set up the router + router := SetupRouter() + router.Use(middlewares.AuthorizationMiddleware()) + router.GET("/testmodels/list", nil, tonic.Handler(ctr.GetList, 200)) + + t.Run("test_get_list_valid_sort", func(t *testing.T) { + params := url.Values{} + params.Add("sort", "-name") // Sort by name descending + requestUrl := url.URL{Path: "/testmodels/list", RawQuery: params.Encode()} + req, _ := http.NewRequest("GET", requestUrl.String(), nil) + req.Header.Set("Authorization", TestToken) + w := httptest.NewRecorder() + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusOK, w.Code) + + var response controllers.ListResponse[TestModel] + err := json.Unmarshal(w.Body.Bytes(), &response) + assert.NoError(t, err) + assert.Equal(t, 3, len(response.Data)) + + // Verify descending order + for i := 0; i < len(response.Data)-1; i++ { + assert.True(t, response.Data[i].Name > response.Data[i+1].Name) + } + }) + + // Test case: Invalid sort format + t.Run("test_get_list_invalid_sort", func(t *testing.T) { + // Use an invalid sort format (missing field name) + params := url.Values{} + params.Add("sort", "-") // Invalid: hyphen without field name + requestUrl := url.URL{Path: "/testmodels/list", RawQuery: params.Encode()} + req, _ := http.NewRequest("GET", requestUrl.String(), nil) + req.Header.Set("Authorization", TestToken) + w := httptest.NewRecorder() + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusBadRequest, w.Code) + + var response map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &response) + assert.NoError(t, err) + assert.Contains(t, response, "error") + assert.Contains(t, response["error"].(string), "invalid sort format") + }) +} + +func TestBaseController_GetList_Pagination_Edge_Cases(t *testing.T) { + SetupTestDB() + defer CleanupTestDB() + + // Insert test documents + modelSvc := service.NewModelService[TestModel]() + for i := 0; i < 5; i++ { + _, err := modelSvc.InsertOne(TestModel{Name: fmt.Sprintf("test%d", i)}) + assert.NoError(t, err) + } + + // Initialize the controller + ctr := controllers.NewController[TestModel]() + + // Set up the router + router := SetupRouter() + router.Use(middlewares.AuthorizationMiddleware()) + router.GET("/testmodels/list", nil, tonic.Handler(ctr.GetList, 200)) + + // Test cases + testCases := []struct { + name string + page int + size int + expectedCount int + expectedTotal int + }{ + {"test_empty_page", 10, 10, 0, 5}, // Page beyond data + {"test_large_page_size", 1, 100, 5, 5}, // Page size larger than data + {"test_exact_page_size", 1, 5, 5, 5}, // Page size equals data size + {"test_last_page_partial", 2, 3, 2, 5}, // Last page with partial data + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + params := url.Values{} + params.Add("page", strconv.Itoa(tc.page)) + params.Add("size", strconv.Itoa(tc.size)) + requestUrl := url.URL{Path: "/testmodels/list", RawQuery: params.Encode()} + req, _ := http.NewRequest("GET", requestUrl.String(), nil) + req.Header.Set("Authorization", TestToken) + w := httptest.NewRecorder() + + // Serve the request + router.ServeHTTP(w, req) + + // Check the response + assert.Equal(t, http.StatusOK, w.Code) + + var response controllers.ListResponse[TestModel] + err := json.Unmarshal(w.Body.Bytes(), &response) + assert.NoError(t, err) + assert.Equal(t, tc.expectedCount, len(response.Data)) + assert.Equal(t, tc.expectedTotal, response.Total) + }) + } +} diff --git a/core/controllers/router.go b/core/controllers/router.go index 653ba6c5..0370e6fd 100644 --- a/core/controllers/router.go +++ b/core/controllers/router.go @@ -71,8 +71,9 @@ func RegisterController[T any](group *fizz.RouterGroup, basePath string, ctr *Ba registerBuiltinHandler(group, globalWrapper, basePath, http.MethodGet, "", ctr.GetList, actionPaths, fmt.Sprintf("Get %s List", resource), "Get a list of items") registerBuiltinHandler(group, globalWrapper, basePath, http.MethodGet, "/:id", ctr.GetById, actionPaths, fmt.Sprintf("Get %s by ID", resource), "Get a single item by ID") registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPost, "", ctr.Post, actionPaths, fmt.Sprintf("Create %s", resource), "Create a new item") - registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPut, "/:id", ctr.PutById, actionPaths, fmt.Sprintf("Update %s by ID", resource), "Update an item by ID") - registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPatch, "", ctr.PatchList, actionPaths, fmt.Sprintf("Patch %s List", resource), "Patch multiple items") + registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPut, "/:id", ctr.PutById, actionPaths, fmt.Sprintf("Replace %s by ID", resource), "Replace an item by ID with (full update)") + registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPatch, "/:id", ctr.PatchById, actionPaths, fmt.Sprintf("Update %s by ID", resource), "Update an item by ID with (partial update)") + registerBuiltinHandler(group, globalWrapper, basePath, http.MethodPatch, "", ctr.PatchList, actionPaths, fmt.Sprintf("Batch Update %s List", resource), "Batch update multiple items with partial fields") registerBuiltinHandler(group, globalWrapper, basePath, http.MethodDelete, "/:id", ctr.DeleteById, actionPaths, fmt.Sprintf("Delete %s by ID", resource), "Delete an item by ID") registerBuiltinHandler(group, globalWrapper, basePath, http.MethodDelete, "", ctr.DeleteList, actionPaths, fmt.Sprintf("Delete %s List", resource), "Delete multiple items") } @@ -275,13 +276,6 @@ func InitRoutes(app *gin.Engine) (err error) { Description: "Create a new spider", HandlerFunc: PostSpider, }, - { - Method: http.MethodPut, - Path: "/:id", - Name: "Update Spider by ID", - Description: "Update a spider by ID", - HandlerFunc: PutSpiderById, - }, { Method: http.MethodDelete, Path: "/:id", @@ -392,8 +386,8 @@ func InitRoutes(app *gin.Engine) (err error) { { Method: http.MethodPut, Path: "/:id", - Name: "Update Schedule by ID", - Description: "Update a schedule by ID", + Name: "Replace Schedule by ID", + Description: "Replace a schedule by ID (full update)", HandlerFunc: PutScheduleById, }, { @@ -430,7 +424,7 @@ func InitRoutes(app *gin.Engine) (err error) { Method: http.MethodGet, Path: "", Name: "Get Task List", - Description: "Get a list of tasks", + Description: "Get a list of tasks (default sorted in descending order)", HandlerFunc: GetTaskList, }, { @@ -508,8 +502,8 @@ func InitRoutes(app *gin.Engine) (err error) { { Method: http.MethodPut, Path: "/:id", - Name: "Update User by ID", - Description: "Update a user by ID", + Name: "Replace User by ID", + Description: "Replace a user by ID (full update)", HandlerFunc: PutUserById, }, { @@ -543,8 +537,8 @@ func InitRoutes(app *gin.Engine) (err error) { { Method: http.MethodPut, Path: "/me", - Name: "Update Me", - Description: "Update the current user", + Name: "Replace Me", + Description: "Update the current user (full update)", HandlerFunc: PutUserMe, }, { @@ -639,8 +633,8 @@ func InitRoutes(app *gin.Engine) (err error) { { Method: http.MethodPut, Path: "/:key", - Name: "Update Setting", - Description: "Update a setting", + Name: "Replace Setting by Key", + Description: "Replace a setting by key (full update)", HandlerFunc: PutSetting, }, }) diff --git a/core/controllers/router_test.go b/core/controllers/router_test.go index 93db1e71..299f5829 100644 --- a/core/controllers/router_test.go +++ b/core/controllers/router_test.go @@ -47,6 +47,7 @@ func TestRegisterController_Routes(t *testing.T) { {Method: "GET", Path: basePath + "/:id"}, {Method: "POST", Path: basePath}, {Method: "PUT", Path: basePath + "/:id"}, + {Method: "PATCH", Path: basePath + "/:id"}, {Method: "PATCH", Path: basePath}, {Method: "DELETE", Path: basePath + "/:id"}, {Method: "DELETE", Path: basePath}, @@ -58,34 +59,6 @@ func TestRegisterController_Routes(t *testing.T) { } } -func TestInitRoutes_ProjectsRoute(t *testing.T) { - router := gin.Default() - - _ = controllers.InitRoutes(router) - - // Check if the projects route is registered - routes := controllers.GetGlobalFizzWrapper().GetFizz().Engine().Routes() - - var methodPaths []string - for _, route := range routes { - methodPaths = append(methodPaths, route.Method+" - "+route.Path) - } - - expectedRoutes := []gin.RouteInfo{ - {Method: "GET", Path: "/projects"}, - {Method: "GET", Path: "/projects/:id"}, - {Method: "POST", Path: "/projects"}, - {Method: "PUT", Path: "/projects/:id"}, - {Method: "PATCH", Path: "/projects"}, - {Method: "DELETE", Path: "/projects/:id"}, - {Method: "DELETE", Path: "/projects"}, - } - - for _, route := range expectedRoutes { - assert.Contains(t, methodPaths, route.Method+" - "+route.Path) - } -} - func TestMain(m *testing.M) { gin.SetMode(gin.TestMode) m.Run() diff --git a/core/controllers/spider.go b/core/controllers/spider.go index 44722448..a08006fa 100644 --- a/core/controllers/spider.go +++ b/core/controllers/spider.go @@ -289,34 +289,6 @@ func PostSpider(c *gin.Context, params *PostParams[models.Spider]) (response *Re return GetDataResponse(s) } -// PutSpiderById handles updating a spider by ID -func PutSpiderById(c *gin.Context, params *PutByIdParams[models.Spider]) (response *Response[models.Spider], err error) { - id, err := primitive.ObjectIDFromHex(params.Id) - if err != nil { - return GetErrorResponse[models.Spider](errors.BadRequestf("invalid id format")) - } - - u := GetUserFromContext(c) - modelSvc := service.NewModelService[models.Spider]() - - params.Data.SetUpdated(u.Id) - if params.Data.Id.IsZero() { - params.Data.SetId(id) - } - - err = modelSvc.ReplaceById(id, params.Data) - if err != nil { - return GetErrorResponse[models.Spider](err) - } - - s, err := modelSvc.GetById(id) - if err != nil { - return GetErrorResponse[models.Spider](err) - } - - return GetDataResponse(*s) -} - // DeleteSpiderById handles deleting a spider by ID func DeleteSpiderById(_ *gin.Context, params *DeleteByIdParams) (response *Response[models.Spider], err error) { id, err := primitive.ObjectIDFromHex(params.Id) diff --git a/core/controllers/spider_test.go b/core/controllers/spider_test.go index fe2e3954..23963a71 100644 --- a/core/controllers/spider_test.go +++ b/core/controllers/spider_test.go @@ -88,56 +88,6 @@ func TestGetSpiderById(t *testing.T) { assert.Equal(t, model.Name, response.Data.Name) } -func TestUpdateSpiderById(t *testing.T) { - SetupTestDB() - defer CleanupTestDB() - - gin.SetMode(gin.TestMode) - - router := SetupRouter() - router.Use(middlewares.AuthorizationMiddleware()) - router.PUT("/spiders/:id", nil, tonic.Handler(controllers.PutSpiderById, 200)) - - model := models.Spider{ - Name: "Test Spider", - ColName: "test_spiders", - } - id, err := service.NewModelService[models.Spider]().InsertOne(model) - require.Nil(t, err) - ts := models.SpiderStat{} - ts.SetId(id) - _, err = service.NewModelService[models.SpiderStat]().InsertOne(ts) - require.Nil(t, err) - - spiderId := id.Hex() - payload := models.Spider{ - Name: "Updated Spider", - ColName: "test_spider", - } - payload.SetId(id) - requestBody := controllers.PutByIdParams[models.Spider]{ - Data: payload, - } - jsonValue, _ := json.Marshal(requestBody) - req, _ := http.NewRequest("PUT", "/spiders/"+spiderId, bytes.NewBuffer(jsonValue)) - req.Header.Set("Authorization", TestToken) - resp := httptest.NewRecorder() - - router.ServeHTTP(resp, req) - - assert.Equal(t, http.StatusOK, resp.Code) - - var response controllers.Response[models.Spider] - err = json.Unmarshal(resp.Body.Bytes(), &response) - require.Nil(t, err) - assert.Equal(t, payload.Name, response.Data.Name) - - svc := service.NewModelService[models.Spider]() - resModel, err := svc.GetById(id) - require.Nil(t, err) - assert.Equal(t, payload.Name, resModel.Name) -} - func TestDeleteSpiderById(t *testing.T) { SetupTestDB() defer CleanupTestDB() diff --git a/core/controllers/stats.go b/core/controllers/stats.go index a3b449da..19f9499d 100644 --- a/core/controllers/stats.go +++ b/core/controllers/stats.go @@ -15,7 +15,7 @@ import ( ) var statsDefaultQuery = bson.M{ - "created_ts": bson.M{ + "created_at": bson.M{ "$gte": time.Now().Add(-30 * 24 * time.Hour), }, } diff --git a/core/controllers/task.go b/core/controllers/task.go index 9112bd62..d6646bba 100644 --- a/core/controllers/task.go +++ b/core/controllers/task.go @@ -412,9 +412,10 @@ func PostTaskCancel(c *gin.Context, params *PostTaskCancelParams) (response *Voi } type GetTaskLogsParams struct { - Id string `path:"id" description:"Task ID" format:"objectid" pattern:"^[0-9a-fA-F]{24}$"` - Page int `query:"page" description:"Page" default:"1" minimum:"1"` - Size int `query:"size" description:"Size" default:"10" minimum:"1"` + Id string `path:"id" description:"Task ID" format:"objectid" pattern:"^[0-9a-fA-F]{24}$"` + Page int `query:"page" description:"Page (default: 1)" default:"1" minimum:"1"` + Size int `query:"size" description:"Size (default: 10000)" default:"10000" minimum:"1"` + Latest bool `query:"latest" description:"Whether to get latest logs (default: true)" default:"true"` } func GetTaskLogs(_ *gin.Context, params *GetTaskLogsParams) (response *ListResponse[string], err error) { @@ -424,19 +425,31 @@ func GetTaskLogs(_ *gin.Context, params *GetTaskLogsParams) (response *ListRespo return GetErrorListResponse[string](err) } - // logs + // Get total count first for pagination logDriver := log.GetFileLogDriver() - logs, err := logDriver.Find(id.Hex(), "", (params.Page-1)*params.Size, params.Size) + total, err := logDriver.Count(id.Hex(), "") + if err != nil { + return GetErrorListResponse[string](err) + } + + // Skip calculation depends on whether we're getting latest logs or not + skip := (params.Page - 1) * params.Size + if params.Latest { + // For latest logs (tail mode), skip is from the end + // No adjustment needed as our implementation handles it correctly + } else { + // For oldest logs (normal mode), skip is from the beginning + // No adjustment needed as it's already the default behavior + } + + // Get logs + logs, err := logDriver.Find(id.Hex(), "", skip, params.Size, params.Latest) if err != nil { if strings.HasSuffix(err.Error(), "Status:404 Not Found") { return GetListResponse[string](nil, 0) } return GetErrorListResponse[string](err) } - total, err := logDriver.Count(id.Hex(), "") - if err != nil { - return GetErrorListResponse[string](err) - } return GetListResponse(logs, total) } diff --git a/core/controllers/utils.go b/core/controllers/utils.go index 4930fe09..aa2a600a 100644 --- a/core/controllers/utils.go +++ b/core/controllers/utils.go @@ -2,6 +2,7 @@ package controllers import ( "encoding/json" + "github.com/juju/errors" "net/http" "reflect" "strings" @@ -198,16 +199,22 @@ func GetSortsFromString(sortStr string) (sorts []entity.Sort, err error) { for _, part := range parts { trimmed := strings.TrimSpace(part) if trimmed == "" { - continue + return nil, errors.BadRequestf("invalid sort string: %s", sortStr) } if strings.HasPrefix(trimmed, "-") { key := strings.TrimLeft(trimmed, "-") + if key == "" { + return nil, errors.BadRequestf("invalid sort string: %s", sortStr) + } sorts = append(sorts, entity.Sort{ Key: key, Direction: constants.DESCENDING, }) } else { key := strings.TrimLeft(trimmed, "+") + if key == "" { + return nil, errors.BadRequestf("invalid sort string: %s", sortStr) + } sorts = append(sorts, entity.Sort{ Key: key, Direction: constants.ASCENDING, diff --git a/core/entity/export.go b/core/entity/export.go index 62605659..f9997d46 100644 --- a/core/entity/export.go +++ b/core/entity/export.go @@ -11,8 +11,8 @@ type Export struct { Target string `json:"target"` Query bson.M `json:"query"` Status string `json:"status"` - StartTs time.Time `json:"start_ts"` - EndTs time.Time `json:"end_ts"` + StartedAt time.Time `json:"started_at"` + EndedAt time.Time `json:"ended_at"` FileName string `json:"file_name"` DownloadPath string `json:"-"` Limit int `json:"-"` @@ -39,11 +39,11 @@ func (e *Export) GetStatus() string { } func (e *Export) GetStartTs() time.Time { - return e.StartTs + return e.StartedAt } func (e *Export) GetEndTs() time.Time { - return e.EndTs + return e.EndedAt } func (e *Export) GetDownloadPath() string { diff --git a/core/export/csv_service.go b/core/export/csv_service.go index 83bf962d..c770f0e6 100644 --- a/core/export/csv_service.go +++ b/core/export/csv_service.go @@ -50,7 +50,7 @@ func (svc *CsvService) Export(exportType, target string, query bson.M) (exportId Target: target, Query: query, Status: constants.TaskStatusRunning, - StartTs: time.Now(), + StartedAt: time.Now(), FileName: svc.getFileName(exportId), DownloadPath: svc.getDownloadPath(exportId), Limit: 100, @@ -81,7 +81,7 @@ func (svc *CsvService) export(export *entity.Export) { if export.Target == "" { err := errors.New("empty target") export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -101,7 +101,7 @@ func (svc *CsvService) export(export *entity.Export) { }() if err != nil { export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -120,7 +120,7 @@ func (svc *CsvService) export(export *entity.Export) { err = csvWriter.Write(columns) if err != nil { export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -139,12 +139,12 @@ func (svc *CsvService) export(export *entity.Export) { if !errors.Is(err, mongo2.ErrNoDocuments) { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) } else { // no more data export.Status = constants.TaskStatusFinished - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Infof("export finished (id: %s)", export.Id) } svc.cache.Set(export.Id, export) @@ -155,7 +155,7 @@ func (svc *CsvService) export(export *entity.Export) { if !cur.Next(context.Background()) { // no more data export.Status = constants.TaskStatusFinished - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Infof("export finished (id: %s)", export.Id) svc.cache.Set(export.Id, export) return @@ -167,7 +167,7 @@ func (svc *CsvService) export(export *entity.Export) { if err != nil { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -179,7 +179,7 @@ func (svc *CsvService) export(export *entity.Export) { if err != nil { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return diff --git a/core/export/json_service.go b/core/export/json_service.go index 66813b47..44a520fd 100644 --- a/core/export/json_service.go +++ b/core/export/json_service.go @@ -46,7 +46,7 @@ func (svc *JsonService) Export(exportType, target string, query bson.M) (exportI Target: target, Query: query, Status: constants.TaskStatusRunning, - StartTs: time.Now(), + StartedAt: time.Now(), FileName: svc.getFileName(exportId), DownloadPath: svc.getDownloadPath(exportId), Limit: 100, @@ -77,7 +77,7 @@ func (svc *JsonService) export(export *entity.Export) { if export.Target == "" { err := errors.New("empty target") export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -104,13 +104,13 @@ func (svc *JsonService) export(export *entity.Export) { if !errors.Is(err, mongo2.ErrNoDocuments) { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) } else { // no more data export.Status = constants.TaskStatusFinished - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Infof("export finished (id: %s)", export.Id) } svc.cache.Set(export.Id, export) @@ -121,7 +121,7 @@ func (svc *JsonService) export(export *entity.Export) { if !cur.Next(context.Background()) { // no more data export.Status = constants.TaskStatusFinished - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Infof("export finished (id: %s)", export.Id) svc.cache.Set(export.Id, export) break @@ -133,7 +133,7 @@ func (svc *JsonService) export(export *entity.Export) { if err != nil { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -146,7 +146,7 @@ func (svc *JsonService) export(export *entity.Export) { if err != nil { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return @@ -157,7 +157,7 @@ func (svc *JsonService) export(export *entity.Export) { if err != nil { // error export.Status = constants.TaskStatusError - export.EndTs = time.Now() + export.EndedAt = time.Now() svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return diff --git a/core/models/common/init_index.go b/core/models/common/init_index.go index ac4ea63d..dbd56814 100644 --- a/core/models/common/init_index.go +++ b/core/models/common/init_index.go @@ -44,13 +44,13 @@ func InitIndexes() { {Keys: bson.D{{Key: "priority", Value: 1}}}, {Keys: bson.D{{Key: "parent_id", Value: 1}}}, {Keys: bson.D{{Key: "has_sub", Value: 1}}}, - {Keys: bson.D{{Key: "created_ts", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, + {Keys: bson.D{{Key: "created_at", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, {Keys: bson.D{{Key: "node_id", Value: 1}, {Key: "status", Value: 1}}}, }) // task stats CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.TaskStat{})), []mongo2.IndexModel{ - {Keys: bson.D{{Key: "created_ts", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, + {Keys: bson.D{{Key: "created_at", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, }) // schedules diff --git a/core/models/models/base.go b/core/models/models/base.go index ce13a62b..f54b6460 100644 --- a/core/models/models/base.go +++ b/core/models/models/base.go @@ -9,9 +9,9 @@ import ( type BaseModel struct { Id primitive.ObjectID `json:"_id" bson:"_id" description:"ID"` - CreatedAt time.Time `json:"created_ts,omitempty" bson:"created_ts,omitempty" description:"Created timestamp"` + CreatedAt time.Time `json:"created_at,omitempty" bson:"created_at,omitempty" description:"Created timestamp"` CreatedBy primitive.ObjectID `json:"created_by,omitempty" bson:"created_by,omitempty" description:"Created by"` - UpdatedAt time.Time `json:"updated_ts,omitempty" bson:"updated_ts,omitempty" description:"Updated timestamp"` + UpdatedAt time.Time `json:"updated_at,omitempty" bson:"updated_at,omitempty" description:"Updated timestamp"` UpdatedBy primitive.ObjectID `json:"updated_by,omitempty" bson:"updated_by,omitempty" description:"Updated by"` } diff --git a/core/models/models/chat_message_content.go b/core/models/models/chat_message_content.go index 77e72370..25718315 100644 --- a/core/models/models/chat_message_content.go +++ b/core/models/models/chat_message_content.go @@ -33,6 +33,7 @@ type ChatMessageContent struct { Content string `json:"content" bson:"content" description:"Message content"` Type string `json:"type" bson:"type" description:"Message type (text/action)"` Action string `json:"action,omitempty" bson:"action,omitempty" description:"Action name"` + ActionTarget string `json:"action_target,omitempty" bson:"action_target,omitempty" description:"Action target"` ActionStatus string `json:"action_status,omitempty" bson:"action_status,omitempty" description:"Action status"` Hidden bool `json:"hidden,omitempty" bson:"hidden,omitempty" description:"Hidden"` Usage *entity.LLMResponseUsage `json:"usage,omitempty" bson:"usage,omitempty" description:"Usage"` diff --git a/core/models/models/task.go b/core/models/models/task.go index c5c3c29e..1a9704e0 100644 --- a/core/models/models/task.go +++ b/core/models/models/task.go @@ -8,14 +8,13 @@ type Task struct { any `collection:"tasks"` BaseModel `bson:",inline"` SpiderId primitive.ObjectID `json:"spider_id" bson:"spider_id" description:"Spider ID"` - Status string `json:"status" bson:"status" description:"Status"` + Status string `json:"status" bson:"status" description:"Status: pending, assigned, running, finished, error, cancelled, abnormal."` NodeId primitive.ObjectID `json:"node_id" bson:"node_id" description:"Node ID"` Cmd string `json:"cmd" bson:"cmd" description:"Command"` Param string `json:"param" bson:"param" description:"Parameter"` Error string `json:"error" bson:"error" description:"Error"` Pid int `json:"pid" bson:"pid" description:"Process ID"` ScheduleId primitive.ObjectID `json:"schedule_id" bson:"schedule_id" description:"Schedule ID"` - Type string `json:"type" bson:"type" description:"Type"` Mode string `json:"mode" bson:"mode" description:"Mode"` Priority int `json:"priority" bson:"priority" description:"Priority"` NodeIds []primitive.ObjectID `json:"node_ids,omitempty" bson:"-"` diff --git a/core/models/models/task_stat.go b/core/models/models/task_stat.go index 5d34e2a0..4ce714b6 100644 --- a/core/models/models/task_stat.go +++ b/core/models/models/task_stat.go @@ -7,8 +7,8 @@ import ( type TaskStat struct { any `collection:"task_stats"` BaseModel `bson:",inline"` - StartTs time.Time `json:"start_ts" bson:"start_ts,omitempty" description:"Start time"` - EndTs time.Time `json:"end_ts" bson:"end_ts,omitempty" description:"End time"` + StartedAt time.Time `json:"started_at" bson:"started_at,omitempty" description:"Start time"` + EndedAt time.Time `json:"ended_at" bson:"ended_at,omitempty" description:"End time"` WaitDuration int64 `json:"wait_duration" bson:"wait_duration,omitempty" description:"Wait duration (in millisecond)"` RuntimeDuration int64 `json:"runtime_duration" bson:"runtime_duration,omitempty" description:"Runtime duration (in millisecond)"` TotalDuration int64 `json:"total_duration" bson:"total_duration,omitempty" description:"Total duration (in millisecond)"` diff --git a/core/notification/service.go b/core/notification/service.go index 31eb7c0f..c7f00e77 100644 --- a/core/notification/service.go +++ b/core/notification/service.go @@ -175,17 +175,15 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Error) case "pid": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Pid)) - case "type": - content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Type) case "mode": content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Mode) case "priority": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Priority)) - case "created_ts": + case "created_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Task.CreatedAt)) case "created_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Task.CreatedBy)) - case "updated_ts": + case "updated_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Task.UpdatedAt)) case "updated_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Task.UpdatedBy)) @@ -197,10 +195,10 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N continue } switch v.Name { - case "start_ts": - content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.TaskStat.StartTs)) - case "end_ts": - content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.TaskStat.EndTs)) + case "started_at": + content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.TaskStat.StartedAt)) + case "ended_at": + content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.TaskStat.EndedAt)) case "wait_duration": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%ds", vd.TaskStat.WaitDuration/1000)) case "runtime_duration": @@ -231,11 +229,11 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Param) case "priority": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Spider.Priority)) - case "created_ts": + case "created_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Spider.CreatedAt)) case "created_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Spider.CreatedBy)) - case "updated_ts": + case "updated_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Spider.UpdatedAt)) case "updated_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Spider.UpdatedBy)) @@ -275,11 +273,11 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.CurrentRunners)) case "max_runners": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.MaxRunners)) - case "created_ts": + case "created_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Node.CreatedAt)) case "created_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Node.CreatedBy)) - case "updated_ts": + case "updated_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Node.UpdatedAt)) case "updated_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Node.UpdatedBy)) @@ -309,11 +307,11 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Schedule.Priority)) case "enabled": content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Schedule.Enabled)) - case "created_ts": + case "created_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Schedule.CreatedAt)) case "created_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Schedule.CreatedBy)) - case "updated_ts": + case "updated_at": content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Schedule.UpdatedAt)) case "updated_by": content = strings.ReplaceAll(content, v.GetKey(), svc.getUsernameById(vd.Schedule.UpdatedBy)) diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index bfe3d0f6..6ea0dfb9 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -786,16 +786,16 @@ func (r *Runner) _updateTaskStat(status string) { case constants.TaskStatusPending: // do nothing case constants.TaskStatusRunning: - ts.StartTs = time.Now() - ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds() + ts.StartedAt = time.Now() + ts.WaitDuration = ts.StartedAt.Sub(ts.CreatedAt).Milliseconds() case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled: - if ts.StartTs.IsZero() { - ts.StartTs = time.Now() - ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds() + if ts.StartedAt.IsZero() { + ts.StartedAt = time.Now() + ts.WaitDuration = ts.StartedAt.Sub(ts.CreatedAt).Milliseconds() } - ts.EndTs = time.Now() - ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds() - ts.TotalDuration = ts.EndTs.Sub(ts.CreatedAt).Milliseconds() + ts.EndedAt = time.Now() + ts.RuntimeDuration = ts.EndedAt.Sub(ts.StartedAt).Milliseconds() + ts.TotalDuration = ts.EndedAt.Sub(ts.CreatedAt).Milliseconds() } if utils.IsMaster() { err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts) diff --git a/core/task/log/file_driver.go b/core/task/log/file_driver.go index ac5367e3..1abe6a38 100644 --- a/core/task/log/file_driver.go +++ b/core/task/log/file_driver.go @@ -4,9 +4,6 @@ import ( "bufio" "bytes" "fmt" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/spf13/viper" "io" "os" "path/filepath" @@ -14,6 +11,10 @@ import ( "strings" "sync" "time" + + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/core/utils" + "github.com/spf13/viper" ) type FileLogDriver struct { @@ -72,7 +73,7 @@ func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) { return nil } -func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (lines []string, err error) { +func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int, tail bool) (lines []string, err error) { if pattern != "" { err = fmt.Errorf("find with pattern not implemented") d.Errorf("%v", err) @@ -89,27 +90,89 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li } defer f.Close() + // Get total line count + totalLines, err := d.Count(id, "") + if err != nil { + d.Errorf("failed to count lines: %v", err) + return nil, err + } + + // Calculate effective skip and limit + effectiveSkip := skip + effectiveLimit := limit + + if tail { + // In tail mode, we count from the end + if skip >= totalLines { + // Skip beyond file size + return []string{}, nil + } + + if limit <= 0 { + // When limit is 0 in tail mode, return all lines except the last 'skip' lines + // This means we start from the beginning and read until (totalLines - skip) + effectiveSkip = 0 + effectiveLimit = totalLines - skip + } else { + // Normal tail behavior: skip from end and read N lines + startPosition := totalLines - skip - limit + if startPosition < 0 { + // If we would start before the beginning of the file, adjust + effectiveSkip = 0 + effectiveLimit = totalLines - skip + if effectiveLimit < 0 { + effectiveLimit = 0 + } + } else { + effectiveSkip = startPosition + effectiveLimit = limit + } + } + } else { + // In normal mode, just ensure we don't read past EOF + if effectiveSkip >= totalLines { + return []string{}, nil + } + if limit <= 0 { + effectiveLimit = totalLines - effectiveSkip + } else if effectiveSkip+effectiveLimit > totalLines { + effectiveLimit = totalLines - effectiveSkip + } + } + + if effectiveLimit <= 0 { + return []string{}, nil + } + sc := bufio.NewReaderSize(f, 1024*1024*10) - i := -1 - for { + // Skip lines + for i := 0; i < effectiveSkip; i++ { + _, err := sc.ReadString(byte('\n')) + if err != nil { + if err == io.EOF { + return lines, nil + } + d.Errorf("failed to read line: %v", err) + return nil, err + } + } + + // Read required lines + for i := 0; i < effectiveLimit; i++ { line, err := sc.ReadString(byte('\n')) if err != nil { + if err != io.EOF { + d.Errorf("failed to read line: %v", err) + return nil, err + } + // Handle the last line if it doesn't end with newline + if len(line) > 0 { + lines = append(lines, strings.TrimSuffix(line, "\n")) + } break } - line = strings.TrimSuffix(line, "\n") - - i++ - - if i < skip { - continue - } - - if i >= skip+limit { - break - } - - lines = append(lines, line) + lines = append(lines, strings.TrimSuffix(line, "\n")) } return lines, nil diff --git a/core/task/log/file_driver_test.go b/core/task/log/file_driver_test.go index c8e9c96d..88d6b3e3 100644 --- a/core/task/log/file_driver_test.go +++ b/core/task/log/file_driver_test.go @@ -7,6 +7,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + "github.com/spf13/viper" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson/primitive" @@ -95,47 +97,68 @@ func TestFileDriver_Find(t *testing.T) { driver := d - lines, err := driver.Find(id.Hex(), "", 0, 10) - require.Nil(t, err) - require.Equal(t, 10, len(lines)) - require.Equal(t, "line: 1", lines[0]) - require.Equal(t, "line: 10", lines[len(lines)-1]) + t.Run("Normal Mode (tail=false)", func(t *testing.T) { + // Test reading first 10 lines + lines, err := driver.Find(id.Hex(), "", 0, 10, false) + require.Nil(t, err) + assert.Equal(t, 10, len(lines)) + assert.Equal(t, "line: 1", lines[0]) + assert.Equal(t, "line: 10", lines[9]) - lines, err = driver.Find(id.Hex(), "", 0, 1) - require.Nil(t, err) - require.Equal(t, 1, len(lines)) - require.Equal(t, "line: 1", lines[0]) - require.Equal(t, "line: 1", lines[len(lines)-1]) + // Test reading with skip + lines, err = driver.Find(id.Hex(), "", 5, 5, false) + require.Nil(t, err) + assert.Equal(t, 5, len(lines)) + assert.Equal(t, "line: 6", lines[0]) + assert.Equal(t, "line: 10", lines[4]) - lines, err = driver.Find(id.Hex(), "", 0, 1000) - require.Nil(t, err) - require.Equal(t, 1000, len(lines)) - require.Equal(t, "line: 1", lines[0]) - require.Equal(t, "line: 1000", lines[len(lines)-1]) + // Test reading with no limit (should read to end) + lines, err = driver.Find(id.Hex(), "", 9995, 0, false) + require.Nil(t, err) + assert.Equal(t, 5, len(lines)) + assert.Equal(t, "line: 9996", lines[0]) + assert.Equal(t, "line: 10000", lines[4]) - lines, err = driver.Find(id.Hex(), "", 1000, 1000) - require.Nil(t, err) - require.Equal(t, 1000, len(lines)) - require.Equal(t, "line: 1001", lines[0]) - require.Equal(t, "line: 2000", lines[len(lines)-1]) + // Test reading past end + lines, err = driver.Find(id.Hex(), "", 10000, 10, false) + require.Nil(t, err) + assert.Equal(t, 0, len(lines)) + }) - lines, err = driver.Find(id.Hex(), "", 1001, 1000) - require.Nil(t, err) - require.Equal(t, 1000, len(lines)) - require.Equal(t, "line: 1002", lines[0]) - require.Equal(t, "line: 2001", lines[len(lines)-1]) + t.Run("Tail Mode (tail=true)", func(t *testing.T) { + // Test reading last 10 lines + lines, err := driver.Find(id.Hex(), "", 0, 10, true) + require.Nil(t, err) + assert.Equal(t, 10, len(lines)) + assert.Equal(t, "line: 9991", lines[0]) + assert.Equal(t, "line: 10000", lines[9]) - lines, err = driver.Find(id.Hex(), "", 1001, 999) - require.Nil(t, err) - require.Equal(t, 999, len(lines)) - require.Equal(t, "line: 1002", lines[0]) - require.Equal(t, "line: 2000", lines[len(lines)-1]) + // Test reading with skip from end + lines, err = driver.Find(id.Hex(), "", 5, 10, true) + require.Nil(t, err) + assert.Equal(t, 10, len(lines)) + assert.Equal(t, "line: 9986", lines[0]) + assert.Equal(t, "line: 9995", lines[9]) - lines, err = driver.Find(id.Hex(), "", 999, 2001) - require.Nil(t, err) - require.Equal(t, 2001, len(lines)) - require.Equal(t, "line: 1000", lines[0]) - require.Equal(t, "line: 3000", lines[len(lines)-1]) + // Test reading with no limit (should read all lines after skip) + lines, err = driver.Find(id.Hex(), "", 5, 0, true) + require.Nil(t, err) + assert.Equal(t, 9995, len(lines)) + assert.Equal(t, "line: 1", lines[0]) + assert.Equal(t, "line: 9995", lines[9994]) + + // Test reading with large skip + lines, err = driver.Find(id.Hex(), "", 9995, 10, true) + require.Nil(t, err) + assert.Equal(t, 5, len(lines)) + assert.Equal(t, "line: 1", lines[0]) + assert.Equal(t, "line: 5", lines[4]) + + // Test reading with skip larger than file + lines, err = driver.Find(id.Hex(), "", 10001, 10, true) + require.Nil(t, err) + assert.Equal(t, 0, len(lines)) + }) cleanupFileDriverTest() } diff --git a/core/task/log/interface.go b/core/task/log/interface.go index aaf0f664..fb96b8a0 100644 --- a/core/task/log/interface.go +++ b/core/task/log/interface.go @@ -5,6 +5,6 @@ type Driver interface { Close() (err error) WriteLine(id string, line string) (err error) WriteLines(id string, lines []string) (err error) - Find(id string, pattern string, skip int, limit int) (lines []string, err error) + Find(id string, pattern string, skip int, limit int, tail bool) (lines []string, err error) Count(id string, pattern string) (n int, err error) } diff --git a/core/task/scheduler/service.go b/core/task/scheduler/service.go index 85b7d90b..05e880e2 100644 --- a/core/task/scheduler/service.go +++ b/core/task/scheduler/service.go @@ -205,7 +205,7 @@ func (svc *Service) cleanupTasks() { for { // task stats over 30 days ago taskStats, err := service.NewModelService[models.TaskStat]().GetMany(bson.M{ - "created_ts": bson.M{ + "created_at": bson.M{ "$lt": time.Now().Add(-30 * 24 * time.Hour), }, }, nil) diff --git a/frontend/crawlab-ui/src/components/core/ai/useAssistantConsole.ts b/frontend/crawlab-ui/src/components/core/ai/useAssistantConsole.ts index e62d5790..7e55fe9f 100644 --- a/frontend/crawlab-ui/src/components/core/ai/useAssistantConsole.ts +++ b/frontend/crawlab-ui/src/components/core/ai/useAssistantConsole.ts @@ -75,7 +75,7 @@ const useAssistantConsole = () => { const messages = (res.data || []).map((msg: any) => { const message: ChatMessage = { ...msg, - timestamp: new Date(msg.created_ts || Date.now()), + timestamp: new Date(msg.created_at || Date.now()), }; return message; }); diff --git a/frontend/crawlab-ui/src/components/core/task/useTask.ts b/frontend/crawlab-ui/src/components/core/task/useTask.ts index 31fe9200..c8472622 100644 --- a/frontend/crawlab-ui/src/components/core/task/useTask.ts +++ b/frontend/crawlab-ui/src/components/core/task/useTask.ts @@ -34,7 +34,7 @@ const useTask = (store: Store) => { const allListSelectOptions = computed(() => state.allList.map(task => { const spider = allSpiderDict.value.get(task.spider_id!); - const timeAgo = formatTimeAgo(task.created_ts!); + const timeAgo = formatTimeAgo(task.created_at!); return { label: `${spider?.name} (${timeAgo})`, value: task._id, diff --git a/frontend/crawlab-ui/src/components/ui/chat/ChatHistory.vue b/frontend/crawlab-ui/src/components/ui/chat/ChatHistory.vue index 94108282..5c709b30 100644 --- a/frontend/crawlab-ui/src/components/ui/chat/ChatHistory.vue +++ b/frontend/crawlab-ui/src/components/ui/chat/ChatHistory.vue @@ -86,7 +86,7 @@ defineOptions({ name: 'ClChatHistory' });
@@ -174,4 +174,4 @@ defineOptions({ name: 'ClChatHistory' }); text-overflow: ellipsis; white-space: nowrap; } - \ No newline at end of file + diff --git a/frontend/crawlab-ui/src/components/ui/chat/ChatMessage.vue b/frontend/crawlab-ui/src/components/ui/chat/ChatMessage.vue index c70ae2cc..d477a8b3 100644 --- a/frontend/crawlab-ui/src/components/ui/chat/ChatMessage.vue +++ b/frontend/crawlab-ui/src/components/ui/chat/ChatMessage.vue @@ -98,6 +98,7 @@ defineOptions({ name: 'ClChatMessage' }); ; content?: string; @@ -27,7 +28,9 @@ const actionStatusIcon = computed(() => { } }); -const parsedContent = computed | Record[] | null>(() => { +const parsedContent = computed< + Record | Record[] | null +>(() => { if (!props.content) return null; try { return JSON.parse(props.content); @@ -48,6 +51,14 @@ const hasContent = computed(() => { return props.content || hasParameters.value; }); +const actionLabel = computed(() => { + const { action, actionTarget } = props; + if (actionTarget) { + return `${action}: ${actionTarget}`; + } + return action; +}); + defineOptions({ name: 'ClChatMessageAction' }); @@ -65,7 +76,7 @@ defineOptions({ name: 'ClChatMessageAction' }); :class="{ 'flash-text': actionStatus === 'pending' }" @click="isExpanded = !isExpanded" > - {{ action }} + {{ actionLabel }}
-
+
Parameters
-
+
Response
@@ -151,7 +171,10 @@ defineOptions({ name: 'ClChatMessageAction' });
-
+
Parameters
-
+
Response ; tags?: string[]; messages?: ChatMessage[]; - created_ts?: string; - updated_ts?: string; + created_at?: string; + updated_at?: string; } interface ChatRequest { @@ -120,6 +121,7 @@ export declare global { type: 'text' | 'action'; // Message type action_id?: string; action?: string; + action_target?: string; action_status?: ChatMessageActionStatus; is_done?: boolean; is_initial?: boolean; diff --git a/frontend/crawlab-ui/src/interfaces/views/task.d.ts b/frontend/crawlab-ui/src/interfaces/views/task.d.ts index 84b41150..5433ddf3 100644 --- a/frontend/crawlab-ui/src/interfaces/views/task.d.ts +++ b/frontend/crawlab-ui/src/interfaces/views/task.d.ts @@ -31,8 +31,8 @@ export declare global { interface TaskStat { create_ts?: string; - start_ts?: string; - end_ts?: string; + started_at?: string; + ended_at?: string; result_count?: number; error_log_count?: number; wait_duration?: number; diff --git a/frontend/crawlab-ui/src/utils/notification.ts b/frontend/crawlab-ui/src/utils/notification.ts index 35a5ae72..7f05e3ae 100644 --- a/frontend/crawlab-ui/src/utils/notification.ts +++ b/frontend/crawlab-ui/src/utils/notification.ts @@ -53,7 +53,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'task', - name: 'created_ts', + name: 'created_at', label: t('components.notification.variables.common.createdAt'), icon: ['fa', 'clock'], }, @@ -65,7 +65,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'task', - name: 'updated_ts', + name: 'updated_at', label: t('components.notification.variables.common.updatedAt'), icon: ['fa', 'clock'], }, @@ -77,13 +77,13 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'task_stat', - name: 'start_ts', + name: 'started_at', label: t('components.notification.variables.taskStat.startTs'), icon: ['fa', 'clock'], }, { category: 'task_stat', - name: 'end_ts', + name: 'ended_at', label: t('components.notification.variables.taskStat.endTs'), icon: ['fa', 'clock'], }, @@ -155,7 +155,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'spider', - name: 'created_ts', + name: 'created_at', label: t('components.notification.variables.common.createdAt'), icon: ['fa', 'clock'], }, @@ -167,7 +167,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'spider', - name: 'updated_ts', + name: 'updated_at', label: t('components.notification.variables.common.updatedAt'), icon: ['fa', 'clock'], }, @@ -273,7 +273,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'schedule', - name: 'created_ts', + name: 'created_at', label: t('components.notification.variables.common.createdAt'), icon: ['fa', 'clock'], }, @@ -285,7 +285,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'schedule', - name: 'updated_ts', + name: 'updated_at', label: t('components.notification.variables.common.updatedAt'), icon: ['fa', 'clock'], }, @@ -327,7 +327,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'node', - name: 'created_ts', + name: 'created_at', label: t('components.notification.variables.common.createdAt'), icon: ['fa', 'clock'], }, @@ -339,7 +339,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'node', - name: 'updated_ts', + name: 'updated_at', label: t('components.notification.variables.common.updatedAt'), icon: ['fa', 'clock'], }, @@ -399,7 +399,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'alert', - name: 'created_ts', + name: 'created_at', label: t('components.notification.variables.common.createdAt'), icon: ['fa', 'clock'], }, @@ -411,7 +411,7 @@ export const allVariables: NotificationVariable[] = [ }, { category: 'alert', - name: 'updated_ts', + name: 'updated_at', label: t('components.notification.variables.common.updatedAt'), icon: ['fa', 'clock'], }, diff --git a/frontend/crawlab-ui/src/views/notification/request/list/NotificationRequestList.vue b/frontend/crawlab-ui/src/views/notification/request/list/NotificationRequestList.vue index b534662b..98259730 100644 --- a/frontend/crawlab-ui/src/views/notification/request/list/NotificationRequestList.vue +++ b/frontend/crawlab-ui/src/views/notification/request/list/NotificationRequestList.vue @@ -93,7 +93,7 @@ defineOptions({ name: 'ClNotificationRequestList' }); {{ t('views.notification.requests.form.createdAt') }} - +