From 32760839946959b6df19d889822dba7285543856 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 24 Dec 2024 19:11:19 +0800 Subject: [PATCH] refactor: replace apex/log with structured logger across multiple services - Replaced all instances of apex/log with a structured logger interface in various services, including Api, Server, Config, and others, to enhance logging consistency and context. - Updated logging calls to utilize the new logger methods, improving error tracking and service monitoring. - Added logger initialization in services and controllers to ensure proper logging setup. - Improved error handling and logging messages for better clarity during service operations. - Removed unused apex/log imports and cleaned up related code for better maintainability. --- core/apps/api.go | 14 +- core/apps/server.go | 10 +- core/apps/utils.go | 11 +- core/config/config.go | 11 +- core/controllers/base_file.go | 17 ++- core/controllers/spider.go | 19 +-- core/controllers/task.go | 11 +- core/controllers/utils_logger.go | 5 + core/controllers/ws_writer.go | 7 +- core/export/csv_service.go | 47 ++++--- core/export/json_service.go | 37 +++--- core/fs/default.go | 25 ---- core/fs/service.go | 21 ++- core/grpc/client/client.go | 32 ++--- core/grpc/server/dependency_service_server.go | 29 +++-- core/grpc/server/metric_service_server.go | 14 +- core/grpc/server/node_service_server.go | 15 ++- core/grpc/server/server.go | 18 +-- core/grpc/server/task_service_server.go | 46 ++++--- core/models/common/index_utils.go | 14 +- core/models/common/index_utils_test.go | 4 +- core/models/common/init_index.go | 18 +-- core/node/config/config_service.go | 17 +-- core/node/service/master_service.go | 32 ++--- core/node/service/worker_service.go | 28 ++-- core/notification/im.go | 18 +-- core/notification/logger.go | 5 + core/notification/mail.go | 5 +- core/notification/mail_gmail.go | 15 +-- core/notification/oauth2_gmail.go | 5 +- core/notification/service.go | 32 ++--- core/schedule/logger.go | 25 ++-- core/schedule/service.go | 28 ++-- core/task/handler/runner.go | 117 ++++++++--------- core/task/handler/service.go | 68 +++++----- core/task/log/file_driver.go | 46 ++++--- core/task/scheduler/service.go | 37 +++--- core/task/stats/service.go | 11 +- core/user/service.go | 11 +- core/utils/backoff.go | 15 --- core/utils/config.go | 7 +- core/utils/file.go | 11 +- core/utils/health.go | 5 +- core/utils/log.go | 122 +++++++++++++++++- core/utils/process.go | 13 +- core/utils/time.go | 9 +- 46 files changed, 617 insertions(+), 490 deletions(-) create mode 100644 core/controllers/utils_logger.go delete mode 100644 core/fs/default.go create mode 100644 core/notification/logger.go delete mode 100644 core/utils/backoff.go diff --git a/core/apps/api.go b/core/apps/api.go index 39c13c54..4f25e6b6 100644 --- a/core/apps/api.go +++ b/core/apps/api.go @@ -3,8 +3,8 @@ package apps import ( "context" "errors" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/controllers" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/middlewares" "github.com/crawlab-team/crawlab/core/utils" "github.com/gin-gonic/gin" @@ -25,6 +25,7 @@ type Api struct { ln net.Listener srv *http.Server initialized bool + interfaces.Logger } func (app *Api) Init() { @@ -59,14 +60,14 @@ func (app *Api) Start() { if err != nil { panic(err) } - log.Infof("api server listening on %s", address) + app.Infof("api server listening on %s", address) // serve if err := http.Serve(app.ln, app.app); err != nil { if !errors.Is(err, http.ErrServerClosed) { - log.Errorf("run api server error: %v", err) + app.Errorf("run api server error: %v", err) } else { - log.Info("api server graceful down") + app.Info("api server graceful down") } } } @@ -80,7 +81,7 @@ func (app *Api) Stop() { defer cancel() if err := app.srv.Shutdown(ctx); err != nil { - log.Errorf("shutdown api server error: %v", err) + app.Errorf("shutdown api server error: %v", err) } } @@ -100,7 +101,8 @@ func (app *Api) initModuleWithApp(name string, fn func(app *gin.Engine) error) ( func newApi() *Api { api := &Api{ - app: gin.New(), + app: gin.New(), + Logger: utils.NewLogger("Api"), } api.Init() return api diff --git a/core/apps/server.go b/core/apps/server.go index 7cc2bdf8..c23441bb 100644 --- a/core/apps/server.go +++ b/core/apps/server.go @@ -2,7 +2,6 @@ package apps import ( "fmt" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/node/service" "github.com/crawlab-team/crawlab/core/utils" @@ -16,6 +15,9 @@ type Server struct { // modules nodeSvc interfaces.NodeService api *Api + + // internals + interfaces.Logger } func (app *Server) Init() { @@ -53,7 +55,7 @@ func (app *Server) GetNodeService() interfaces.NodeService { } func (app *Server) logNodeInfo() { - log.Infof("current node type: %s", utils.GetNodeType()) + app.Infof("current node type: %s", utils.GetNodeType()) } func (app *Server) initPprof() { @@ -66,7 +68,9 @@ func (app *Server) initPprof() { func newServer() App { // server - svr := &Server{} + svr := &Server{ + Logger: utils.NewLogger("Server"), + } // master modules if utils.IsMaster() { diff --git a/core/apps/utils.go b/core/apps/utils.go index 57bc1da7..faa83700 100644 --- a/core/apps/utils.go +++ b/core/apps/utils.go @@ -1,11 +1,11 @@ package apps import ( - "fmt" - "github.com/apex/log" - "github.com/crawlab-team/crawlab/trace" + "github.com/crawlab-team/crawlab/core/utils" ) +var utilsLogger = utils.NewLogger("AppsUtils") + func Start(app App) { start(app) } @@ -19,10 +19,9 @@ func start(app App) { func initModule(name string, fn func() error) (err error) { if err := fn(); err != nil { - log.Error(fmt.Sprintf("init %s error: %s", name, err.Error())) - _ = trace.TraceError(err) + utilsLogger.Errorf("init %s error: %v", name, err) panic(err) } - log.Info(fmt.Sprintf("initialized %s successfully", name)) + utilsLogger.Infof("initialized %s successfully", name) return nil } diff --git a/core/config/config.go b/core/config/config.go index 09bfde7a..ba79e064 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -2,6 +2,8 @@ package config import ( "errors" + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/core/utils" "strings" "sync" @@ -12,6 +14,7 @@ import ( type Config struct { Name string + interfaces.Logger } func (c *Config) Init() { @@ -43,7 +46,7 @@ func (c *Config) Init() { if err := viper.ReadInConfig(); err != nil { var configFileNotFoundError viper.ConfigFileNotFoundError if errors.As(err, &configFileNotFoundError) { - log.Warn("No config file found. Using default values.") + c.Warn("No config file found. Using default values.") } } @@ -54,7 +57,7 @@ func (c *Config) Init() { func (c *Config) WatchConfig() { viper.WatchConfig() viper.OnConfigChange(func(e fsnotify.Event) { - log.Infof("Config file changed: %s", e.Name) + c.Infof("Config file changed: %s", e.Name) }) } @@ -78,7 +81,9 @@ func (c *Config) initLogLevel() { } func newConfig() *Config { - return &Config{} + return &Config{ + Logger: utils.NewLogger("Config"), + } } var _config *Config diff --git a/core/controllers/base_file.go b/core/controllers/base_file.go index d35c4d86..5018caea 100644 --- a/core/controllers/base_file.go +++ b/core/controllers/base_file.go @@ -3,7 +3,6 @@ package controllers import ( "errors" "fmt" - log2 "github.com/apex/log" "github.com/crawlab-team/crawlab/core/fs" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" @@ -138,28 +137,28 @@ func PostBaseFileSaveFiles(rootPath string, c *gin.Context) { go func(path string) { file, err := c.FormFile(path) if err != nil { - log2.Warnf("invalid file header: %s", path) - log2.Error(err.Error()) + logger.Warnf("invalid file header: %s", path) + logger.Error(err.Error()) wg.Done() return } f, err := file.Open() if err != nil { - log2.Warnf("unable to open file: %s", path) - log2.Error(err.Error()) + logger.Warnf("unable to open file: %s", path) + logger.Error(err.Error()) wg.Done() return } fileData, err := io.ReadAll(f) if err != nil { - log2.Warnf("unable to read file: %s", path) - log2.Error(err.Error()) + logger.Warnf("unable to read file: %s", path) + logger.Error(err.Error()) wg.Done() return } if err := fsSvc.Save(path, fileData); err != nil { - log2.Warnf("unable to save file: %s", path) - log2.Error(err.Error()) + logger.Warnf("unable to save file: %s", path) + logger.Error(err.Error()) wg.Done() return } diff --git a/core/controllers/spider.go b/core/controllers/spider.go index 36746166..edc05507 100644 --- a/core/controllers/spider.go +++ b/core/controllers/spider.go @@ -9,7 +9,6 @@ import ( "path/filepath" "sync" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/fs" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/service" @@ -17,7 +16,6 @@ import ( "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/generic" "github.com/crawlab-team/crawlab/db/mongo" - "github.com/crawlab-team/crawlab/trace" "github.com/gin-gonic/gin" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -398,7 +396,7 @@ func DeleteSpiderById(c *gin.Context) { // delete task logs logPath := filepath.Join(utils.GetTaskLogPath(), id) if err := os.RemoveAll(logPath); err != nil { - log.Warnf("failed to remove task log directory: %s", logPath) + logger.Warnf("failed to remove task log directory: %s", logPath) } wg.Done() }(id.Hex()) @@ -416,12 +414,12 @@ func DeleteSpiderById(c *gin.Context) { // delete spider directory fsSvc, err := getSpiderFsSvcById(id) if err != nil { - log.Errorf("failed to get spider fs service: %s", err.Error()) + logger.Errorf("failed to get spider fs service: %v", err) return } err = fsSvc.Delete(".") if err != nil { - log.Errorf("failed to delete spider directory: %s", err.Error()) + logger.Errorf("failed to delete spider directory: %v", err) return } }() @@ -503,7 +501,7 @@ func DeleteSpiderList(c *gin.Context) { // delete task logs logPath := filepath.Join(utils.GetTaskLogPath(), id) if err := os.RemoveAll(logPath); err != nil { - log.Warnf("failed to remove task log directory: %s", logPath) + logger.Warnf("failed to remove task log directory: %s", logPath) } wg.Done() }(id.Hex()) @@ -532,14 +530,12 @@ func DeleteSpiderList(c *gin.Context) { // Delete spider directory fsSvc, err := getSpiderFsSvcById(s.Id) if err != nil { - log.Errorf("failed to get spider fs service: %s", err.Error()) - trace.PrintError(err) + logger.Errorf("failed to get spider fs service: %v", err) return } err = fsSvc.Delete(".") if err != nil { - log.Errorf("failed to delete spider directory: %s", err.Error()) - trace.PrintError(err) + logger.Errorf("failed to delete spider directory: %v", err) return } }(&spiders[i]) @@ -719,8 +715,7 @@ func getSpiderFsSvc(s *models.Spider) (svc interfaces.FsService, err error) { func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsService, err error) { s, err := service.NewModelService[models.Spider]().GetById(id) if err != nil { - log.Errorf("failed to get spider: %s", err.Error()) - trace.PrintError(err) + logger.Errorf("failed to get spider: %v", err) return nil, err } return getSpiderFsSvc(s) diff --git a/core/controllers/task.go b/core/controllers/task.go index 7032a476..868ef107 100644 --- a/core/controllers/task.go +++ b/core/controllers/task.go @@ -2,7 +2,6 @@ package controllers import ( "errors" - log2 "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" @@ -191,12 +190,12 @@ func DeleteTaskById(c *gin.Context) { // delete task stat _, err = service.NewModelService[models.TaskStat]().GetById(id) if err != nil { - log2.Warnf("delete task stat error: %s", err.Error()) + logger.Warnf("delete task stat error: %s", err.Error()) return nil } err = service.NewModelService[models.TaskStat]().DeleteById(id) if err != nil { - log2.Warnf("delete task stat error: %s", err.Error()) + logger.Warnf("delete task stat error: %s", err.Error()) return nil } @@ -209,7 +208,7 @@ func DeleteTaskById(c *gin.Context) { // delete task logs logPath := filepath.Join(utils.GetTaskLogPath(), id.Hex()) if err := os.RemoveAll(logPath); err != nil { - log2.Warnf("failed to remove task log directory: %s", logPath) + logger.Warnf("failed to remove task log directory: %s", logPath) } HandleSuccess(c) @@ -240,7 +239,7 @@ func DeleteList(c *gin.Context) { "$in": payload.Ids, }, }); err != nil { - log2.Warnf("delete task stat error: %s", err.Error()) + logger.Warnf("delete task stat error: %s", err.Error()) return nil } @@ -258,7 +257,7 @@ func DeleteList(c *gin.Context) { // delete task logs logPath := filepath.Join(utils.GetTaskLogPath(), id) if err := os.RemoveAll(logPath); err != nil { - log2.Warnf("failed to remove task log directory: %s", logPath) + logger.Warnf("failed to remove task log directory: %s", logPath) } wg.Done() }(id.Hex()) diff --git a/core/controllers/utils_logger.go b/core/controllers/utils_logger.go new file mode 100644 index 00000000..002ff743 --- /dev/null +++ b/core/controllers/utils_logger.go @@ -0,0 +1,5 @@ +package controllers + +import "github.com/crawlab-team/crawlab/core/utils" + +var logger = utils.NewLogger("Controllers") diff --git a/core/controllers/ws_writer.go b/core/controllers/ws_writer.go index 798b162c..dcc096c3 100644 --- a/core/controllers/ws_writer.go +++ b/core/controllers/ws_writer.go @@ -1,8 +1,6 @@ package controllers import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab/trace" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "io" @@ -16,7 +14,7 @@ type WsWriter struct { } func (w *WsWriter) Write(data []byte) (n int, err error) { - log.Infof("websocket write: %s", string(data)) + logger.Infof("websocket write: %s", string(data)) err = w.conn.WriteMessage(websocket.TextMessage, data) if err != nil { return 0, err @@ -47,8 +45,7 @@ func NewWsWriter(c *gin.Context) (writer *WsWriter, err error) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { - log.Errorf("websocket open connection error: %v", err) - trace.PrintError(err) + logger.Errorf("websocket open connection error: %v", err) } return &WsWriter{ diff --git a/core/export/csv_service.go b/core/export/csv_service.go index 666cc331..46175774 100644 --- a/core/export/csv_service.go +++ b/core/export/csv_service.go @@ -6,13 +6,11 @@ import ( "errors" "fmt" "github.com/ReneKroon/ttlcache" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" - "github.com/crawlab-team/crawlab/trace" "github.com/hashicorp/go-uuid" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -26,12 +24,14 @@ import ( type CsvService struct { cache *ttlcache.Cache + interfaces.Logger } func (svc *CsvService) GenerateId() (exportId string, err error) { exportId, err = uuid.GenerateUUID() if err != nil { - return "", trace.TraceError(err) + svc.Errorf("failed to generate export id: %v", err) + return "", err } return exportId, nil } @@ -69,7 +69,8 @@ func (svc *CsvService) GetExport(exportId string) (export interfaces.Export, err // get export from cache res, ok := svc.cache.Get(exportId) if !ok { - return nil, trace.TraceError(errors.New("export not found")) + svc.Errorf("export not found: %s", exportId) + return nil, err } export = res.(interfaces.Export) return export, nil @@ -81,8 +82,7 @@ func (svc *CsvService) export(export *entity.Export) { err := errors.New("empty target") export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -95,8 +95,7 @@ func (svc *CsvService) export(export *entity.Export) { if err != nil { export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -113,8 +112,7 @@ func (svc *CsvService) export(export *entity.Export) { if err != nil { export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -123,7 +121,7 @@ func (svc *CsvService) export(export *entity.Export) { bom := []byte{0xEF, 0xBB, 0xBF} _, err = csvFile.Write(bom) if err != nil { - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) return } @@ -133,8 +131,7 @@ func (svc *CsvService) export(export *entity.Export) { if err != nil { export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -149,17 +146,16 @@ func (svc *CsvService) export(export *entity.Export) { // check error err := cur.Err() if err != nil { - if err != mongo2.ErrNoDocuments { + if !errors.Is(err, mongo2.ErrNoDocuments) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) } else { // no more data export.Status = constants.TaskStatusFinished export.EndTs = time.Now() - log.Infof("export finished (id: %s)", export.Id) + svc.Infof("export finished (id: %s)", export.Id) } svc.cache.Set(export.Id, export) return @@ -170,7 +166,7 @@ func (svc *CsvService) export(export *entity.Export) { // no more data export.Status = constants.TaskStatusFinished export.EndTs = time.Now() - log.Infof("export finished (id: %s)", export.Id) + svc.Infof("export finished (id: %s)", export.Id) svc.cache.Set(export.Id, export) return } @@ -182,8 +178,7 @@ func (svc *CsvService) export(export *entity.Export) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -195,8 +190,7 @@ func (svc *CsvService) export(export *entity.Export) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -240,7 +234,8 @@ func (svc *CsvService) getCsvWriter(export *entity.Export) (csvWriter *csv.Write // open file csvFile, err = os.Create(export.DownloadPath) if err != nil { - return nil, nil, trace.TraceError(err) + svc.Errorf("failed to create csv file: %v", err) + return nil, nil, err } // create csv writer @@ -256,7 +251,8 @@ func (svc *CsvService) getColumns(query bson.M, export interfaces.Export) (colum // get 10 records var data []bson.M if err := col.Find(query, &mongo.FindOptions{Limit: 10}).All(&data); err != nil { - return nil, trace.TraceError(err) + svc.Errorf("failed to get columns: %v", err) + return nil, err } // columns set @@ -329,7 +325,8 @@ func NewCsvService() (svc2 interfaces.ExportService) { cache := ttlcache.NewCache() cache.SetTTL(time.Minute * 5) svc := &CsvService{ - cache: cache, + cache: cache, + Logger: utils.NewLogger("CsvService"), } return svc } diff --git a/core/export/json_service.go b/core/export/json_service.go index 5d56268d..1b0dbe37 100644 --- a/core/export/json_service.go +++ b/core/export/json_service.go @@ -5,13 +5,11 @@ import ( "encoding/json" "errors" "github.com/ReneKroon/ttlcache" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" - "github.com/crawlab-team/crawlab/trace" "github.com/hashicorp/go-uuid" mongo2 "go.mongodb.org/mongo-driver/mongo" "os" @@ -21,12 +19,14 @@ import ( type JsonService struct { cache *ttlcache.Cache + interfaces.Logger } func (svc *JsonService) GenerateId() (exportId string, err error) { exportId, err = uuid.GenerateUUID() if err != nil { - return "", trace.TraceError(err) + svc.Errorf("failed to generate export id: %v", err) + return "", err } return exportId, nil } @@ -64,7 +64,8 @@ func (svc *JsonService) GetExport(exportId string) (export interfaces.Export, er // get export from cache res, ok := svc.cache.Get(exportId) if !ok { - return nil, trace.TraceError(errors.New("export not found")) + svc.Errorf("export not found (id: %s)", exportId) + return nil, err } export = res.(interfaces.Export) return export, nil @@ -76,8 +77,7 @@ func (svc *JsonService) export(export *entity.Export) { err := errors.New("empty target") export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -90,8 +90,7 @@ func (svc *JsonService) export(export *entity.Export) { if err != nil { export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -111,17 +110,17 @@ func (svc *JsonService) export(export *entity.Export) { // check error err := cur.Err() if err != nil { - if err != mongo2.ErrNoDocuments { + if !errors.Is(err, mongo2.ErrNoDocuments) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) + } else { // no more data export.Status = constants.TaskStatusFinished export.EndTs = time.Now() - log.Infof("export finished (id: %s)", export.Id) + svc.Infof("export finished (id: %s)", export.Id) } svc.cache.Set(export.Id, export) return @@ -132,7 +131,7 @@ func (svc *JsonService) export(export *entity.Export) { // no more data export.Status = constants.TaskStatusFinished export.EndTs = time.Now() - log.Infof("export finished (id: %s)", export.Id) + svc.Infof("export finished (id: %s)", export.Id) svc.cache.Set(export.Id, export) break } @@ -144,8 +143,7 @@ func (svc *JsonService) export(export *entity.Export) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -158,8 +156,7 @@ func (svc *JsonService) export(export *entity.Export) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -170,8 +167,7 @@ func (svc *JsonService) export(export *entity.Export) { // error export.Status = constants.TaskStatusError export.EndTs = time.Now() - log.Errorf("export error (id: %s): %v", export.Id, err) - trace.PrintError(err) + svc.Errorf("export error (id: %s): %v", export.Id, err) svc.cache.Set(export.Id, export) return } @@ -208,7 +204,8 @@ func NewJsonService() (svc2 interfaces.ExportService) { cache := ttlcache.NewCache() cache.SetTTL(time.Minute * 5) svc := &JsonService{ - cache: cache, + cache: cache, + Logger: utils.NewLogger("JsonService"), } return svc } diff --git a/core/fs/default.go b/core/fs/default.go deleted file mode 100644 index 7041cec5..00000000 --- a/core/fs/default.go +++ /dev/null @@ -1,25 +0,0 @@ -package fs - -import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/mitchellh/go-homedir" - "github.com/spf13/viper" - "path/filepath" -) - -func init() { - rootDir, err := homedir.Dir() - if err != nil { - log.Warnf("cannot find home directory: %v", err) - return - } - DefaultWorkspacePath = filepath.Join(rootDir, "crawlab_workspace") - - workspacePath := utils.GetWorkspace() - if workspacePath == "" { - viper.Set("workspace", DefaultWorkspacePath) - } -} - -var DefaultWorkspacePath string diff --git a/core/fs/service.go b/core/fs/service.go index 2103fa4f..835a1e00 100644 --- a/core/fs/service.go +++ b/core/fs/service.go @@ -4,7 +4,6 @@ import ( "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" "github.com/google/uuid" "io" "os" @@ -15,6 +14,9 @@ type Service struct { // settings rootPath string skipNames []string + + // internals + interfaces.Logger } func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error) { @@ -31,11 +33,13 @@ func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error) // Use filepath.Walk to recursively traverse directories err = filepath.Walk(fullPath, func(p string, info os.FileInfo, err error) error { if err != nil { + svc.Errorf("failed to walk path: %v", err) return err } relPath, err := filepath.Rel(svc.rootPath, p) if err != nil { + svc.Errorf("failed to get relative path: %v", err) return err } @@ -85,6 +89,7 @@ func (svc *Service) GetFile(path string) (data []byte, err error) { func (svc *Service) GetFileInfo(path string) (file interfaces.FsFileInfo, err error) { f, err := os.Stat(filepath.Join(svc.rootPath, path)) if err != nil { + svc.Errorf("failed to get file info: %v", err) return nil, err } return &entity.FsFileInfo{ @@ -105,6 +110,7 @@ func (svc *Service) Save(path string, data []byte) (err error) { dir := filepath.Dir(filepath.Join(svc.rootPath, path)) if _, err := os.Stat(dir); os.IsNotExist(err) { if err := os.MkdirAll(dir, 0755); err != nil { + svc.Errorf("failed to create directory: %v", err) return err } } @@ -135,6 +141,7 @@ func (svc *Service) Copy(path, newPath string) (err error) { // Get source info srcInfo, err := os.Stat(srcPath) if err != nil { + svc.Errorf("failed to get source info: %v", err) return err } @@ -142,19 +149,25 @@ func (svc *Service) Copy(path, newPath string) (err error) { if !srcInfo.IsDir() { srcFile, err := os.Open(srcPath) if err != nil { + svc.Errorf("failed to open source file: %v", err) return err } defer srcFile.Close() destFile, err := os.Create(destPath) if err != nil { + svc.Errorf("failed to create destination file: %v", err) return err } defer destFile.Close() _, err = io.Copy(destFile, srcFile) + if err != nil { + svc.Errorf("failed to copy file: %v", err) + return err + } - return err + return nil } else { // If source is directory, copy it recursively return utils.CopyDir(srcPath, destPath) @@ -164,7 +177,8 @@ func (svc *Service) Copy(path, newPath string) (err error) { func (svc *Service) Export() (resultPath string, err error) { zipFilePath := filepath.Join(os.TempDir(), uuid.New().String()+".zip") if err := utils.ZipDirectory(svc.rootPath, zipFilePath); err != nil { - return "", trace.TraceError(err) + svc.Errorf("failed to zip directory: %v", err) + return "", err } return zipFilePath, nil @@ -174,5 +188,6 @@ func NewFsService(path string) (svc interfaces.FsService) { return &Service{ rootPath: path, skipNames: []string{".git"}, + Logger: utils.NewLogger("FsService"), } } diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index ea774322..8ae6eb1f 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -30,7 +30,7 @@ type GrpcClient struct { once sync.Once stopped bool stop chan struct{} - logger interfaces.Logger + interfaces.Logger // clients NodeClient grpc2.NodeServiceClient @@ -56,7 +56,7 @@ func (c *GrpcClient) Start() { // connect err := c.connect() if err != nil { - c.logger.Fatalf("failed to connect: %v", err) + c.Fatalf("failed to connect: %v", err) return } @@ -69,7 +69,7 @@ func (c *GrpcClient) Stop() (err error) { // set stopped flag c.stopped = true c.stop <- struct{}{} - c.logger.Infof("stopped") + c.Infof("stopped") // skip if connection is nil if c.conn == nil { @@ -78,10 +78,10 @@ func (c *GrpcClient) Stop() (err error) { // close connection if err := c.conn.Close(); err != nil { - c.logger.Errorf("failed to close connection: %v", err) + c.Errorf("failed to close connection: %v", err) return err } - c.logger.Infof("disconnected from %s", c.address) + c.Infof("disconnected from %s", c.address) return nil } @@ -93,11 +93,11 @@ func (c *GrpcClient) WaitForReady() { select { case <-ticker.C: if c.IsReady() { - c.logger.Debugf("client is now ready") + c.Debugf("client is now ready") return } case <-c.stop: - c.logger.Errorf("client has stopped") + c.Errorf("client has stopped") } } } @@ -142,7 +142,7 @@ func (c *GrpcClient) monitorState() { if previous != current { c.setState(current) - c.logger.Infof("state changed from %s to %s", previous, current) + c.Infof("state changed from %s to %s", previous, current) // Trigger reconnect if connection is lost or becomes idle from ready state if current == connectivity.TransientFailure || @@ -150,7 +150,7 @@ func (c *GrpcClient) monitorState() { (previous == connectivity.Ready && current == connectivity.Idle) { select { case c.reconnect <- struct{}{}: - c.logger.Infof("triggering reconnection due to state change to %s", current) + c.Infof("triggering reconnection due to state change to %s", current) default: } } @@ -182,9 +182,9 @@ func (c *GrpcClient) connect() (err error) { return case <-c.reconnect: if !c.stopped { - c.logger.Infof("attempting to reconnect to %s", c.address) + c.Infof("attempting to reconnect to %s", c.address) if err := c.doConnect(); err != nil { - c.logger.Errorf("reconnection failed: %v", err) + c.Errorf("reconnection failed: %v", err) } } } @@ -206,12 +206,12 @@ func (c *GrpcClient) doConnect() (err error) { // create new client connection c.conn, err = grpc.NewClient(c.address, opts...) if err != nil { - c.logger.Errorf("failed to connect to %s: %v", c.address, err) + c.Errorf("failed to connect to %s: %v", c.address, err) return err } // connect - c.logger.Infof("connecting to %s", c.address) + c.Infof("connecting to %s", c.address) c.conn.Connect() // wait for connection to be ready @@ -223,7 +223,7 @@ func (c *GrpcClient) doConnect() (err error) { } // success - c.logger.Infof("connected to %s", c.address) + c.Infof("connected to %s", c.address) return nil } @@ -231,7 +231,7 @@ func (c *GrpcClient) doConnect() (err error) { b.InitialInterval = 5 * time.Second b.MaxElapsedTime = 10 * time.Minute n := func(err error, duration time.Duration) { - c.logger.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration) + c.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration) } return backoff.RetryNotify(op, b, n) } @@ -241,7 +241,7 @@ func newGrpcClient() (c *GrpcClient) { address: utils.GetGrpcAddress(), timeout: 10 * time.Second, stop: make(chan struct{}), - logger: utils.NewServiceLogger("GrpcClient"), + Logger: utils.NewLogger("GrpcClient"), state: connectivity.Idle, } } diff --git a/core/grpc/server/dependency_service_server.go b/core/grpc/server/dependency_service_server.go index 4a0fdec8..c164911b 100644 --- a/core/grpc/server/dependency_service_server.go +++ b/core/grpc/server/dependency_service_server.go @@ -5,11 +5,12 @@ import ( "errors" "fmt" "github.com/cenkalti/backoff/v4" + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/core/utils" "io" "sync" "time" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" @@ -24,17 +25,18 @@ type DependencyServiceServer struct { grpc.UnimplementedDependencyServiceServer mu *sync.Mutex streams map[string]*grpc.DependencyService_ConnectServer + interfaces.Logger } func (svr DependencyServiceServer) Connect(req *grpc.DependencyServiceConnectRequest, stream grpc.DependencyService_ConnectServer) (err error) { svr.mu.Lock() svr.streams[req.NodeKey] = &stream svr.mu.Unlock() - log.Info("[DependencyServiceServer] connected: " + req.NodeKey) + svr.Info("[DependencyServiceServer] connected: " + req.NodeKey) // Keep this scope alive because once this scope exits - the stream is closed <-stream.Context().Done() - log.Info("[DependencyServiceServer] disconnected: " + req.NodeKey) + svr.Info("[DependencyServiceServer] disconnected: " + req.NodeKey) return nil } @@ -54,7 +56,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende }, nil) if err != nil { if !errors.Is(err, mongo.ErrNoDocuments) { - log.Errorf("[DependencyService] get dependencies from db error: %v", err) + svr.Errorf("[DependencyService] get dependencies from db error: %v", err) return nil, err } } @@ -117,7 +119,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende "_id": bson.M{"$in": depIdsToDelete}, }) if err != nil { - log.Errorf("[DependencyServiceServer] delete dependencies in db error: %v", err) + svr.Errorf("[DependencyServiceServer] delete dependencies in db error: %v", err) return err } } @@ -126,7 +128,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende if len(depsToInsert) > 0 { _, err = service.NewModelService[models.Dependency]().InsertMany(depsToInsert) if err != nil { - log.Errorf("[DependencyServiceServer] insert dependencies in db error: %v", err) + svr.Errorf("[DependencyServiceServer] insert dependencies in db error: %v", err) return err } } @@ -135,7 +137,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende for _, d := range depsToUpdate { err = service.NewModelService[models.Dependency]().ReplaceById(d.Id, d) if err != nil { - log.Errorf("[DependencyServiceServer] update dependency in db error: %v", err) + svr.Errorf("[DependencyServiceServer] update dependency in db error: %v", err) return err } } @@ -161,7 +163,7 @@ func (svr DependencyServiceServer) UpdateLogs(stream grpc.DependencyService_Upda // get id id, err := primitive.ObjectIDFromHex(req.TargetId) if err != nil { - log.Errorf("[DependencyServiceServer] convert dependency id error: %v", err) + svr.Errorf("[DependencyServiceServer] convert dependency id error: %v", err) return err } @@ -176,7 +178,7 @@ func (svr DependencyServiceServer) UpdateLogs(stream grpc.DependencyService_Upda } _, err = service.NewModelService[models.DependencyLog]().InsertMany(depLogs) if err != nil { - log.Errorf("[DependencyServiceServer] insert dependency logs error: %v", err) + svr.Errorf("[DependencyServiceServer] insert dependency logs error: %v", err) return err } } @@ -202,7 +204,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g }, nil) if err != nil { if !errors.Is(err, mongo.ErrNoDocuments) { - log.Errorf("[DependencyService] get dependency config setup from db error: %v", err) + svr.Errorf("[DependencyService] get dependency config setup from db error: %v", err) return nil, err } } @@ -230,7 +232,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g } _, err = service.NewModelService[models.DependencyConfigSetup]().InsertOne(*cs) if err != nil { - log.Errorf("[DependencyService] insert dependency config setup error: %v", err) + svr.Errorf("[DependencyService] insert dependency config setup error: %v", err) return nil, err } } else { @@ -243,7 +245,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g cs.Drivers = drivers err = service.NewModelService[models.DependencyConfigSetup]().ReplaceById(cs.Id, *cs) if err != nil { - log.Errorf("[DependencyService] update dependency config setup error: %v", err) + svr.Errorf("[DependencyService] update dependency config setup error: %v", err) return nil, err } } @@ -257,7 +259,7 @@ func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.Depen return err }, b) if err != nil { - log.Errorf("get stream error: %v", err) + svr.Errorf("get stream error: %v", err) return nil, err } return stream, nil @@ -277,6 +279,7 @@ func newDependencyServer() *DependencyServiceServer { return &DependencyServiceServer{ mu: new(sync.Mutex), streams: make(map[string]*grpc.DependencyService_ConnectServer), + Logger: utils.NewLogger("DependencyServiceServer"), } } diff --git a/core/grpc/server/metric_service_server.go b/core/grpc/server/metric_service_server.go index cadde45a..2288f614 100644 --- a/core/grpc/server/metric_service_server.go +++ b/core/grpc/server/metric_service_server.go @@ -2,9 +2,10 @@ package server import ( "context" - "github.com/apex/log" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" "go.mongodb.org/mongo-driver/bson" "sync" @@ -13,13 +14,14 @@ import ( type MetricServiceServer struct { grpc.UnimplementedMetricServiceServer + interfaces.Logger } func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSendRequest) (res *grpc.Response, err error) { - log.Debugf("[MetricServiceServer] received metric from node: " + req.NodeKey) + svr.Debugf("received metric from node: " + req.NodeKey) n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil) if err != nil { - log.Errorf("[MetricServiceServer] error getting node: %v", err) + svr.Errorf("failed to get node: %v", err) return HandleError(err) } metric := models.Metric{ @@ -42,14 +44,16 @@ func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSe metric.CreatedAt = time.Unix(req.Timestamp, 0) _, err = service.NewModelService[models.Metric]().InsertOne(metric) if err != nil { - log.Errorf("[MetricServiceServer] error inserting metric: %v", err) + svr.Errorf("failed to insert metric: %v", err) return HandleError(err) } return HandleSuccess() } func newMetricsServer() *MetricServiceServer { - return &MetricServiceServer{} + return &MetricServiceServer{ + Logger: utils.NewLogger("MetricServiceServer"), + } } var metricsServer *MetricServiceServer diff --git a/core/grpc/server/node_service_server.go b/core/grpc/server/node_service_server.go index 281893eb..d1a0162b 100644 --- a/core/grpc/server/node_service_server.go +++ b/core/grpc/server/node_service_server.go @@ -2,7 +2,6 @@ package server import ( "context" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/interfaces" @@ -30,6 +29,7 @@ type NodeServiceServer struct { // internals subs map[primitive.ObjectID]grpc.NodeService_SubscribeServer + interfaces.Logger } // Register from handler/worker to master @@ -51,7 +51,7 @@ func (svr NodeServiceServer) Register(_ context.Context, req *grpc.NodeServiceRe if err != nil { return HandleError(err) } - log.Infof("[NodeServiceServer] updated worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex()) + svr.Infof("[NodeServiceServer] updated worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex()) } else if errors2.Is(err, mongo.ErrNoDocuments) { // register new node = &models.Node{ @@ -69,13 +69,13 @@ func (svr NodeServiceServer) Register(_ context.Context, req *grpc.NodeServiceRe if err != nil { return HandleError(err) } - log.Infof("[NodeServiceServer] added worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex()) + svr.Infof("[NodeServiceServer] added worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex()) } else { // error return HandleError(err) } - log.Infof("[NodeServiceServer] master registered worker[%s]", req.NodeKey) + svr.Infof("[NodeServiceServer] master registered worker[%s]", req.NodeKey) return HandleSuccessWithData(node) } @@ -113,12 +113,12 @@ func (svr NodeServiceServer) SendHeartbeat(_ context.Context, req *grpc.NodeServ } func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest, stream grpc.NodeService_SubscribeServer) (err error) { - log.Infof("[NodeServiceServer] master received subscribe request from node[%s]", request.NodeKey) + svr.Infof("[NodeServiceServer] master received subscribe request from node[%s]", request.NodeKey) // find in db node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": request.NodeKey}, nil) if err != nil { - log.Errorf("[NodeServiceServer] error getting node: %v", err) + svr.Errorf("[NodeServiceServer] error getting node: %v", err) return err } @@ -136,7 +136,7 @@ func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest nodeServiceMutex.Lock() delete(svr.subs, node.Id) nodeServiceMutex.Unlock() - log.Infof("[NodeServiceServer] master unsubscribed from node[%s]", request.NodeKey) + svr.Infof("[NodeServiceServer] master unsubscribed from node[%s]", request.NodeKey) return nil } @@ -152,6 +152,7 @@ func newNodeServiceServer() *NodeServiceServer { return &NodeServiceServer{ cfgSvc: nodeconfig.GetNodeConfigService(), subs: make(map[primitive.ObjectID]grpc.NodeService_SubscribeServer), + Logger: utils.NewLogger("GrpcNodeServiceServer"), } } diff --git a/core/grpc/server/server.go b/core/grpc/server/server.go index 06bb2e39..dbde2885 100644 --- a/core/grpc/server/server.go +++ b/core/grpc/server/server.go @@ -2,8 +2,8 @@ package server import ( "fmt" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/grpc/middlewares" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" grpc2 "github.com/crawlab-team/crawlab/grpc" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -23,6 +23,7 @@ type GrpcServer struct { svr *grpc.Server l net.Listener stopped bool + interfaces.Logger // servers NodeSvr *NodeServiceServer @@ -40,10 +41,10 @@ func (svr *GrpcServer) Start() (err error) { // listener svr.l, err = net.Listen("tcp", svr.address) if err != nil { - log.Errorf("[GrpcServer] failed to listen: %v", err) + svr.Errorf("failed to listen: %v", err) return err } - log.Infof("[GrpcServer] grpc server listens to %s", svr.address) + svr.Infof("server listens to %s", svr.address) // start grpc server go func() { @@ -51,7 +52,7 @@ func (svr *GrpcServer) Start() (err error) { if errors2.Is(err, grpc.ErrServerStopped) { return } - log.Errorf("[GrpcServer] failed to serve: %v", err) + svr.Errorf("failed to serve: %v", err) } }() @@ -65,18 +66,18 @@ func (svr *GrpcServer) Stop() (err error) { } // graceful stop - log.Infof("[GrpcServer] grpc server stopping...") + svr.Infof("server stopping...") svr.svr.Stop() // close listener - log.Infof("[GrpcServer] grpc server closing listener...") + svr.Infof("server closing listener...") _ = svr.l.Close() // mark as stopped svr.stopped = true // log - log.Infof("[GrpcServer] grpc server stopped") + svr.Infof("server stopped") return nil } @@ -90,7 +91,7 @@ func (svr *GrpcServer) register() { } func (svr *GrpcServer) recoveryHandlerFunc(p interface{}) (err error) { - log.Errorf("[GrpcServer] recovered from panic: %v", p) + svr.Errorf("recovered from panic: %v", p) return fmt.Errorf("recovered from panic: %v", p) } @@ -98,6 +99,7 @@ func newGrpcServer() *GrpcServer { // server svr := &GrpcServer{ address: utils.GetGrpcServerAddress(), + Logger: utils.NewLogger("GrpcServer"), } // services servers diff --git a/core/grpc/server/task_service_server.go b/core/grpc/server/task_service_server.go index 2dec2bf5..29ffb43f 100644 --- a/core/grpc/server/task_service_server.go +++ b/core/grpc/server/task_service_server.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "strings" "sync" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" @@ -19,7 +19,6 @@ import ( "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" @@ -36,6 +35,7 @@ type TaskServiceServer struct { // internals subs map[primitive.ObjectID]grpc.TaskService_SubscribeServer + interfaces.Logger } func (svr TaskServiceServer) Subscribe(req *grpc.TaskServiceSubscribeRequest, stream grpc.TaskService_SubscribeServer) (err error) { @@ -62,7 +62,7 @@ func (svr TaskServiceServer) Subscribe(req *grpc.TaskServiceSubscribeRequest, st taskServiceMutex.Lock() delete(svr.subs, taskId) taskServiceMutex.Unlock() - log.Infof("[TaskServiceServer] task stream closed: %s", taskId.Hex()) + svr.Infof("[TaskServiceServer] task stream closed: %s", taskId.Hex()) return nil } @@ -89,7 +89,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err return nil } // log other stream receive errors and continue - log.Errorf("error receiving stream message: %v", err) + svr.Errorf("error receiving stream message: %v", err) continue } @@ -97,7 +97,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err if taskId.IsZero() { taskId, err = primitive.ObjectIDFromHex(msg.TaskId) if err != nil { - log.Errorf("invalid task id: %s", msg.TaskId) + svr.Errorf("invalid task id: %s", msg.TaskId) continue } } @@ -107,7 +107,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err if spiderId.IsZero() { t, err := service.NewModelService[models.Task]().GetById(taskId) if err != nil { - log.Errorf("error getting spider[%s]: %v", taskId.Hex(), err) + svr.Errorf("error getting spider[%s]: %v", taskId.Hex(), err) continue } spiderId = t.SpiderId @@ -123,12 +123,12 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err err = svr.handleInsertLogs(taskId, msg) default: // invalid message code received - log.Errorf("invalid stream message code: %d", msg.Code) + svr.Errorf("invalid stream message code: %d", msg.Code) continue } if err != nil { // log any errors from handlers - log.Errorf("grpc error[%d]: %v", msg.Code, err) + svr.Errorf("grpc error[%d]: %v", msg.Code, err) } } } @@ -137,11 +137,14 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskServiceFetchTaskRequest) (response *grpc.TaskServiceFetchTaskResponse, err error) { nodeKey := request.GetNodeKey() if nodeKey == "" { - return nil, errors.New("invalid node key") + err = fmt.Errorf("invalid node key") + svr.Errorf("error fetching task: %v", err) + return nil, err } n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil) if err != nil { - return nil, trace.TraceError(err) + svr.Errorf("error getting node[%s]: %v", nodeKey, err) + return nil, err } var tid primitive.ObjectID opts := &mongo.FindOptions{ @@ -162,7 +165,7 @@ func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskSe t.Status = constants.TaskStatusAssigned return svr.saveTask(t) } else if !errors.Is(err, mongo2.ErrNoDocuments) { - log.Errorf("error fetching task for node[%s]: %v", nodeKey, err) + svr.Errorf("error fetching task for node[%s]: %v", nodeKey, err) return err } @@ -177,7 +180,7 @@ func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskSe t.Status = constants.TaskStatusAssigned return svr.saveTask(t) } else if !errors.Is(err, mongo2.ErrNoDocuments) { - log.Errorf("error fetching task for any node: %v", err) + svr.Errorf("error fetching task for any node: %v", err) return err } @@ -198,7 +201,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T // task id taskId, err := primitive.ObjectIDFromHex(request.TaskId) if err != nil { - log.Errorf("invalid task id: %s", request.TaskId) + svr.Errorf("invalid task id: %s", request.TaskId) return nil, err } @@ -208,7 +211,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T // task task, err := service.NewModelService[models.Task]().GetById(taskId) if err != nil { - log.Errorf("error getting task[%s]: %v", request.TaskId, err) + svr.Errorf("error getting task[%s]: %v", request.TaskId, err) return nil, err } args = append(args, task) @@ -216,7 +219,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T // task stat taskStat, err := service.NewModelService[models.TaskStat]().GetById(task.Id) if err != nil { - log.Errorf("error getting task stat for task[%s]: %v", request.TaskId, err) + svr.Errorf("error getting task stat for task[%s]: %v", request.TaskId, err) return nil, err } args = append(args, taskStat) @@ -224,7 +227,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T // spider spider, err := service.NewModelService[models.Spider]().GetById(task.SpiderId) if err != nil { - log.Errorf("error getting spider[%s]: %v", task.SpiderId.Hex(), err) + svr.Errorf("error getting spider[%s]: %v", task.SpiderId.Hex(), err) return nil, err } args = append(args, spider) @@ -232,7 +235,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T // node node, err := service.NewModelService[models.Node]().GetById(task.NodeId) if err != nil { - log.Errorf("error getting node[%s]: %v", task.NodeId.Hex(), err) + svr.Errorf("error getting node[%s]: %v", task.NodeId.Hex(), err) return nil, err } args = append(args, node) @@ -242,7 +245,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T if !task.ScheduleId.IsZero() { schedule, err = service.NewModelService[models.Schedule]().GetById(task.ScheduleId) if err != nil { - log.Errorf("error getting schedule[%s]: %v", task.ScheduleId.Hex(), err) + svr.Errorf("error getting schedule[%s]: %v", task.ScheduleId.Hex(), err) return nil, err } args = append(args, schedule) @@ -256,7 +259,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T }, }, nil) if err != nil { - log.Errorf("error getting notification settings: %v", err) + svr.Errorf("error getting notification settings: %v", err) return nil, err } @@ -303,7 +306,7 @@ func (svr TaskServiceServer) handleInsertData(taskId, spiderId primitive.ObjectI var records []map[string]interface{} err = json.Unmarshal(msg.Data, &records) if err != nil { - log.Errorf("error unmarshalling data: %v", err) + svr.Errorf("error unmarshalling data: %v", err) return err } for i := range records { @@ -317,7 +320,7 @@ func (svr TaskServiceServer) handleInsertLogs(taskId primitive.ObjectID, msg *gr var logs []string err = json.Unmarshal(msg.Data, &logs) if err != nil { - log.Errorf("error unmarshalling logs: %v", err) + svr.Errorf("error unmarshalling logs: %v", err) return err } return svr.statsSvc.InsertLogs(taskId, logs...) @@ -333,6 +336,7 @@ func newTaskServiceServer() *TaskServiceServer { cfgSvc: nodeconfig.GetNodeConfigService(), subs: make(map[primitive.ObjectID]grpc.TaskService_SubscribeServer), statsSvc: stats.GetTaskStatsService(), + Logger: utils.NewLogger("GrpcTaskServiceServer"), } } diff --git a/core/models/common/index_utils.go b/core/models/common/index_utils.go index ff0fd061..f1a96e65 100644 --- a/core/models/common/index_utils.go +++ b/core/models/common/index_utils.go @@ -3,12 +3,14 @@ package common import ( "encoding/json" "fmt" - "github.com/apex/log" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" "go.mongodb.org/mongo-driver/bson" mongo2 "go.mongodb.org/mongo-driver/mongo" ) +var indexUtilsLogger = utils.NewLogger("IndexUtils") + // getIndexKeyString converts index keys to a consistent string representation func getIndexKeyString(key interface{}) string { switch v := key.(type) { @@ -64,10 +66,10 @@ func normalizeIndexKey(key interface{}) string { return pairsStr } -func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) { +func CreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) { existingIndexes, err := col.ListIndexes() if err != nil { - log.Errorf("error listing indexes: %v", err) + indexUtilsLogger.Errorf("error listing indexes: %v", err) return } @@ -99,15 +101,15 @@ func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) { // Drop all existing indexes (except _id) err := col.DeleteAllIndexes() if err != nil { - log.Errorf("error dropping indexes: %v", err) + indexUtilsLogger.Errorf("error dropping indexes: %v", err) } // Create new indexes err = col.CreateIndexes(desiredIndexes) if err != nil { - log.Errorf("error creating indexes: %v", err) + indexUtilsLogger.Errorf("error creating indexes: %v", err) return } - log.Infof("recreated indexes for collection: %s", col.GetCollection().Name()) + indexUtilsLogger.Infof("created indexes for collection: %s", col.GetCollection().Name()) } } diff --git a/core/models/common/index_utils_test.go b/core/models/common/index_utils_test.go index 99680a68..db0f9109 100644 --- a/core/models/common/index_utils_test.go +++ b/core/models/common/index_utils_test.go @@ -104,8 +104,8 @@ func TestRecreateIndexes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Execute RecreateIndexes - RecreateIndexes(testCol, tt.desiredIndexes) + // Execute CreateIndexes + CreateIndexes(testCol, tt.desiredIndexes) // Verify indexes indexes, err := testCol.ListIndexes() diff --git a/core/models/common/init_index.go b/core/models/common/init_index.go index b2c60e87..4eee5e41 100644 --- a/core/models/common/init_index.go +++ b/core/models/common/init_index.go @@ -11,7 +11,7 @@ import ( func InitIndexes() { // nodes - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Node{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Node{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "key", Value: 1}}}, {Keys: bson.D{{Key: "name", Value: 1}}}, {Keys: bson.D{{Key: "is_master", Value: 1}}}, @@ -21,12 +21,12 @@ func InitIndexes() { }) // projects - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Project{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Project{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "name", Value: 1}}}, }) // spiders - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Spider{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Spider{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "name", Value: 1}}}, {Keys: bson.D{{Key: "type", Value: 1}}}, {Keys: bson.D{{Key: "col_id", Value: 1}}}, @@ -34,7 +34,7 @@ func InitIndexes() { }) // tasks - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Task{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Task{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "spider_id", Value: 1}}}, {Keys: bson.D{{Key: "status", Value: 1}}}, {Keys: bson.D{{Key: "node_id", Value: 1}}}, @@ -49,19 +49,19 @@ func InitIndexes() { }) // task stats - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.TaskStat{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.TaskStat{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "created_ts", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, }) // schedules - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Schedule{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Schedule{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "name", Value: 1}}}, {Keys: bson.D{{Key: "spider_id", Value: 1}}}, {Keys: bson.D{{Key: "enabled", Value: 1}}}, }) // users - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.User{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.User{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "username", Value: 1}}, Options: (&options.IndexOptions{}).SetUnique(true)}, {Keys: bson.D{{Key: "role", Value: 1}}}, {Keys: bson.D{{Key: "role_id", Value: 1}}}, @@ -69,12 +69,12 @@ func InitIndexes() { }) // settings - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Setting{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Setting{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "key", Value: 1}}, Options: options.Index().SetUnique(true)}, }) // tokens - RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Token{})), []mongo2.IndexModel{ + CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Token{})), []mongo2.IndexModel{ {Keys: bson.D{{Key: "name", Value: 1}}}, }) } diff --git a/core/node/config/config_service.go b/core/node/config/config_service.go index b7503f0c..93e03724 100644 --- a/core/node/config/config_service.go +++ b/core/node/config/config_service.go @@ -2,11 +2,9 @@ package config import ( "encoding/json" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" "os" "path/filepath" "sync" @@ -14,6 +12,7 @@ import ( type Service struct { cfg *entity.NodeInfo + interfaces.Logger } func (svc *Service) Init() (err error) { @@ -23,7 +22,8 @@ func (svc *Service) Init() (err error) { configDirPath := filepath.Dir(metadataConfigPath) if !utils.Exists(configDirPath) { if err := os.MkdirAll(configDirPath, os.FileMode(0766)); err != nil { - return trace.TraceError(err) + svc.Errorf("create config directory error: %v", err) + return err } } @@ -32,22 +32,22 @@ func (svc *Service) Init() (err error) { svc.cfg = newConfig() data, err := json.Marshal(svc.cfg) if err != nil { - log.Errorf("marshal config error: %v", err) + svc.Errorf("marshal config error: %v", err) return err } if err := os.WriteFile(metadataConfigPath, data, os.FileMode(0766)); err != nil { - log.Errorf("write config file error: %v", err) + svc.Errorf("write config file error: %v", err) return err } } else { // exists, read and set to config data, err := os.ReadFile(metadataConfigPath) if err != nil { - log.Errorf("read config file error: %v", err) + svc.Errorf("read config file error: %v", err) return err } if err := json.Unmarshal(data, svc.cfg); err != nil { - log.Errorf("unmarshal config error: %v", err) + svc.Errorf("unmarshal config error: %v", err) return err } } @@ -93,7 +93,8 @@ func (svc *Service) GetMaxRunners() (res int) { func newNodeConfigService() (svc2 interfaces.NodeConfigService, err error) { // config service svc := &Service{ - cfg: newConfig(), + cfg: newConfig(), + Logger: utils.NewLogger("NodeConfigService"), } // init diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index 45449002..6844d934 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -38,7 +38,7 @@ type MasterService struct { monitorInterval time.Duration // internals - logger interfaces.Logger + interfaces.Logger } func (svc *MasterService) Start() { @@ -81,11 +81,11 @@ func (svc *MasterService) Wait() { func (svc *MasterService) Stop() { _ = svc.server.Stop() svc.taskHandlerSvc.Stop() - svc.logger.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) + svc.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } func (svc *MasterService) startMonitoring() { - svc.logger.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey()) + svc.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey()) // ticker ticker := time.NewTicker(svc.monitorInterval) @@ -94,7 +94,7 @@ func (svc *MasterService) startMonitoring() { // monitor err := svc.monitor() if err != nil { - svc.logger.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err) + svc.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err) } // wait @@ -109,7 +109,7 @@ func (svc *MasterService) Register() (err error) { node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil) if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() { // not exists - svc.logger.Infof("master[%s] does not exist in db", nodeKey) + svc.Infof("master[%s] does not exist in db", nodeKey) node := models.Node{ Key: nodeKey, Name: nodeName, @@ -124,23 +124,23 @@ func (svc *MasterService) Register() (err error) { node.SetUpdated(primitive.NilObjectID) _, err := service.NewModelService[models.Node]().InsertOne(node) if err != nil { - svc.logger.Errorf("save master[%s] to db error: %v", nodeKey, err) + svc.Errorf("save master[%s] to db error: %v", nodeKey, err) return err } - svc.logger.Infof("added master[%s] to db", nodeKey) + svc.Infof("added master[%s] to db", nodeKey) return nil } else if err == nil { // exists - svc.logger.Infof("master[%s] exists in db", nodeKey) + svc.Infof("master[%s] exists in db", nodeKey) node.Status = constants.NodeStatusOnline node.Active = true node.ActiveAt = time.Now() err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) if err != nil { - svc.logger.Errorf("update master[%s] in db error: %v", nodeKey, err) + svc.Errorf("update master[%s] in db error: %v", nodeKey, err) return err } - svc.logger.Infof("updated master[%s] in db", nodeKey) + svc.Infof("updated master[%s] in db", nodeKey) return nil } else { // error @@ -204,7 +204,7 @@ func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) { if errors.Is(err, mongo2.ErrNoDocuments) { return nil, nil } - svc.logger.Errorf("get all worker nodes error: %v", err) + svc.Errorf("get all worker nodes error: %v", err) return nil, err } return nodes, nil @@ -257,14 +257,14 @@ func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) { func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) { stream, ok := svc.server.NodeSvr.GetSubscribeStream(n.Id) if !ok { - svc.logger.Errorf("cannot get worker node client[%s]", n.Key) + svc.Errorf("cannot get worker node client[%s]", n.Key) return false } err := stream.Send(&grpc.NodeServiceSubscribeResponse{ Code: grpc.NodeServiceSubscribeCode_PING, }) if err != nil { - svc.logger.Errorf("failed to ping worker node client[%s]: %v", n.Key, err) + svc.Errorf("failed to ping worker node client[%s]: %v", n.Key, err) return false } return true @@ -277,13 +277,13 @@ func (svc *MasterService) updateNodeRunners(node *models.Node) (err error) { } runningTasksCount, err := service.NewModelService[models.Task]().Count(query) if err != nil { - svc.logger.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err) + svc.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err) return err } node.CurrentRunners = runningTasksCount err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) if err != nil { - svc.logger.Errorf("failed to update node runners for node[%s]: %v", node.Key, err) + svc.Errorf("failed to update node runners for node[%s]: %v", node.Key, err) return err } return nil @@ -305,7 +305,7 @@ func newMasterService() *MasterService { taskHandlerSvc: handler.GetTaskHandlerService(), scheduleSvc: schedule.GetScheduleService(), systemSvc: system.GetSystemService(), - logger: utils.NewServiceLogger("MasterService"), + Logger: utils.NewLogger("MasterService"), } } diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 31332226..ff174204 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -38,7 +38,7 @@ type WorkerService struct { n *models.Node s grpc.NodeService_SubscribeClient isReady bool - logger interfaces.Logger + interfaces.Logger } func (svc *WorkerService) Start() { @@ -78,7 +78,7 @@ func (svc *WorkerService) Stop() { svc.stopped = true _ = client.GetGrpcClient().Stop() svc.handlerSvc.Stop() - svc.logger.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) + svc.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } func (svc *WorkerService) register() { @@ -99,17 +99,17 @@ func (svc *WorkerService) register() { err = fmt.Errorf("failed to get node: %v", err) return err } - svc.logger.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex()) + svc.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex()) return nil } b := backoff.NewExponentialBackOff() n := func(err error, duration time.Duration) { - svc.logger.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err) - svc.logger.Infof("retry in %.1f seconds", duration.Seconds()) + svc.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err) + svc.Infof("retry in %.1f seconds", duration.Seconds()) } err := backoff.RetryNotify(op, b, n) if err != nil { - svc.logger.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err) + svc.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err) panic(err) } } @@ -154,7 +154,7 @@ func (svc *WorkerService) subscribe() { NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { - svc.logger.Errorf("failed to subscribe to master: %v", err) + svc.Errorf("failed to subscribe to master: %v", err) return err } @@ -167,10 +167,10 @@ func (svc *WorkerService) subscribe() { msg, err := stream.Recv() if err != nil { if client.GetGrpcClient().IsClosed() { - svc.logger.Errorf("connection to master is closed: %v", err) + svc.Errorf("connection to master is closed: %v", err) return err } - svc.logger.Errorf("failed to receive message from master: %v", err) + svc.Errorf("failed to receive message from master: %v", err) return err } @@ -184,7 +184,7 @@ func (svc *WorkerService) subscribe() { // Execute with backoff err := backoff.Retry(operation, b) if err != nil { - svc.logger.Errorf("subscription failed after max retries: %v", err) + svc.Errorf("subscription failed after max retries: %v", err) return } @@ -200,7 +200,7 @@ func (svc *WorkerService) sendHeartbeat() { NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { - svc.logger.Errorf("failed to send heartbeat to master: %v", err) + svc.Errorf("failed to send heartbeat to master: %v", err) } } @@ -220,9 +220,9 @@ func (svc *WorkerService) startHealthServer() { // serve if err := http.Serve(ln, app); err != nil { if !errors.Is(err, http.ErrServerClosed) { - svc.logger.Errorf("run server error: %v", err) + svc.Errorf("run server error: %v", err) } else { - svc.logger.Info("server graceful down") + svc.Info("server graceful down") } } } @@ -233,7 +233,7 @@ func newWorkerService() *WorkerService { cfgSvc: nodeconfig.GetNodeConfigService(), handlerSvc: handler.GetTaskHandlerService(), isReady: false, - logger: utils.NewServiceLogger("WorkerService"), + Logger: utils.NewLogger("WorkerService"), } } diff --git a/core/notification/im.go b/core/notification/im.go index e481e509..48020a9a 100644 --- a/core/notification/im.go +++ b/core/notification/im.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/apex/log" "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/utils" "io" @@ -13,9 +14,7 @@ import ( "strings" "time" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/trace" ) type ResBody struct { @@ -102,7 +101,8 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e // perform request resp, body, err := performRequest("POST", ch.WebhookUrl, data) if err != nil { - return trace.TraceError(err) + log.Errorf("IM request error: %v", err) + return err } if resp.StatusCode != http.StatusOK { @@ -112,11 +112,13 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e // parse response var resBody ResBody if err := json.Unmarshal(body, &resBody); err != nil { - return trace.TraceError(err) + log.Errorf("Parsing IM response error: %v", err) + return err } // validate response code if resBody.ErrCode != 0 { + log.Errorf("IM response error: %s", resBody.ErrMsg) return errors.New(resBody.ErrMsg) } @@ -126,12 +128,12 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e func performIMRequest(webhookUrl string, data RequestParam) ([]byte, error) { resp, body, err := performRequest("POST", webhookUrl, data) if err != nil { - log.Errorf("IM request error: %v", err) + logger.Errorf("IM request error: %v", err) return nil, err } if resp.StatusCode >= 400 { - log.Errorf("IM response status code: %d", resp.StatusCode) + logger.Errorf("IM response status code: %d", resp.StatusCode) return nil, fmt.Errorf("IM error response %d: %s", resp.StatusCode, string(body)) } @@ -146,8 +148,8 @@ func performIMRequestWithJson[T any](webhookUrl string, data RequestParam) (resB // parse response if err := json.Unmarshal(body, &resBody); err != nil { - log.Warnf("Parsing IM response error: %v", err) - log.Infof("IM response: %s", string(body)) + logger.Warnf("Parsing IM response error: %v", err) + logger.Infof("IM response: %s", string(body)) return resBody, nil } diff --git a/core/notification/logger.go b/core/notification/logger.go new file mode 100644 index 00000000..476da1b5 --- /dev/null +++ b/core/notification/logger.go @@ -0,0 +1,5 @@ +package notification + +import "github.com/crawlab-team/crawlab/core/utils" + +var logger = utils.NewLogger("NotificationUtils") diff --git a/core/notification/mail.go b/core/notification/mail.go index 533482de..38b73f97 100644 --- a/core/notification/mail.go +++ b/core/notification/mail.go @@ -3,9 +3,7 @@ package notification import ( "errors" "github.com/PuerkitoBio/goquery" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/trace" "gopkg.in/gomail.v2" "net/mail" "regexp" @@ -69,8 +67,7 @@ func isHtml(content string) bool { func convertHtmlToText(content string) string { doc, err := goquery.NewDocumentFromReader(strings.NewReader(content)) if err != nil { - log.Errorf("failed to convert html to text: %v", err) - trace.PrintError(err) + logger.Errorf("failed to convert html to text: %v", err) return "" } return doc.Text() diff --git a/core/notification/mail_gmail.go b/core/notification/mail_gmail.go index 1d952032..8592744e 100644 --- a/core/notification/mail_gmail.go +++ b/core/notification/mail_gmail.go @@ -3,7 +3,6 @@ package notification import ( "context" "encoding/base64" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/trace" "golang.org/x/oauth2/google" @@ -18,7 +17,7 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication // 使用服务账户 JSON 密钥文件创建 JWT 配置 config, err := google.JWTConfigFromJSON(b, gmail.GmailSendScope) if err != nil { - log.Errorf("Unable to parse service account key file to config: %v", err) + logger.Errorf("Unable to parse service account key file to config: %v", err) return trace.TraceError(err) } @@ -29,8 +28,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication client := config.Client(context.Background()) srv, err := gmail.New(client) if err != nil { - log.Errorf("Unable to create Gmail client: %v", err) - return trace.TraceError(err) + logger.Errorf("Unable to create Gmail client: %v", err) + return err } // 创建 MIME 邮件 @@ -41,8 +40,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication var buf strings.Builder if _, err := m.WriteTo(&buf); err != nil { - log.Errorf("Unable to write message: %v", err) - return trace.TraceError(err) + logger.Errorf("Unable to write message: %v", err) + return err } // 将邮件内容进行 base64 编码 @@ -53,8 +52,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication // 发送邮件 _, err = srv.Users.Messages.Send("me", gmsg).Do() if err != nil { - log.Errorf("Unable to send email: %v", err) - return trace.TraceError(err) + logger.Errorf("Unable to send email: %v", err) + return err } return nil diff --git a/core/notification/oauth2_gmail.go b/core/notification/oauth2_gmail.go index b6c1bbed..a8bf0781 100644 --- a/core/notification/oauth2_gmail.go +++ b/core/notification/oauth2_gmail.go @@ -2,7 +2,6 @@ package notification import ( "context" - "github.com/apex/log" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "net/smtp" @@ -20,14 +19,14 @@ func getGmailOAuth2Token(oauth2Json string) (token *oauth2.Token, err error) { // 使用服务账户 JSON 密钥文件创建 JWT 配置 config, err := google.JWTConfigFromJSON(b, "https://mail.google.com/") if err != nil { - log.Errorf("Unable to parse service account key file to config: %v", err) + logger.Errorf("Unable to parse service account key file to config: %v", err) return nil, err } // 使用服务账户的电子邮件和访问令牌 token, err = config.TokenSource(ctx).Token() if err != nil { - log.Errorf("Unable to generate token: %v", err) + logger.Errorf("Unable to generate token: %v", err) return nil, err } return token, nil diff --git a/core/notification/service.go b/core/notification/service.go index 4723effb..31eb7c0f 100644 --- a/core/notification/service.go +++ b/core/notification/service.go @@ -2,23 +2,24 @@ package notification import ( "fmt" + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/core/utils" "regexp" "strings" "sync" "time" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/trace" "github.com/gomarkdown/markdown" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) type Service struct { + interfaces.Logger } func (svc *Service) Send(s *models.NotificationSetting, args ...any) { @@ -31,7 +32,7 @@ func (svc *Service) Send(s *models.NotificationSetting, args ...any) { defer wg.Done() ch, err := service.NewModelService[models.NotificationChannel]().GetById(chId) if err != nil { - log.Errorf("[NotificationService] get channel error: %v", err) + svc.Errorf("[NotificationService] get channel error: %v", err) return } content := svc.getContent(s, ch, args...) @@ -57,7 +58,7 @@ func (svc *Service) SendMail(s *models.NotificationSetting, ch *models.Notificat // send mail err := SendMail(s, ch, mailTo, mailCc, mailBcc, title, content) if err != nil { - log.Errorf("[NotificationService] send mail error: %v", err) + svc.Errorf("[NotificationService] send mail error: %v", err) } // save request @@ -71,7 +72,7 @@ func (svc *Service) SendIM(ch *models.NotificationChannel, title, content string // send mobile notification err := SendIMNotification(ch, title, content) if err != nil { - log.Errorf("[NotificationService] send mobile notification error: %v", err) + svc.Errorf("[NotificationService] send mobile notification error: %v", err) } // save request @@ -107,7 +108,7 @@ func (svc *Service) SendTestMessage(locale string, ch *models.NotificationChanne // For email err = SendMail(nil, ch, toMail, nil, nil, title, content) if err != nil { - log.Errorf("failed to send test email: %v", err) + svc.Errorf("failed to send test email: %v", err) } case TypeIM: @@ -117,7 +118,7 @@ func (svc *Service) SendTestMessage(locale string, ch *models.NotificationChanne // For instant messaging err = SendIMNotification(ch, title, content) if err != nil { - log.Errorf("failed to send test IM notification: %v", err) + svc.Errorf("failed to send test IM notification: %v", err) } default: @@ -417,7 +418,7 @@ func (svc *Service) getUsernameById(id primitive.ObjectID) (username string) { } u, err := service.NewModelService[models.User]().GetById(id) if err != nil { - log.Errorf("[NotificationService] get user error: %v", err) + svc.Errorf("[NotificationService] get user error: %v", err) return "" } return u.Username @@ -494,8 +495,7 @@ func (svc *Service) SendNodeNotification(node *models.Node) { }, }, nil) if err != nil { - log.Errorf("get notification settings error: %v", err) - trace.PrintError(err) + svc.Errorf("get notification settings error: %v", err) return } @@ -537,7 +537,7 @@ func (svc *Service) createRequestMail(s *models.NotificationSetting, ch *models. r.SetUpdatedAt(time.Now()) r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r) if err != nil { - log.Errorf("[NotificationService] save request error: %v", err) + svc.Errorf("[NotificationService] save request error: %v", err) return nil, err } return &r, nil @@ -563,7 +563,7 @@ func (svc *Service) createRequestMailTest(ch *models.NotificationChannel, title, r.SetUpdatedAt(time.Now()) r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r) if err != nil { - log.Errorf("[NotificationService] save request error: %v", err) + svc.Errorf("[NotificationService] save request error: %v", err) return nil, err } return &r, nil @@ -581,7 +581,7 @@ func (svc *Service) createRequestIM(ch *models.NotificationChannel, title, conte r.SetUpdatedAt(time.Now()) r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r) if err != nil { - log.Errorf("[NotificationService] save request error: %v", err) + svc.Errorf("[NotificationService] save request error: %v", err) return nil, err } return &r, nil @@ -601,12 +601,14 @@ func (svc *Service) saveRequest(r *models.NotificationRequest, err error) { r.SetUpdatedAt(time.Now()) err = service.NewModelService[models.NotificationRequest]().ReplaceById(r.Id, *r) if err != nil { - log.Errorf("[NotificationService] save request error: %v", err) + svc.Errorf("[NotificationService] save request error: %v", err) } } func newNotificationService() *Service { - return &Service{} + return &Service{ + Logger: utils.NewLogger("NotificationService"), + } } var _service *Service diff --git a/core/schedule/logger.go b/core/schedule/logger.go index f1e18156..0556b3c8 100644 --- a/core/schedule/logger.go +++ b/core/schedule/logger.go @@ -1,28 +1,27 @@ package schedule import ( - "fmt" - "github.com/apex/log" - "github.com/crawlab-team/crawlab/trace" + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/core/utils" "github.com/robfig/cron/v3" "strings" ) -type Logger struct { +type CronLogger struct { + interfaces.Logger } -func (l *Logger) Info(msg string, keysAndValues ...interface{}) { +func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) { p := l.getPlaceholder(len(keysAndValues)) - log.Infof(fmt.Sprintf("cron: %s %s", msg, p), keysAndValues...) + l.Infof("cron: %s %s", msg, p) } -func (l *Logger) Error(err error, msg string, keysAndValues ...interface{}) { +func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) { p := l.getPlaceholder(len(keysAndValues)) - log.Errorf(fmt.Sprintf("cron: %s %s", msg, p), keysAndValues...) - trace.PrintError(err) + l.Errorf("cron: %s %v %s", msg, err, p) } -func (l *Logger) getPlaceholder(n int) (s string) { +func (l *CronLogger) getPlaceholder(n int) (s string) { var arr []string for i := 0; i < n; i++ { arr = append(arr, "%v") @@ -30,6 +29,8 @@ func (l *Logger) getPlaceholder(n int) (s string) { return strings.Join(arr, " ") } -func NewLogger() cron.Logger { - return &Logger{} +func NewCronLogger() cron.Logger { + return &CronLogger{ + Logger: utils.NewLogger("CronLogger"), + } } diff --git a/core/schedule/service.go b/core/schedule/service.go index bea589e3..b93d832e 100644 --- a/core/schedule/service.go +++ b/core/schedule/service.go @@ -1,13 +1,11 @@ package schedule import ( - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/spider/admin" "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -32,6 +30,7 @@ type Service struct { schedules []models.Schedule stopped bool mu sync.Mutex + interfaces.Logger } func (svc *Service) GetLocation() (loc *time.Location) { @@ -67,7 +66,12 @@ func (svc *Service) SetUpdateInterval(interval time.Duration) { } func (svc *Service) Init() (err error) { - return svc.fetch() + err = svc.fetch() + if err != nil { + svc.Fatalf("failed to initialize schedule service: %v", err) + return err + } + return nil } func (svc *Service) Start() { @@ -91,7 +95,8 @@ func (svc *Service) Enable(s models.Schedule, by primitive.ObjectID) (err error) id, err := svc.cron.AddFunc(s.Cron, svc.schedule(s.Id)) if err != nil { - return trace.TraceError(err) + svc.Errorf("failed to add cron job: %v", err) + return err } s.Enabled = true s.EntryId = id @@ -129,7 +134,7 @@ func (svc *Service) GetCron() (c *cron.Cron) { func (svc *Service) update() { // fetch enabled schedules if err := svc.fetch(); err != nil { - trace.PrintError(err) + svc.Errorf("failed to fetch schedules: %v", err) return } @@ -145,7 +150,7 @@ func (svc *Service) update() { if !s.Enabled { err := svc.Enable(s, s.GetCreatedBy()) if err != nil { - trace.PrintError(err) + svc.Errorf("failed to enable schedule: %v", err) continue } } @@ -184,14 +189,14 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { // schedule s, err := svc.modelSvc.GetById(id) if err != nil { - trace.PrintError(err) + svc.Errorf("failed to get schedule: %v", err) return } // spider spider, err := service.NewModelService[models.Spider]().GetById(s.SpiderId) if err != nil { - trace.PrintError(err) + svc.Errorf("failed to get spider: %v", err) return } @@ -229,7 +234,8 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { // schedule or assign a task in the task queue if _, err := svc.adminSvc.Schedule(s.SpiderId, opts); err != nil { - trace.PrintError(err) + svc.Errorf("failed to schedule spider: %v", err) + return } } } @@ -244,10 +250,11 @@ func newScheduleService() *Service { updateInterval: 1 * time.Minute, adminSvc: admin.GetSpiderAdminService(), modelSvc: service.NewModelService[models.Schedule](), + Logger: utils.NewLogger("ScheduleService"), } // logger - svc.logger = NewLogger() + svc.logger = NewCronLogger() // cron svc.cron = cron.New( @@ -258,7 +265,6 @@ func newScheduleService() *Service { // initialize if err := svc.Init(); err != nil { - log.Fatalf("failed to initialize schedule service: %v", err) panic(err) } diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index e33499df..6ce4e756 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -20,7 +20,6 @@ import ( "github.com/crawlab-team/crawlab/core/models/models" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" client2 "github.com/crawlab-team/crawlab/core/grpc/client" @@ -29,7 +28,6 @@ import ( "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -54,6 +52,7 @@ type Runner struct { err error // captures any process execution errors cwd string // current working directory for task conn grpc.TaskService_ConnectClient // gRPC stream connection for task service + interfaces.Logger // log handling scannerStdout *bufio.Reader // reader for process stdout @@ -95,7 +94,7 @@ func (r *Runner) Init() (err error) { // and status monitoring. Returns an error if the task execution fails. func (r *Runner) Run() (err error) { // log task started - log.Infof("task[%s] started", r.tid.Hex()) + r.Infof("task[%s] started", r.tid.Hex()) // configure working directory r.configureCwd() @@ -164,10 +163,10 @@ func (r *Runner) Cancel(force bool) (err error) { // Kill process err = utils.KillProcess(r.cmd, force) if err != nil { - log.Errorf("kill process error: %v", err) + r.Errorf("kill process error: %v", err) return err } - log.Debugf("attempt to kill process[%d]", r.pid) + r.Debugf("attempt to kill process[%d]", r.pid) // Create a context with timeout ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetCancelTimeout()) @@ -219,7 +218,7 @@ func (r *Runner) configureCmd() (err error) { // get cmd instance r.cmd, err = utils.BuildCmd(cmdStr) if err != nil { - log.Errorf("error building command: %v", err) + r.Errorf("error building command: %v", err) return err } @@ -229,14 +228,14 @@ func (r *Runner) configureCmd() (err error) { // Configure pipes for IPC r.stdinPipe, err = r.cmd.StdinPipe() if err != nil { - log.Errorf("error creating stdin pipe: %v", err) + r.Errorf("error creating stdin pipe: %v", err) return err } // Add stdout pipe for IPC r.stdoutPipe, err = r.cmd.StdoutPipe() if err != nil { - log.Errorf("error creating stdout pipe: %v", err) + r.Errorf("error creating stdout pipe: %v", err) return err } @@ -297,7 +296,7 @@ func (r *Runner) configureEnv() { // Global environment variables envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil) if err != nil { - trace.PrintError(err) + r.Errorf("error getting environment variables: %v", err) return } for _, env := range envs { @@ -350,20 +349,20 @@ func (r *Runner) syncFiles() (err error) { // get file list from master resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir) if err != nil { - log.Errorf("error getting file list from master: %v", err) + r.Errorf("error getting file list from master: %v", err) return err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - log.Errorf("error reading response body: %v", err) + r.Errorf("error reading response body: %v", err) return err } var masterFiles map[string]entity.FsFileInfo err = json.Unmarshal(body, &masterFiles) if err != nil { - log.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String()) - log.Errorf("error details: %v", err) + r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String()) + r.Errorf("error details: %v", err) return err } @@ -376,7 +375,7 @@ func (r *Runner) syncFiles() (err error) { // create working directory if not exists if _, err := os.Stat(r.cwd); os.IsNotExist(err) { if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil { - log.Errorf("error creating worker directory: %v", err) + r.Errorf("error creating worker directory: %v", err) return err } } @@ -384,17 +383,17 @@ func (r *Runner) syncFiles() (err error) { // get file list from worker workerFiles, err := utils.ScanDirectory(r.cwd) if err != nil { - log.Errorf("error scanning worker directory: %v", err) - return trace.TraceError(err) + r.Errorf("error scanning worker directory: %v", err) + return err } // delete files that are deleted on master node for path, workerFile := range workerFiles { if _, exists := masterFilesMap[path]; !exists { - log.Infof("deleting file: %s", path) + r.Infof("deleting file: %s", path) err := os.Remove(workerFile.FullPath) if err != nil { - log.Errorf("error deleting file: %v", err) + r.Errorf("error deleting file: %v", err) } } } @@ -417,17 +416,17 @@ func (r *Runner) syncFiles() (err error) { defer wg.Done() if masterFile.IsDir { - log.Infof("directory needs to be synchronized: %s", path) + r.Infof("directory needs to be synchronized: %s", path) _err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode) if _err != nil { - log.Errorf("error creating directory: %v", _err) + r.Errorf("error creating directory: %v", _err) err = errors.Join(err, _err) } } else { - log.Infof("file needs to be synchronized: %s", path) + r.Infof("file needs to be synchronized: %s", path) _err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile) if _err != nil { - log.Errorf("error downloading file: %v", _err) + r.Errorf("error downloading file: %v", _err) err = errors.Join(err, _err) } } @@ -449,11 +448,11 @@ func (r *Runner) syncFiles() (err error) { func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error { resp, err := r.createHttpRequest("GET", "/download?path="+path) if err != nil { - log.Errorf("error getting file response: %v", err) + r.Errorf("error getting file response: %v", err) return err } if resp.StatusCode != http.StatusOK { - log.Errorf("error downloading file: %s", resp.Status) + r.Errorf("error downloading file: %s", resp.Status) return errors.New(resp.Status) } defer resp.Body.Close() @@ -463,14 +462,14 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF utils.Exists(dirPath) err = os.MkdirAll(dirPath, os.ModePerm) if err != nil { - log.Errorf("error creating directory: %v", err) + r.Errorf("error creating directory: %v", err) return err } // create local file out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode) if err != nil { - log.Errorf("error creating file: %v", err) + r.Errorf("error creating file: %v", err) return err } defer out.Close() @@ -478,7 +477,7 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF // copy file content to local file _, err = io.Copy(out, resp.Body) if err != nil { - log.Errorf("error copying file: %v", err) + r.Errorf("error copying file: %v", err) return err } return nil @@ -498,33 +497,33 @@ func (r *Runner) getHttpRequestHeaders() (headers map[string]string) { func (r *Runner) wait() (err error) { // start a goroutine to wait for process to finish go func() { - log.Debugf("waiting for process[%d] to finish", r.pid) + r.Debugf("waiting for process[%d] to finish", r.pid) err = r.cmd.Wait() if err != nil { var exitError *exec.ExitError if !errors.As(err, &exitError) { r.ch <- constants.TaskSignalError - log.Debugf("process[%d] exited with error: %v", r.pid, err) + r.Debugf("process[%d] exited with error: %v", r.pid, err) return } exitCode := exitError.ExitCode() if exitCode == -1 { // cancel error r.ch <- constants.TaskSignalCancel - log.Debugf("process[%d] cancelled", r.pid) + r.Debugf("process[%d] cancelled", r.pid) return } // standard error r.err = err r.ch <- constants.TaskSignalError - log.Debugf("process[%d] exited with error: %v", r.pid, err) + r.Debugf("process[%d] exited with error: %v", r.pid, err) return } // success r.ch <- constants.TaskSignalFinish - log.Debugf("process[%d] exited successfully", r.pid) + r.Debugf("process[%d] exited successfully", r.pid) }() // declare task status @@ -552,7 +551,7 @@ func (r *Runner) wait() (err error) { // update task status if err := r.updateTask(status, err); err != nil { - log.Errorf("error updating task status: %v", err) + r.Errorf("error updating task status: %v", err) return err } @@ -601,7 +600,7 @@ func (r *Runner) updateTask(status string, e error) (err error) { func (r *Runner) initConnection() (err error) { r.conn, err = client2.GetGrpcClient().TaskClient.Connect(context.Background()) if err != nil { - log.Errorf("error connecting to task service: %v", err) + r.Errorf("error connecting to task service: %v", err) return err } return nil @@ -611,7 +610,7 @@ func (r *Runner) initConnection() (err error) { func (r *Runner) writeLogLines(lines []string) { linesBytes, err := json.Marshal(lines) if err != nil { - log.Errorf("error marshaling log lines: %v", err) + r.Errorf("error marshaling log lines: %v", err) return } msg := &grpc.TaskServiceConnectRequest{ @@ -620,7 +619,7 @@ func (r *Runner) writeLogLines(lines []string) { Data: linesBytes, } if err := r.conn.Send(msg); err != nil { - log.Errorf("error sending log lines: %v", err) + r.Errorf("error sending log lines: %v", err) return } } @@ -631,7 +630,7 @@ func (r *Runner) writeLogLines(lines []string) { func (r *Runner) _updateTaskStat(status string) { ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid) if err != nil { - trace.PrintError(err) + r.Errorf("error getting task stat: %v", err) return } switch status { @@ -652,13 +651,13 @@ func (r *Runner) _updateTaskStat(status string) { if r.svc.GetNodeConfigService().IsMaster() { err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts) if err != nil { - trace.PrintError(err) + r.Errorf("error updating task stat: %v", err) return } } else { err = client.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts) if err != nil { - trace.PrintError(err) + r.Errorf("error updating task stat: %v", err) return } } @@ -672,8 +671,7 @@ func (r *Runner) sendNotification() { } _, err := client2.GetGrpcClient().TaskClient.SendNotification(context.Background(), req) if err != nil { - log.Errorf("error sending notification: %v", err) - trace.PrintError(err) + r.Errorf("error sending notification: %v", err) return } } @@ -686,7 +684,7 @@ func (r *Runner) _updateSpiderStat(status string) { // task stat ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid) if err != nil { - trace.PrintError(err) + r.Errorf("error getting task stat: %v", err) return } @@ -715,8 +713,7 @@ func (r *Runner) _updateSpiderStat(status string) { }, } default: - log.Errorf("Invalid task status: %s", status) - trace.PrintError(errors.New("invalid task status")) + r.Errorf("Invalid task status: %s", status) return } @@ -724,13 +721,13 @@ func (r *Runner) _updateSpiderStat(status string) { if r.svc.GetNodeConfigService().IsMaster() { err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update) if err != nil { - trace.PrintError(err) + r.Errorf("error updating spider stat: %v", err) return } } else { err = client.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update) if err != nil { - trace.PrintError(err) + r.Errorf("error updating spider stat: %v", err) return } } @@ -755,14 +752,14 @@ func (r *Runner) handleIPC() { // Convert message to JSON jsonData, err := json.Marshal(msg) if err != nil { - log.Errorf("error marshaling IPC message: %v", err) + r.Errorf("error marshaling IPC message: %v", err) continue } // Write to child process's stdin _, err = fmt.Fprintln(r.stdinPipe, string(jsonData)) if err != nil { - log.Errorf("error writing to child process: %v", err) + r.Errorf("error writing to child process: %v", err) } } } @@ -800,7 +797,7 @@ func (r *Runner) startIPCReader() { if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData { r.handleIPCInsertDataMessage(ipcMsg) } else { - log.Warnf("no IPC handler set for message: %v", ipcMsg) + r.Warnf("no IPC handler set for message: %v", ipcMsg) } } } else { @@ -815,7 +812,7 @@ func (r *Runner) startIPCReader() { func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { // Validate message if ipcMsg.Payload == nil { - log.Errorf("empty payload in IPC message") + r.Errorf("empty payload in IPC message") return } @@ -829,7 +826,7 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { if itemMap, ok := item.(map[string]interface{}); ok { records = append(records, itemMap) } else { - log.Errorf("invalid record at index %d: %v", i, item) + r.Errorf("invalid record at index %d: %v", i, item) continue } } @@ -841,30 +838,30 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { if itemMap, ok := payload.(map[string]interface{}); ok { records = []map[string]interface{}{itemMap} } else { - log.Errorf("invalid payload type: %T", payload) + r.Errorf("invalid payload type: %T", payload) return } default: - log.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload) + r.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload) return } // Validate records if len(records) == 0 { - log.Warnf("no valid records to insert") + r.Warnf("no valid records to insert") return } // Marshal data with error handling data, err := json.Marshal(records) if err != nil { - log.Errorf("error marshaling records: %v", err) + r.Errorf("error marshaling records: %v", err) return } // Validate connection if r.conn == nil { - log.Errorf("gRPC connection not initialized") + r.Errorf("gRPC connection not initialized") return } @@ -882,11 +879,11 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { // Use context for sending select { case <-ctx.Done(): - log.Errorf("timeout sending IPC message") + r.Errorf("timeout sending IPC message") return default: if err := r.conn.Send(grpcMsg); err != nil { - log.Errorf("error sending IPC message: %v", err) + r.Errorf("error sending IPC message: %v", err) return } } @@ -898,7 +895,6 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { // validate options if id.IsZero() { err = fmt.Errorf("invalid task id: %s", id.Hex()) - log.Errorf("error creating task runner: %v", err) return nil, err } @@ -910,6 +906,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { tid: id, ch: make(chan constants.TaskSignal), logBatchSize: 20, + Logger: utils.NewLogger("TaskRunner"), } // multi error @@ -936,7 +933,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { // initialize task runner if err := r.Init(); err != nil { - log.Errorf("error initializing task runner: %v", err) + r.Errorf("error initializing task runner: %v", err) errs.Errors = append(errs.Errors, err) } diff --git a/core/task/handler/service.go b/core/task/handler/service.go index d3288957..3ee92af9 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -4,17 +4,15 @@ import ( "context" "errors" "fmt" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" - errors2 "github.com/crawlab-team/crawlab/core/errors" grpcclient "github.com/crawlab-team/crawlab/core/grpc/client" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/client" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" nodeconfig "github.com/crawlab-team/crawlab/core/node/config" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "io" @@ -38,6 +36,7 @@ type Service struct { mu sync.Mutex runners sync.Map // pool of task runners started syncLocks sync.Map // files sync locks map of task runners + interfaces.Logger } func (svc *Service) Start() { @@ -123,7 +122,7 @@ func (svc *Service) reportStatus() { case <-ticker.C: // update node status if err := svc.updateNodeStatus(); err != nil { - log.Errorf("failed to report status: %v", err) + svc.Errorf("failed to report status: %v", err) } } } @@ -161,7 +160,7 @@ func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models.Task, err erro t, err = client.NewModelService[models.Task]().GetById(id) } if err != nil { - log.Errorf("failed to get task by id: %v", err) + svc.Errorf("failed to get task by id: %v", err) return nil, err } @@ -188,7 +187,7 @@ func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err s, err = client.NewModelService[models.Spider]().GetById(id) } if err != nil { - log.Errorf("failed to get spider by id: %v", err) + svc.Errorf("failed to get spider by id: %v", err) return nil, err } @@ -198,7 +197,7 @@ func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err func (svc *Service) getRunnerCount() (count int) { n, err := svc.GetCurrentNode() if err != nil { - log.Errorf("failed to get current node: %v", err) + svc.Errorf("failed to get current node: %v", err) return } query := bson.M{ @@ -210,13 +209,13 @@ func (svc *Service) getRunnerCount() (count int) { if svc.cfgSvc.IsMaster() { count, err = service.NewModelService[models.Task]().Count(query) if err != nil { - log.Errorf("failed to count tasks: %v", err) + svc.Errorf("failed to count tasks: %v", err) return } } else { count, err = client.NewModelService[models.Task]().Count(query) if err != nil { - log.Errorf("failed to count tasks: %v", err) + svc.Errorf("failed to count tasks: %v", err) return } } @@ -224,27 +223,31 @@ func (svc *Service) getRunnerCount() (count int) { } func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { - log.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId) + svc.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId) v, ok := svc.runners.Load(taskId) if !ok { - return nil, trace.TraceError(errors2.ErrorTaskNotExists) + err = fmt.Errorf("task[%s] not exists", taskId.Hex()) + svc.Errorf("get runner error: %v", err) + return nil, err } switch v.(type) { case interfaces.TaskRunner: r = v.(interfaces.TaskRunner) default: - return nil, trace.TraceError(errors2.ErrorModelInvalidType) + err = fmt.Errorf("invalid type: %T", v) + svc.Errorf("get runner error: %v", err) + return nil, err } return r, nil } func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { - log.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex()) + svc.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex()) svc.runners.Store(taskId, r) } func (svc *Service) deleteRunner(taskId primitive.ObjectID) { - log.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId) + svc.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId) svc.runners.Delete(taskId) } @@ -294,7 +297,7 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { _, ok := svc.runners.Load(taskId) if ok { err = fmt.Errorf("task[%s] already exists", taskId.Hex()) - log.Errorf("run task error: %v", err) + svc.Errorf("run task error: %v", err) return err } @@ -302,7 +305,7 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { r, err := newTaskRunner(taskId, svc) if err != nil { err = fmt.Errorf("failed to create task runner: %v", err) - log.Errorf("run task error: %v", err) + svc.Errorf("run task error: %v", err) return err } @@ -318,22 +321,22 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { // create a goroutine to handle stream messages go svc.handleStreamMessages(r.GetTaskId(), stream, stopCh) } else { - log.Errorf("failed to subscribe task[%s]: %v", r.GetTaskId().Hex(), err) - log.Warnf("task[%s] will not be able to receive stream messages", r.GetTaskId().Hex()) + svc.Errorf("failed to subscribe task[%s]: %v", r.GetTaskId().Hex(), err) + svc.Warnf("task[%s] will not be able to receive stream messages", r.GetTaskId().Hex()) } // run task process (blocking) error or finish after task runner ends if err := r.Run(); err != nil { switch { case errors.Is(err, constants.ErrTaskError): - log.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err) + svc.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err) case errors.Is(err, constants.ErrTaskCancelled): - log.Errorf("task[%s] cancelled", r.GetTaskId().Hex()) + svc.Errorf("task[%s] cancelled", r.GetTaskId().Hex()) default: - log.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err) + svc.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err) } } - log.Infof("task[%s] finished", r.GetTaskId().Hex()) + svc.Infof("task[%s] finished", r.GetTaskId().Hex()) // send stopCh signal to stream message handler stopCh <- struct{}{} @@ -353,7 +356,7 @@ func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskSe } stream, err = svc.c.TaskClient.Subscribe(ctx, req) if err != nil { - log.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err) + svc.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err) return nil, err } return stream, nil @@ -365,7 +368,7 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc. case <-stopCh: err := stream.CloseSend() if err != nil { - log.Errorf("task[%s] failed to close stream: %v", taskId.Hex(), err) + svc.Errorf("task[%s] failed to close stream: %v", taskId.Hex(), err) return } return @@ -373,15 +376,15 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc. msg, err := stream.Recv() if err != nil { if errors.Is(err, io.EOF) { - log.Infof("task[%s] received EOF, stream closed", taskId.Hex()) + svc.Infof("task[%s] received EOF, stream closed", taskId.Hex()) return } - log.Errorf("task[%s] stream error: %v", taskId.Hex(), err) + svc.Errorf("task[%s] stream error: %v", taskId.Hex(), err) continue } switch msg.Code { case grpc.TaskServiceSubscribeCode_CANCEL: - log.Infof("task[%s] received cancel signal", taskId.Hex()) + svc.Infof("task[%s] received cancel signal", taskId.Hex()) go svc.handleCancel(msg, taskId) } } @@ -391,28 +394,28 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc. func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) { // validate task id if msg.TaskId != taskId.Hex() { - log.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) + svc.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) return } // cancel task err := svc.cancelTask(taskId, msg.Force) if err != nil { - log.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err) + svc.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err) return } - log.Infof("task[%s] cancelled", taskId.Hex()) + svc.Infof("task[%s] cancelled", taskId.Hex()) // set task status as "cancelled" t, err := svc.GetTaskById(taskId) if err != nil { - log.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err) + svc.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err) return } t.Status = constants.TaskStatusCancelled err = svc.UpdateTask(t) if err != nil { - log.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err) + svc.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err) } } @@ -436,6 +439,7 @@ func newTaskHandlerService() *Service { cancelTimeout: 60 * time.Second, mu: sync.Mutex{}, runners: sync.Map{}, + Logger: utils.NewLogger("TaskHandlerService"), } // dependency injection diff --git a/core/task/log/file_driver.go b/core/task/log/file_driver.go index 151a7bb3..57d33d91 100644 --- a/core/task/log/file_driver.go +++ b/core/task/log/file_driver.go @@ -3,10 +3,9 @@ package log import ( "bufio" "bytes" - "errors" - "github.com/apex/log" + "fmt" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" "github.com/spf13/viper" "io" "os" @@ -24,6 +23,7 @@ type FileLogDriver struct { // internals mu sync.Mutex + interfaces.Logger } func (d *FileLogDriver) Init() { @@ -43,18 +43,21 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) { f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.FileMode(0760)) if err != nil { - return trace.TraceError(err) + d.Errorf("open file error: %v", err) + return err } defer func(f *os.File) { err := f.Close() if err != nil { - log.Errorf("close file error: %s", err.Error()) + d.Errorf("close file error: %v", err) + return } }(f) _, err = f.WriteString(line + "\n") if err != nil { - return trace.TraceError(err) + d.Errorf("write file error: %v", err) + return err } return nil @@ -63,6 +66,7 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) { func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) { linesString := strings.Join(lines, "\n") if err := d.WriteLine(id, linesString); err != nil { + d.Errorf("write line error: %v", err) return err } return nil @@ -70,7 +74,9 @@ func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) { func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (lines []string, err error) { if pattern != "" { - return lines, errors.New("not implemented") + err = fmt.Errorf("find with pattern not implemented") + d.Errorf("%v", err) + return lines, err } if !utils.Exists(d.getLogFilePath(id, d.logFileName)) { return nil, nil @@ -78,7 +84,8 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li f, err := os.Open(d.getLogFilePath(id, d.logFileName)) if err != nil { - return nil, trace.TraceError(err) + d.Errorf("failed to open file: %v", err) + return nil, err } defer f.Close() @@ -110,7 +117,9 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li func (d *FileLogDriver) Count(id string, pattern string) (n int, err error) { if pattern != "" { - return n, errors.New("not implemented") + err = fmt.Errorf("count with pattern not implemented") + d.Errorf("%v", err) + return n, err } if !utils.Exists(d.getLogFilePath(id, d.logFileName)) { return 0, nil @@ -118,7 +127,8 @@ func (d *FileLogDriver) Count(id string, pattern string) (n int, err error) { f, err := os.Open(d.getLogFilePath(id, d.logFileName)) if err != nil { - return n, trace.TraceError(err) + d.Errorf("failed to open file: %v", err) + return n, err } return d.lineCounter(f) } @@ -142,7 +152,7 @@ func (d *FileLogDriver) getLogFilePath(id, fileName string) (filePath string) { func (d *FileLogDriver) getLogFiles(id string) (files []os.FileInfo) { files, err := utils.ListDir(d.getBasePath(id)) if err != nil { - log.Errorf("failed to list log files: %s", err.Error()) + d.Errorf("failed to list log files: %v", err) return nil } return @@ -151,7 +161,8 @@ func (d *FileLogDriver) getLogFiles(id string) (files []os.FileInfo) { func (d *FileLogDriver) initDir(id string) { if !utils.Exists(d.getBasePath(id)) { if err := os.MkdirAll(d.getBasePath(id), os.FileMode(0770)); err != nil { - trace.PrintError(err) + d.Errorf("failed to create log directory: %s", d.getBasePath(id)) + return } } } @@ -218,7 +229,7 @@ func (d *FileLogDriver) getTtl() time.Duration { func (d *FileLogDriver) cleanup() { // check if log path is set if utils.GetTaskLogPath() == "" { - log.Errorf("log path is not set") + d.Errorf("log path is not set") return } @@ -226,7 +237,7 @@ func (d *FileLogDriver) cleanup() { if !utils.Exists(utils.GetTaskLogPath()) { // create log directory if not exists if err := os.MkdirAll(utils.GetTaskLogPath(), os.FileMode(0770)); err != nil { - log.Errorf("failed to create log directory: %s", utils.GetTaskLogPath()) + d.Errorf("failed to create log directory: %s", utils.GetTaskLogPath()) return } } @@ -238,16 +249,16 @@ func (d *FileLogDriver) cleanup() { case <-ticker.C: dirs, err := utils.ListDir(utils.GetTaskLogPath()) if err != nil { - log.Errorf("failed to list log directory: %s", utils.GetTaskLogPath()) + d.Errorf("failed to list log directory: %s", utils.GetTaskLogPath()) continue } for _, dir := range dirs { if time.Now().After(dir.ModTime().Add(d.getTtl())) { if err := os.RemoveAll(d.getBasePath(dir.Name())); err != nil { - log.Errorf("failed to remove outdated log directory: %s", d.getBasePath(dir.Name())) + d.Errorf("failed to remove outdated log directory: %s", d.getBasePath(dir.Name())) continue } - log.Infof("removed outdated log directory: %s", d.getBasePath(dir.Name())) + d.Infof("removed outdated log directory: %s", d.getBasePath(dir.Name())) } } } @@ -259,6 +270,7 @@ func newFileLogDriver() Driver { driver := &FileLogDriver{ logFileName: "log.txt", mu: sync.Mutex{}, + Logger: utils.NewLogger("FileLogDriver"), } // init diff --git a/core/task/scheduler/service.go b/core/task/scheduler/service.go index f1c12c5e..53c552bd 100644 --- a/core/task/scheduler/service.go +++ b/core/task/scheduler/service.go @@ -3,16 +3,14 @@ package scheduler import ( errors2 "errors" "fmt" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/grpc/server" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/task/handler" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" @@ -27,6 +25,9 @@ type Service struct { // settings interval time.Duration + + // internals + interfaces.Logger } func (svc *Service) Start() { @@ -57,7 +58,8 @@ func (svc *Service) Enqueue(t *models.Task, by primitive.ObjectID) (t2 *models.T // add task stat _, err = service.NewModelService[models.TaskStat]().InsertOne(ts) if err != nil { - return nil, trace.TraceError(err) + svc.Errorf("failed to add task stat: %s", t.Id.Hex()) + return nil, err } // success @@ -68,7 +70,7 @@ func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) { // task t, err := service.NewModelService[models.Task]().GetById(id) if err != nil { - log.Errorf("task not found: %s", id.Hex()) + svc.Errorf("task not found: %s", id.Hex()) return err } @@ -101,7 +103,7 @@ func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) { func (svc *Service) cancelOnMaster(t *models.Task, by primitive.ObjectID, force bool) (err error) { if err := svc.handlerSvc.Cancel(t.Id, force); err != nil { - log.Errorf("failed to cancel task on master: %s", t.Id.Hex()) + svc.Errorf("failed to cancel task on master: %s", t.Id.Hex()) return err } @@ -115,7 +117,7 @@ func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id) if !ok { err := fmt.Errorf("stream not found for task: %s", t.Id.Hex()) - log.Errorf(err.Error()) + svc.Errorf(err.Error()) t.Status = constants.TaskStatusAbnormal t.Error = err.Error() return svc.SaveTask(t, by) @@ -128,7 +130,7 @@ func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force Force: force, }) if err != nil { - log.Errorf("failed to send cancel request to worker: %s", t.Id.Hex()) + svc.Errorf("failed to send cancel request to worker: %s", t.Id.Hex()) return err } @@ -167,14 +169,15 @@ func (svc *Service) initTaskStatus() { if errors2.Is(err, mongo2.ErrNoDocuments) { return } - log.Errorf("failed to get running tasks: %v", err) + svc.Errorf("failed to get running tasks: %v", err) return } for _, t := range runningTasks { go func(t *models.Task) { t.Status = constants.TaskStatusAbnormal if err := svc.SaveTask(t, primitive.NilObjectID); err != nil { - trace.PrintError(err) + svc.Errorf("failed to set task status as TaskStatusAbnormal: %s", t.Id.Hex()) + return } }(&t) } @@ -182,14 +185,18 @@ func (svc *Service) initTaskStatus() { func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) { if t.NodeId.IsZero() { - return false, trace.TraceError(errors.ErrorTaskNoNodeId) + err = fmt.Errorf("task %s has no node id", t.Id.Hex()) + svc.Errorf("%v", err) + return false, err } n, err := service.NewModelService[models.Node]().GetById(t.NodeId) if err != nil { if errors2.Is(err, mongo2.ErrNoDocuments) { - return false, trace.TraceError(errors.ErrorTaskNodeNotFound) + svc.Errorf("node not found: %s", t.NodeId.Hex()) + return false, err } - return false, trace.TraceError(err) + svc.Errorf("failed to get node: %s", t.NodeId.Hex()) + return false, err } return n.IsMaster, nil } @@ -218,14 +225,14 @@ func (svc *Service) cleanupTasks() { if err := service.NewModelService[models.Task]().DeleteMany(bson.M{ "_id": bson.M{"$in": ids}, }); err != nil { - trace.PrintError(err) + svc.Warnf("failed to remove tasks: %v", err) } // remove task stats if err := service.NewModelService[models.TaskStat]().DeleteMany(bson.M{ "_id": bson.M{"$in": ids}, }); err != nil { - trace.PrintError(err) + svc.Warnf("failed to remove task stats: %v", err) } } diff --git a/core/task/stats/service.go b/core/task/stats/service.go index 75d64741..5dfb87f1 100644 --- a/core/task/stats/service.go +++ b/core/task/stats/service.go @@ -1,7 +1,6 @@ package stats import ( - log2 "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/database" interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces" @@ -12,7 +11,6 @@ import ( "github.com/crawlab-team/crawlab/core/task/log" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "sync" @@ -37,6 +35,7 @@ type Service struct { databaseServiceItems map[string]*databaseServiceItem databaseServiceTll time.Duration logDriver log.Driver + interfaces.Logger } func (svc *Service) Init() (err error) { @@ -57,7 +56,7 @@ func (svc *Service) InsertData(taskId primitive.ObjectID, records ...map[string] if utils.IsPro() && dbSvc != nil { for _, record := range records { if err := dbSvc.CreateRow(dbId, "", tableName, svc.normalizeRecord(item, record)); err != nil { - log2.Errorf("failed to insert data: %v", err) + svc.Errorf("failed to insert data: %v", err) continue } count++ @@ -69,7 +68,7 @@ func (svc *Service) InsertData(taskId primitive.ObjectID, records ...map[string] } _, err = mongo.GetMongoCol(tableName).InsertMany(recordsToInsert) if err != nil { - log2.Errorf("failed to insert data: %v", err) + svc.Errorf("failed to insert data: %v", err) return err } count = len(records) @@ -143,7 +142,8 @@ func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) { }, }) if err != nil { - trace.PrintError(err) + svc.Errorf("failed to update task stats: %v", err) + return } } @@ -182,6 +182,7 @@ func NewTaskStatsService() *Service { mu: sync.Mutex{}, databaseServiceItems: map[string]*databaseServiceItem{}, databaseServiceTll: 10 * time.Minute, + Logger: utils.NewLogger("TaskStatsService"), } svc.nodeCfgSvc = nodeconfig.GetNodeConfigService() diff --git a/core/user/service.go b/core/user/service.go index 10002a83..6a19334c 100644 --- a/core/user/service.go +++ b/core/user/service.go @@ -2,13 +2,12 @@ package user import ( errors2 "errors" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/errors" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" "github.com/golang-jwt/jwt/v5" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -20,6 +19,7 @@ import ( type Service struct { jwtSecret string jwtSigningMethod jwt.SigningMethod + interfaces.Logger } func (svc *Service) Init() (err error) { @@ -70,7 +70,7 @@ func (svc *Service) initPro() (err error) { u.SetUpdatedAt(time.Now()) err = service.NewModelService[models.User]().ReplaceById(u.Id, *u) if err != nil { - log.Errorf("failed to update user: %v", err) + svc.Errorf("failed to update user: %v", err) } return } @@ -227,12 +227,13 @@ func newUserService() (svc *Service, err error) { svc = &Service{ jwtSecret: "crawlab", jwtSigningMethod: jwt.SigningMethodHS256, + Logger: utils.NewLogger("UserService"), } // initialize if err := svc.Init(); err != nil { - log.Errorf("failed to initialize user service: %v", err) - return nil, trace.TraceError(err) + svc.Errorf("failed to initialize user service: %v", err) + return nil, err } return svc, nil diff --git a/core/utils/backoff.go b/core/utils/backoff.go deleted file mode 100644 index 6010acfa..00000000 --- a/core/utils/backoff.go +++ /dev/null @@ -1,15 +0,0 @@ -package utils - -import ( - "github.com/apex/log" - "github.com/cenkalti/backoff/v4" - "github.com/crawlab-team/crawlab/trace" - "time" -) - -func BackoffErrorNotify(prefix string) backoff.Notify { - return func(err error, duration time.Duration) { - log.Errorf("%s error: %v. reattempt in %.1f seconds...", prefix, err, duration.Seconds()) - trace.PrintError(err) - } -} diff --git a/core/utils/config.go b/core/utils/config.go index cc51f9d3..d7556a44 100644 --- a/core/utils/config.go +++ b/core/utils/config.go @@ -2,7 +2,6 @@ package utils import ( "fmt" - "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/mitchellh/go-homedir" "github.com/spf13/viper" @@ -88,7 +87,7 @@ func IsPro() bool { func GetWorkspace() string { homedirPath, err := homedir.Dir() if err != nil { - log.Warnf("cannot find home directory: %v", err) + logger.Warnf("cannot find home directory: %v", err) return DefaultWorkspace } if res := viper.GetString("workspace"); res != "" { @@ -232,8 +231,8 @@ func GetNodeMaxRunners() int { func GetMetadataConfigPath() string { var homeDirPath, err = homedir.Dir() if err != nil { - log.Errorf("failed to get home directory: %v", err) - log.Errorf("please set metadata directory path using either CRAWLAB_METADATA environment variable or the metadata path in the configuration file") + logger.Errorf("failed to get home directory: %v", err) + logger.Errorf("please set metadata directory path using either CRAWLAB_METADATA environment variable or the metadata path in the configuration file") panic(err) } diff --git a/core/utils/file.go b/core/utils/file.go index b75501d6..5a965bc0 100644 --- a/core/utils/file.go +++ b/core/utils/file.go @@ -5,21 +5,18 @@ import ( "crypto/md5" "encoding/hex" "fmt" - "github.com/apex/log" "github.com/crawlab-team/crawlab/core/entity" "io" "io/fs" "os" "path" "path/filepath" - "runtime/debug" ) func OpenFile(fileName string) *os.File { file, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, os.ModePerm) if err != nil { - log.Errorf("create file error: %s, file_name: %s", err.Error(), fileName) - debug.PrintStack() + logger.Errorf("create file error: %s, file_name: %s", err.Error(), fileName) return nil } return file @@ -45,8 +42,7 @@ func IsDir(path string) bool { func ListDir(path string) ([]fs.FileInfo, error) { list, err := os.ReadDir(path) if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + logger.Errorf("read dir error: %v, path: %s", err, path) return nil, err } @@ -54,8 +50,7 @@ func ListDir(path string) ([]fs.FileInfo, error) { for _, item := range list { info, err := item.Info() if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + logger.Errorf("get file info error: %v, path: %s", err, item.Name()) return nil, err } res = append(res, info) diff --git a/core/utils/health.go b/core/utils/health.go index ce264e41..600d9d40 100644 --- a/core/utils/health.go +++ b/core/utils/health.go @@ -2,7 +2,6 @@ package utils import ( "fmt" - "github.com/apex/log" "net/http" ) @@ -10,8 +9,8 @@ func HandleHealthFn(healthFn func() bool, healthPort int) { addr := fmt.Sprintf(":%d", healthPort) go func() { if err := http.ListenAndServe(addr, nil); err != nil { - log.Errorf("health check server failed: %v", err) + logger.Errorf("health check server failed: %v", err) } }() - log.Infof("health check server started on port %d", healthPort) + logger.Infof("health check server started on port %d", healthPort) } diff --git a/core/utils/log.go b/core/utils/log.go index 2c7914f9..138805b1 100644 --- a/core/utils/log.go +++ b/core/utils/log.go @@ -2,21 +2,48 @@ package utils import ( "fmt" + "io" + "os" + "sync" + "time" + "github.com/apex/log" + "github.com/apex/log/handlers/json" + "github.com/apex/log/handlers/text" ) +var logger = NewLogger("Utils") + // ServiceLogger represents a logger with a specific service prefix. type ServiceLogger struct { prefix string } -// NewServiceLogger creates a new logger with the given service name as a prefix. -func NewServiceLogger(serviceName string) *ServiceLogger { - return &ServiceLogger{ - prefix: serviceName, +type ServiceLoggerOption func(logger *ServiceLogger) + +func WithHandler(handlerType string, output *os.File) ServiceLoggerOption { + return func(logger *ServiceLogger) { + SetHandler(handlerType, output) } } +// NewLogger creates a new logger with the given service name as a prefix. +func NewLogger(prefix string, opts ...ServiceLoggerOption) *ServiceLogger { + logger := &ServiceLogger{ + prefix: prefix, + } + + if len(opts) == 0 { + SetConsoleHandler() + } else { + for _, opt := range opts { + opt(logger) + } + } + + return logger +} + // Debug logs a debug message. func (l *ServiceLogger) Debug(message string) { log.Debug(l.getFormat(message)) @@ -68,5 +95,90 @@ func (l *ServiceLogger) Fatalf(format string, args ...interface{}) { } func (l *ServiceLogger) getFormat(format string) string { - return fmt.Sprintf("[%s] %s", l.prefix, format) + timestamp := time.Now().Local().Format("2006-01-02 15:04:05") + return fmt.Sprintf("[%s] [%s] %s", timestamp, l.prefix, format) +} + +type LogHandler struct { + mu sync.Mutex +} + +func handleLog(w io.Writer, e *log.Entry) error { + color := text.Colors[e.Level] + level := text.Strings[e.Level] + names := e.Fields.Names() + + fmt.Fprintf(w, "\033[%dm%6s\033[0m %-25s", color, level, e.Message) + + for _, name := range names { + fmt.Fprintf(w, " \033[%dm%s\033[0m=%v", color, name, e.Fields.Get(name)) + } + + fmt.Fprintln(w) + + return nil +} + +// MultiHandler is a handler that routes logs to stdout/stderr based on level +type MultiHandler struct { + mu sync.Mutex +} + +// NewMultiHandler creates a handler that routes logs to stdout/stderr based on level +func NewMultiHandler() *MultiHandler { + return &MultiHandler{ + mu: sync.Mutex{}, + } +} + +// HandleLog implements log.Handler interface +func (h *MultiHandler) HandleLog(e *log.Entry) error { + h.mu.Lock() + defer h.mu.Unlock() + + // Route to stderr for warn, error, and fatal + if e.Level <= log.WarnLevel { + return handleLog(os.Stdout, e) + } + // Route to stdout for debug and info + return handleLog(os.Stderr, e) +} + +type ConsoleHandler struct { + mu sync.Mutex +} + +func NewConsoleHandler() *ConsoleHandler { + return &ConsoleHandler{ + mu: sync.Mutex{}, + } +} + +func (h *ConsoleHandler) HandleLog(e *log.Entry) error { + h.mu.Lock() + defer h.mu.Unlock() + + return handleLog(os.Stdout, e) +} + +// SetHandler to include the new option +func SetHandler(handlerType string, output *os.File) { + switch handlerType { + case "json": + log.SetHandler(json.New(output)) + case "text": + log.SetHandler(text.New(output)) + case "split": + SetMultiHandler() + default: + SetConsoleHandler() + } +} + +func SetMultiHandler() { + log.SetHandler(NewMultiHandler()) +} + +func SetConsoleHandler() { + log.SetHandler(NewConsoleHandler()) } diff --git a/core/utils/process.go b/core/utils/process.go index c2967ebe..85b19aef 100644 --- a/core/utils/process.go +++ b/core/utils/process.go @@ -2,7 +2,6 @@ package utils import ( "errors" - "github.com/apex/log" "github.com/shirou/gopsutil/process" "os/exec" "runtime" @@ -28,7 +27,7 @@ func ProcessIdExists(pid int) (exists bool) { func processIdExistsWindows(pid int) (exists bool) { exists, err := process.PidExists(int32(pid)) if err != nil { - log.Errorf("error checking if process exists: %v", err) + logger.Errorf("error checking if process exists: %v", err) } return exists } @@ -36,7 +35,7 @@ func processIdExistsWindows(pid int) (exists bool) { func processIdExistsLinuxMac(pid int) (exists bool) { exists, err := process.PidExists(int32(pid)) if err != nil { - log.Errorf("error checking if process exists: %v", err) + logger.Errorf("error checking if process exists: %v", err) } return exists } @@ -44,7 +43,7 @@ func processIdExistsLinuxMac(pid int) (exists bool) { func GetProcesses() (processes []*process.Process, err error) { processes, err = process.Processes() if err != nil { - log.Errorf("error getting processes: %v", err) + logger.Errorf("error getting processes: %v", err) return nil, err } return processes, nil @@ -58,7 +57,7 @@ func KillProcess(cmd *exec.Cmd, force bool) error { // process p, err := process.NewProcess(int32(cmd.Process.Pid)) if err != nil { - log.Errorf("failed to get process: %v", err) + logger.Errorf("failed to get process: %v", err) return err } @@ -71,7 +70,7 @@ func killProcessRecursive(p *process.Process, force bool) (err error) { cps, err := p.Children() if err != nil { if !errors.Is(err, process.ErrorNoChildren) { - log.Errorf("failed to get children processes: %v", err) + logger.Errorf("failed to get children processes: %v", err) } else if errors.Is(err, process.ErrorProcessNotRunning) { return nil } @@ -95,7 +94,7 @@ func killProcess(p *process.Process, force bool) (err error) { err = p.Terminate() } if err != nil { - log.Errorf("failed to kill process (force: %v): %v", force, err) + logger.Errorf("failed to kill process (force: %v): %v", force, err) return err } return nil diff --git a/core/utils/time.go b/core/utils/time.go index 1ddcbdcf..afdbe20b 100644 --- a/core/utils/time.go +++ b/core/utils/time.go @@ -2,7 +2,6 @@ package utils import ( "errors" - "github.com/apex/log" "github.com/crawlab-team/crawlab/trace" "regexp" "strconv" @@ -27,13 +26,13 @@ func GetTimeUnitParts(timeUnit string) (num int, unit string, err error) { groups := re.FindStringSubmatch(timeUnit) if len(groups) < 3 { err = errors.New("failed to parse duration text") - log.Errorf("failed to parse duration text: %v", err) + logger.Errorf("failed to parse duration text: %v", err) trace.PrintError(err) return 0, "", err } num, err = strconv.Atoi(groups[1]) if err != nil { - log.Errorf("failed to convert string to int: %v", err) + logger.Errorf("failed to convert string to int: %v", err) trace.PrintError(err) return 0, "", err } @@ -44,7 +43,7 @@ func GetTimeUnitParts(timeUnit string) (num int, unit string, err error) { func GetTimeDuration(num string, unit string) (d time.Duration, err error) { numInt, err := strconv.Atoi(num) if err != nil { - log.Errorf("failed to convert string to int: %v", err) + logger.Errorf("failed to convert string to int: %v", err) trace.PrintError(err) return d, err } @@ -65,7 +64,7 @@ func GetTimeDuration(num string, unit string) (d time.Duration, err error) { d = time.Duration(numInt) * 365 * 24 * time.Hour default: err = errors.New("invalid time unit") - log.Errorf("invalid time unit: %v", unit) + logger.Errorf("invalid time unit: %v", unit) trace.PrintError(err) return d, err }