From ed29a7656692944fdc03bd2644ac7d9ee9cac42d Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Thu, 3 Apr 2025 22:35:22 +0800 Subject: [PATCH] refactor: restructure stats handling and remove legacy service interface - Replaced the StatsService interface with direct MongoDB queries in the stats controller, enhancing performance and reducing complexity. - Introduced structured response types for GetStatsOverview and GetStatsTasks, improving clarity and maintainability of the API responses. - Removed outdated service implementation files, streamlining the codebase and eliminating redundancy. - Enhanced error handling for MongoDB operations to ensure robustness in data retrieval processes. --- core/controllers/stats.go | 288 +++++++++++++++++++++++++-- core/interfaces/stats_service.go | 9 - core/stats/service.go | 324 ------------------------------- 3 files changed, 274 insertions(+), 347 deletions(-) delete mode 100644 core/interfaces/stats_service.go delete mode 100644 core/stats/service.go diff --git a/core/controllers/stats.go b/core/controllers/stats.go index 019b6667..a3b449da 100644 --- a/core/controllers/stats.go +++ b/core/controllers/stats.go @@ -1,9 +1,15 @@ package controllers import ( + "go.mongodb.org/mongo-driver/bson/primitive" "time" - "github.com/crawlab-team/crawlab/core/stats" + "github.com/crawlab-team/crawlab/core/constants" + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/models/models" + "github.com/crawlab-team/crawlab/core/mongo" + mongo2 "go.mongodb.org/mongo-driver/mongo" + "github.com/gin-gonic/gin" "go.mongodb.org/mongo-driver/bson" ) @@ -18,49 +24,303 @@ type GetStatsOverviewParams struct { Query bson.M `json:"query" description:"Query"` } -func GetStatsOverview(_ *gin.Context, params *GetStatsOverviewParams) (response *Response[bson.M], err error) { +type GetStatsOverviewResponse struct { + Nodes int `json:"nodes" description:"Number of nodes"` + Projects int `json:"projects" description:"Number of 
projects"` + Spiders int `json:"spiders" description:"Number of spiders"` + Schedules int `json:"schedules" description:"Number of schedules"` + Tasks int `json:"tasks" description:"Number of tasks"` + ErrorTasks int `json:"error_tasks" description:"Number of error tasks"` + Results int `json:"results" description:"Number of results"` + Users int `json:"users" description:"Number of users"` +} + +func GetStatsOverview(_ *gin.Context, params *GetStatsOverviewParams) (response *Response[GetStatsOverviewResponse], err error) { query := statsDefaultQuery if params.Query != nil { query = params.Query } - data, err := stats.GetStatsService().GetOverviewStats(query) + var data GetStatsOverviewResponse + + // nodes + data.Nodes, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Node{})).Count(bson.M{"active": true}) if err != nil { - return GetErrorResponse[bson.M](err) + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Nodes = 0 } - return GetDataResponse(data.(bson.M)) + + // projects + data.Projects, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Project{})).Count(nil) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Projects = 0 + } + + // spiders + data.Spiders, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Spider{})).Count(nil) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Spiders = 0 + } + + // schedules + data.Schedules, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Schedule{})).Count(nil) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Schedules = 0 + } + + // tasks + data.Tasks, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Count(nil) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Tasks = 0 + } + + // error tasks + 
data.ErrorTasks, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Count(bson.M{"status": constants.TaskStatusError}) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.ErrorTasks = 0 + } + + // results + pipeline := mongo2.Pipeline{ + {{"$match", query}}, + {{ + "$group", + bson.M{ + "_id": nil, + "results": bson.M{"$sum": "$result_count"}, + }, + }}, + } + var res struct { + Results int `bson:"results"` + } + if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.TaskStat{})).Aggregate(pipeline, nil).One(&res); err != nil && err.Error() != mongo2.ErrNoDocuments.Error() { // no matching task stats means zero results, not a failed overview + return nil, err + } + data.Results = res.Results + + // users + data.Users, err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.User{})).Count(nil) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + return nil, err + } + data.Users = 0 + } + + return GetDataResponse(data) } type GetStatsDailyParams struct { Query bson.M `json:"query" description:"Query"` } -func GetStatsDaily(_ *gin.Context, params *GetStatsDailyParams) (response *Response[bson.M], err error) { +func GetStatsDaily(_ *gin.Context, params *GetStatsDailyParams) (response *Response[[]entity.StatsDailyItem], err error) { query := statsDefaultQuery if params.Query != nil { query = params.Query } - data, err := stats.GetStatsService().GetDailyStats(query) - if err != nil { - return GetErrorResponse[bson.M](err) + pipeline := mongo2.Pipeline{ + {{ + "$match", query, + }}, + {{ + "$addFields", + bson.M{ + "date": bson.M{ + "$dateToString": bson.M{ + "date": bson.M{"$toDate": "$_id"}, + "format": "%Y-%m-%d", + "timezone": "Asia/Shanghai", // TODO: parameterization + }, + }, + }, + }}, + {{ + "$group", + bson.M{ + "_id": "$date", + "tasks": bson.M{"$sum": 1}, + "results": bson.M{"$sum": "$result_count"}, + }, + }}, + {{ + "$sort", + bson.D{{"_id", 1}}, + }}, } - return GetDataResponse(data.(bson.M)) + var results []entity.StatsDailyItem + if err := 
mongo.GetMongoCol(models.GetCollectionNameByInstance(models.TaskStat{})).Aggregate(pipeline, nil).All(&results); err != nil { + return nil, err + } + return GetDataResponse(results) } type GetStatsTasksParams struct { Query bson.M `json:"query" description:"Query"` } -func GetStatsTasks(_ *gin.Context, params *GetStatsTasksParams) (response *Response[bson.M], err error) { +type GetStatsTaskResponse struct { + ByStatus []GetStatsTaskResponseByStatusItem `json:"by_status"` + ByNode []GetStatsTaskResponseByNodeItem `json:"by_node"` + BySpider []GetStatsTaskResponseBySpiderItem `json:"by_spider"` +} + +type GetStatsTaskResponseByStatusItem struct { // bson tags mirror the $project keys of getTaskStatsByStatus + Status string `json:"status" bson:"status"` + Tasks int `json:"tasks" bson:"tasks"` +} +type GetStatsTaskResponseByNodeItem struct { // bson tags mirror the $project keys of getTaskStatsByNode + NodeId primitive.ObjectID `json:"node_id" bson:"node_id"` + Node models.Node `json:"node" bson:"node"` + NodeName string `json:"node_name" bson:"node_name"` + Tasks int `json:"tasks" bson:"tasks"` +} +type GetStatsTaskResponseBySpiderItem struct { // bson tags mirror the $project keys of getTaskStatsBySpider + SpiderId primitive.ObjectID `json:"spider_id" bson:"spider_id"` + Spider models.Spider `json:"spider" bson:"spider"` + SpiderName string `json:"spider_name" bson:"spider_name"` + Tasks int `json:"tasks" bson:"tasks"` +} + +func GetStatsTasks(_ *gin.Context, params *GetStatsTasksParams) (response *Response[GetStatsTaskResponse], err error) { query := statsDefaultQuery if params.Query != nil { query = params.Query } - data, err := stats.GetStatsService().GetTaskStats(query) + var data GetStatsTaskResponse + + // by status + data.ByStatus, err = getTaskStatsByStatus(query) if err != nil { - return GetErrorResponse[bson.M](err) + return nil, err } - return GetDataResponse(data.(bson.M)) + + // by node + data.ByNode, err = getTaskStatsByNode(query) + if err != nil { + return nil, err + } + + // by spider + data.BySpider, err = getTaskStatsBySpider(query) + if err != nil { + return nil, err + } + + return GetDataResponse(data) +} + +func getTaskStatsByStatus(query bson.M) (data []GetStatsTaskResponseByStatusItem, err error) { + pipeline := mongo2.Pipeline{ + {{"$match", query}}, + {{ + "$group", 
+ bson.M{ + "_id": "$status", + "tasks": bson.M{"$sum": 1}, + }, + }}, + {{ + "$project", + bson.M{ + "status": "$_id", + "tasks": "$tasks", + }, + }}, + } + if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&data); err != nil { + return nil, err + } + return data, nil +} + +func getTaskStatsByNode(query bson.M) (data []GetStatsTaskResponseByNodeItem, err error) { + pipeline := mongo2.Pipeline{ + {{"$match", query}}, + {{ + "$group", + bson.M{ + "_id": "$node_id", + "tasks": bson.M{"$sum": 1}, + }, + }}, + {{ + "$lookup", + bson.M{ + "from": models.GetCollectionNameByInstance(models.Node{}), + "localField": "_id", + "foreignField": "_id", + "as": "_n", + }, + }}, + {{ + "$project", + bson.M{ + "node_id": "$_id", // after $group the node id lives in _id; node_id no longer exists + "node": bson.M{"$arrayElemAt": bson.A{"$_n", 0}}, + "node_name": bson.M{"$arrayElemAt": bson.A{"$_n.name", 0}}, + "tasks": "$tasks", + }, + }}, + } + if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&data); err != nil { + return nil, err + } + return data, nil +} + +func getTaskStatsBySpider(query bson.M) (data []GetStatsTaskResponseBySpiderItem, err error) { + pipeline := mongo2.Pipeline{ + {{"$match", query}}, + {{ + "$group", + bson.M{ + "_id": "$spider_id", + "tasks": bson.M{"$sum": 1}, + }, + }}, + {{ + "$lookup", + bson.M{ + "from": models.GetCollectionNameByInstance(models.Spider{}), + "localField": "_id", + "foreignField": "_id", + "as": "_s", + }, + }}, + {{ + "$project", + bson.M{ + "spider_id": "$_id", // after $group the spider id lives in _id; spider_id no longer exists + "spider": bson.M{"$arrayElemAt": bson.A{"$_s", 0}}, + "spider_name": bson.M{"$arrayElemAt": bson.A{"$_s.name", 0}}, + "tasks": "$tasks", + }, + }}, + {{"$limit", 10}}, + } + if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&data); err != nil { + return nil, err + } + return data, nil } diff --git a/core/interfaces/stats_service.go 
b/core/interfaces/stats_service.go deleted file mode 100644 index aef3f5f5..00000000 --- a/core/interfaces/stats_service.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson" - -type StatsService interface { - GetOverviewStats(query bson.M) (data interface{}, err error) - GetDailyStats(query bson.M) (data interface{}, err error) - GetTaskStats(query bson.M) (data interface{}, err error) -} diff --git a/core/stats/service.go b/core/stats/service.go deleted file mode 100644 index 6f1d26ea..00000000 --- a/core/stats/service.go +++ /dev/null @@ -1,324 +0,0 @@ -package stats - -import ( - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/core/mongo" - "go.mongodb.org/mongo-driver/bson" - mongo2 "go.mongodb.org/mongo-driver/mongo" -) - -type Service struct { -} - -func (svc *Service) GetOverviewStats(query bson.M) (data interface{}, err error) { - stats := bson.M{} - - // nodes - stats["nodes"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Node{})).Count(bson.M{"active": true}) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["nodes"] = 0 - } - - // projects - stats["projects"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Project{})).Count(nil) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["projects"] = 0 - } - - // spiders - stats["spiders"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Spider{})).Count(nil) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["spiders"] = 0 - } - - // schedules - stats["schedules"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Schedule{})).Count(nil) - if err != nil { - if 
err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["schedules"] = 0 - } - - // tasks - stats["tasks"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Count(nil) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["tasks"] = 0 - } - - // error tasks - stats["error_tasks"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Count(bson.M{"status": constants.TaskStatusError}) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["error_tasks"] = 0 - } - - // results - stats["results"], err = svc.getOverviewResults(query) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["results"] = 0 - } - - // users - stats["users"], err = mongo.GetMongoCol(models.GetCollectionNameByInstance(models.User{})).Count(nil) - if err != nil { - if err.Error() != mongo2.ErrNoDocuments.Error() { - return nil, err - } - stats["users"] = 0 - } - - return stats, nil -} - -func (svc *Service) GetDailyStats(query bson.M) (data interface{}, err error) { - tasksStats, err := svc.getDailyTasksStats(query) - if err != nil { - return nil, err - } - return tasksStats, nil -} - -func (svc *Service) GetTaskStats(query bson.M) (data interface{}, err error) { - stats := bson.M{} - - // by status - stats["by_status"], err = svc.getTaskStatsByStatus(query) - if err != nil { - return nil, err - } - - // by node - stats["by_node"], err = svc.getTaskStatsByNode(query) - if err != nil { - return nil, err - } - - // by spider - stats["by_spider"], err = svc.getTaskStatsBySpider(query) - if err != nil { - return nil, err - } - - return stats, nil -} - -func (svc *Service) getDailyTasksStats(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{ - "$match", query, - }}, - {{ - "$addFields", - bson.M{ - "date": bson.M{ - "$dateToString": bson.M{ - "date": bson.M{"$toDate": 
"$_id"}, - "format": "%Y-%m-%d", - "timezone": "Asia/Shanghai", // TODO: parameterization - }, - }, - }, - }}, - {{ - "$group", - bson.M{ - "_id": "$date", - "tasks": bson.M{"$sum": 1}, - "results": bson.M{"$sum": "$result_count"}, - }, - }}, - {{ - "$sort", - bson.D{{"_id", 1}}, - }}, - } - var results []entity.StatsDailyItem - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.TaskStat{})).Aggregate(pipeline, nil).All(&results); err != nil { - return nil, err - } - return results, nil -} - -func (svc *Service) getOverviewResults(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{"$match", query}}, - {{ - "$group", - bson.M{ - "_id": nil, - "results": bson.M{"$sum": "$result_count"}, - }, - }}, - } - var res bson.M - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.TaskStat{})).Aggregate(pipeline, nil).One(&res); err != nil { - return nil, err - } - return res["results"], nil -} - -func (svc *Service) getTaskStatsByStatus(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{"$match", query}}, - {{ - "$group", - bson.M{ - "_id": "$status", - "tasks": bson.M{"$sum": 1}, - }, - }}, - {{ - "$project", - bson.M{ - "status": "$_id", - "tasks": "$tasks", - }, - }}, - } - var results []bson.M - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&results); err != nil { - return nil, err - } - return results, nil -} - -func (svc *Service) getTaskStatsByNode(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{"$match", query}}, - {{ - "$group", - bson.M{ - "_id": "$node_id", - "tasks": bson.M{"$sum": 1}, - }, - }}, - {{ - "$lookup", - bson.M{ - "from": models.GetCollectionNameByInstance(models.Node{}), - "localField": "_id", - "foreignField": "_id", - "as": "_n", - }, - }}, - {{ - "$project", - bson.M{ - "node_id": "$node_id", - "node": bson.M{"$arrayElemAt": bson.A{"$_n", 0}}, - "node_name": 
bson.M{"$arrayElemAt": bson.A{"$_n.name", 0}}, - "tasks": "$tasks", - }, - }}, - } - var results []bson.M - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&results); err != nil { - return nil, err - } - return results, nil -} - -func (svc *Service) getTaskStatsBySpider(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{"$match", query}}, - {{ - "$group", - bson.M{ - "_id": "$spider_id", - "tasks": bson.M{"$sum": 1}, - }, - }}, - {{ - "$lookup", - bson.M{ - "from": models.GetCollectionNameByInstance(models.Spider{}), - "localField": "_id", - "foreignField": "_id", - "as": "_s", - }, - }}, - {{ - "$project", - bson.M{ - "spider_id": "$spider_id", - "spider": bson.M{"$arrayElemAt": bson.A{"$_s", 0}}, - "spider_name": bson.M{"$arrayElemAt": bson.A{"$_s.name", 0}}, - "tasks": "$tasks", - }, - }}, - {{"$limit", 10}}, - } - var results []bson.M - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).All(&results); err != nil { - return nil, err - } - return results, nil -} - -func (svc *Service) getTaskStatsHistogram(query bson.M) (data interface{}, err error) { - pipeline := mongo2.Pipeline{ - {{"$match", query}}, - {{ - "$lookup", - bson.M{ - "from": models.GetCollectionNameByInstance(models.TaskStat{}), - "localField": "_id", - "foreignField": "_id", - "as": "_ts", - }, - }}, - {{ - "$facet", - bson.M{ - "total_duration": bson.A{ - bson.M{ - "$bucketAuto": bson.M{ - "groupBy": "$_ts.td", - "buckets": 10, - "granularity": "1-2-5", - }, - }, - }, - }, - }}, - } - var res bson.M - if err := mongo.GetMongoCol(models.GetCollectionNameByInstance(models.Task{})).Aggregate(pipeline, nil).One(&res); err != nil { - return nil, err - } - return res, nil -} - -var svc interfaces.StatsService - -func GetStatsService() interfaces.StatsService { - if svc != nil { - return svc - } - - // service - svc = &Service{} - - return svc -}