refactor: replace apex/log with structured logger across multiple services

- Replaced all instances of apex/log with the structured `interfaces.Logger` interface (instantiated via `utils.NewLogger`) in various services, including Api, Server, Config, and others, to enhance logging consistency and context.
- Updated logging calls to utilize the new logger methods, improving error tracking and service monitoring.
- Added logger initialization (`utils.NewLogger("<Name>")`) in service constructors, plus a shared package-level `logger` for controllers, to ensure proper logging setup.
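- Improved error handling and logging messages for better clarity during service operations: `trace.TraceError`/`trace.PrintError` calls are replaced by logger calls that return the original error, and several messages now format errors with `%v` instead of `err.Error()`.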
- Removed unused apex/log and `crawlab/trace` imports and cleaned up related code for better maintainability.
This commit is contained in:
Marvin Zhang
2024-12-24 19:11:19 +08:00
parent 67165f5e3f
commit 3276083994
46 changed files with 617 additions and 490 deletions

View File

@@ -3,8 +3,8 @@ package apps
import (
"context"
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/controllers"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/middlewares"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/gin-gonic/gin"
@@ -25,6 +25,7 @@ type Api struct {
ln net.Listener
srv *http.Server
initialized bool
interfaces.Logger
}
func (app *Api) Init() {
@@ -59,14 +60,14 @@ func (app *Api) Start() {
if err != nil {
panic(err)
}
log.Infof("api server listening on %s", address)
app.Infof("api server listening on %s", address)
// serve
if err := http.Serve(app.ln, app.app); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
log.Errorf("run api server error: %v", err)
app.Errorf("run api server error: %v", err)
} else {
log.Info("api server graceful down")
app.Info("api server graceful down")
}
}
}
@@ -80,7 +81,7 @@ func (app *Api) Stop() {
defer cancel()
if err := app.srv.Shutdown(ctx); err != nil {
log.Errorf("shutdown api server error: %v", err)
app.Errorf("shutdown api server error: %v", err)
}
}
@@ -100,7 +101,8 @@ func (app *Api) initModuleWithApp(name string, fn func(app *gin.Engine) error) (
func newApi() *Api {
api := &Api{
app: gin.New(),
app: gin.New(),
Logger: utils.NewLogger("Api"),
}
api.Init()
return api

View File

@@ -2,7 +2,6 @@ package apps
import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/node/service"
"github.com/crawlab-team/crawlab/core/utils"
@@ -16,6 +15,9 @@ type Server struct {
// modules
nodeSvc interfaces.NodeService
api *Api
// internals
interfaces.Logger
}
func (app *Server) Init() {
@@ -53,7 +55,7 @@ func (app *Server) GetNodeService() interfaces.NodeService {
}
func (app *Server) logNodeInfo() {
log.Infof("current node type: %s", utils.GetNodeType())
app.Infof("current node type: %s", utils.GetNodeType())
}
func (app *Server) initPprof() {
@@ -66,7 +68,9 @@ func (app *Server) initPprof() {
func newServer() App {
// server
svr := &Server{}
svr := &Server{
Logger: utils.NewLogger("Server"),
}
// master modules
if utils.IsMaster() {

View File

@@ -1,11 +1,11 @@
package apps
import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/trace"
"github.com/crawlab-team/crawlab/core/utils"
)
var utilsLogger = utils.NewLogger("AppsUtils")
func Start(app App) {
start(app)
}
@@ -19,10 +19,9 @@ func start(app App) {
func initModule(name string, fn func() error) (err error) {
if err := fn(); err != nil {
log.Error(fmt.Sprintf("init %s error: %s", name, err.Error()))
_ = trace.TraceError(err)
utilsLogger.Errorf("init %s error: %v", name, err)
panic(err)
}
log.Info(fmt.Sprintf("initialized %s successfully", name))
utilsLogger.Infof("initialized %s successfully", name)
return nil
}

View File

@@ -2,6 +2,8 @@ package config
import (
"errors"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"strings"
"sync"
@@ -12,6 +14,7 @@ import (
type Config struct {
Name string
interfaces.Logger
}
func (c *Config) Init() {
@@ -43,7 +46,7 @@ func (c *Config) Init() {
if err := viper.ReadInConfig(); err != nil {
var configFileNotFoundError viper.ConfigFileNotFoundError
if errors.As(err, &configFileNotFoundError) {
log.Warn("No config file found. Using default values.")
c.Warn("No config file found. Using default values.")
}
}
@@ -54,7 +57,7 @@ func (c *Config) Init() {
func (c *Config) WatchConfig() {
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
log.Infof("Config file changed: %s", e.Name)
c.Infof("Config file changed: %s", e.Name)
})
}
@@ -78,7 +81,9 @@ func (c *Config) initLogLevel() {
}
func newConfig() *Config {
return &Config{}
return &Config{
Logger: utils.NewLogger("Config"),
}
}
var _config *Config

View File

@@ -3,7 +3,6 @@ package controllers
import (
"errors"
"fmt"
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/fs"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
@@ -138,28 +137,28 @@ func PostBaseFileSaveFiles(rootPath string, c *gin.Context) {
go func(path string) {
file, err := c.FormFile(path)
if err != nil {
log2.Warnf("invalid file header: %s", path)
log2.Error(err.Error())
logger.Warnf("invalid file header: %s", path)
logger.Error(err.Error())
wg.Done()
return
}
f, err := file.Open()
if err != nil {
log2.Warnf("unable to open file: %s", path)
log2.Error(err.Error())
logger.Warnf("unable to open file: %s", path)
logger.Error(err.Error())
wg.Done()
return
}
fileData, err := io.ReadAll(f)
if err != nil {
log2.Warnf("unable to read file: %s", path)
log2.Error(err.Error())
logger.Warnf("unable to read file: %s", path)
logger.Error(err.Error())
wg.Done()
return
}
if err := fsSvc.Save(path, fileData); err != nil {
log2.Warnf("unable to save file: %s", path)
log2.Error(err.Error())
logger.Warnf("unable to save file: %s", path)
logger.Error(err.Error())
wg.Done()
return
}

View File

@@ -9,7 +9,6 @@ import (
"path/filepath"
"sync"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/fs"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/service"
@@ -17,7 +16,6 @@ import (
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/generic"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -398,7 +396,7 @@ func DeleteSpiderById(c *gin.Context) {
// delete task logs
logPath := filepath.Join(utils.GetTaskLogPath(), id)
if err := os.RemoveAll(logPath); err != nil {
log.Warnf("failed to remove task log directory: %s", logPath)
logger.Warnf("failed to remove task log directory: %s", logPath)
}
wg.Done()
}(id.Hex())
@@ -416,12 +414,12 @@ func DeleteSpiderById(c *gin.Context) {
// delete spider directory
fsSvc, err := getSpiderFsSvcById(id)
if err != nil {
log.Errorf("failed to get spider fs service: %s", err.Error())
logger.Errorf("failed to get spider fs service: %v", err)
return
}
err = fsSvc.Delete(".")
if err != nil {
log.Errorf("failed to delete spider directory: %s", err.Error())
logger.Errorf("failed to delete spider directory: %v", err)
return
}
}()
@@ -503,7 +501,7 @@ func DeleteSpiderList(c *gin.Context) {
// delete task logs
logPath := filepath.Join(utils.GetTaskLogPath(), id)
if err := os.RemoveAll(logPath); err != nil {
log.Warnf("failed to remove task log directory: %s", logPath)
logger.Warnf("failed to remove task log directory: %s", logPath)
}
wg.Done()
}(id.Hex())
@@ -532,14 +530,12 @@ func DeleteSpiderList(c *gin.Context) {
// Delete spider directory
fsSvc, err := getSpiderFsSvcById(s.Id)
if err != nil {
log.Errorf("failed to get spider fs service: %s", err.Error())
trace.PrintError(err)
logger.Errorf("failed to get spider fs service: %v", err)
return
}
err = fsSvc.Delete(".")
if err != nil {
log.Errorf("failed to delete spider directory: %s", err.Error())
trace.PrintError(err)
logger.Errorf("failed to delete spider directory: %v", err)
return
}
}(&spiders[i])
@@ -719,8 +715,7 @@ func getSpiderFsSvc(s *models.Spider) (svc interfaces.FsService, err error) {
func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsService, err error) {
s, err := service.NewModelService[models.Spider]().GetById(id)
if err != nil {
log.Errorf("failed to get spider: %s", err.Error())
trace.PrintError(err)
logger.Errorf("failed to get spider: %v", err)
return nil, err
}
return getSpiderFsSvc(s)

View File

@@ -2,7 +2,6 @@ package controllers
import (
"errors"
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
@@ -191,12 +190,12 @@ func DeleteTaskById(c *gin.Context) {
// delete task stat
_, err = service.NewModelService[models.TaskStat]().GetById(id)
if err != nil {
log2.Warnf("delete task stat error: %s", err.Error())
logger.Warnf("delete task stat error: %s", err.Error())
return nil
}
err = service.NewModelService[models.TaskStat]().DeleteById(id)
if err != nil {
log2.Warnf("delete task stat error: %s", err.Error())
logger.Warnf("delete task stat error: %s", err.Error())
return nil
}
@@ -209,7 +208,7 @@ func DeleteTaskById(c *gin.Context) {
// delete task logs
logPath := filepath.Join(utils.GetTaskLogPath(), id.Hex())
if err := os.RemoveAll(logPath); err != nil {
log2.Warnf("failed to remove task log directory: %s", logPath)
logger.Warnf("failed to remove task log directory: %s", logPath)
}
HandleSuccess(c)
@@ -240,7 +239,7 @@ func DeleteList(c *gin.Context) {
"$in": payload.Ids,
},
}); err != nil {
log2.Warnf("delete task stat error: %s", err.Error())
logger.Warnf("delete task stat error: %s", err.Error())
return nil
}
@@ -258,7 +257,7 @@ func DeleteList(c *gin.Context) {
// delete task logs
logPath := filepath.Join(utils.GetTaskLogPath(), id)
if err := os.RemoveAll(logPath); err != nil {
log2.Warnf("failed to remove task log directory: %s", logPath)
logger.Warnf("failed to remove task log directory: %s", logPath)
}
wg.Done()
}(id.Hex())

View File

@@ -0,0 +1,5 @@
package controllers
import "github.com/crawlab-team/crawlab/core/utils"
var logger = utils.NewLogger("Controllers")

View File

@@ -1,8 +1,6 @@
package controllers
import (
"github.com/apex/log"
"github.com/crawlab-team/crawlab/trace"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"io"
@@ -16,7 +14,7 @@ type WsWriter struct {
}
func (w *WsWriter) Write(data []byte) (n int, err error) {
log.Infof("websocket write: %s", string(data))
logger.Infof("websocket write: %s", string(data))
err = w.conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
return 0, err
@@ -47,8 +45,7 @@ func NewWsWriter(c *gin.Context) (writer *WsWriter, err error) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Errorf("websocket open connection error: %v", err)
trace.PrintError(err)
logger.Errorf("websocket open connection error: %v", err)
}
return &WsWriter{

View File

@@ -6,13 +6,11 @@ import (
"errors"
"fmt"
"github.com/ReneKroon/ttlcache"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"github.com/hashicorp/go-uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -26,12 +24,14 @@ import (
type CsvService struct {
cache *ttlcache.Cache
interfaces.Logger
}
func (svc *CsvService) GenerateId() (exportId string, err error) {
exportId, err = uuid.GenerateUUID()
if err != nil {
return "", trace.TraceError(err)
svc.Errorf("failed to generate export id: %v", err)
return "", err
}
return exportId, nil
}
@@ -69,7 +69,8 @@ func (svc *CsvService) GetExport(exportId string) (export interfaces.Export, err
// get export from cache
res, ok := svc.cache.Get(exportId)
if !ok {
return nil, trace.TraceError(errors.New("export not found"))
svc.Errorf("export not found: %s", exportId)
return nil, err
}
export = res.(interfaces.Export)
return export, nil
@@ -81,8 +82,7 @@ func (svc *CsvService) export(export *entity.Export) {
err := errors.New("empty target")
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -95,8 +95,7 @@ func (svc *CsvService) export(export *entity.Export) {
if err != nil {
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -113,8 +112,7 @@ func (svc *CsvService) export(export *entity.Export) {
if err != nil {
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -123,7 +121,7 @@ func (svc *CsvService) export(export *entity.Export) {
bom := []byte{0xEF, 0xBB, 0xBF}
_, err = csvFile.Write(bom)
if err != nil {
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
return
}
@@ -133,8 +131,7 @@ func (svc *CsvService) export(export *entity.Export) {
if err != nil {
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -149,17 +146,16 @@ func (svc *CsvService) export(export *entity.Export) {
// check error
err := cur.Err()
if err != nil {
if err != mongo2.ErrNoDocuments {
if !errors.Is(err, mongo2.ErrNoDocuments) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
} else {
// no more data
export.Status = constants.TaskStatusFinished
export.EndTs = time.Now()
log.Infof("export finished (id: %s)", export.Id)
svc.Infof("export finished (id: %s)", export.Id)
}
svc.cache.Set(export.Id, export)
return
@@ -170,7 +166,7 @@ func (svc *CsvService) export(export *entity.Export) {
// no more data
export.Status = constants.TaskStatusFinished
export.EndTs = time.Now()
log.Infof("export finished (id: %s)", export.Id)
svc.Infof("export finished (id: %s)", export.Id)
svc.cache.Set(export.Id, export)
return
}
@@ -182,8 +178,7 @@ func (svc *CsvService) export(export *entity.Export) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -195,8 +190,7 @@ func (svc *CsvService) export(export *entity.Export) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -240,7 +234,8 @@ func (svc *CsvService) getCsvWriter(export *entity.Export) (csvWriter *csv.Write
// open file
csvFile, err = os.Create(export.DownloadPath)
if err != nil {
return nil, nil, trace.TraceError(err)
svc.Errorf("failed to create csv file: %v", err)
return nil, nil, err
}
// create csv writer
@@ -256,7 +251,8 @@ func (svc *CsvService) getColumns(query bson.M, export interfaces.Export) (colum
// get 10 records
var data []bson.M
if err := col.Find(query, &mongo.FindOptions{Limit: 10}).All(&data); err != nil {
return nil, trace.TraceError(err)
svc.Errorf("failed to get columns: %v", err)
return nil, err
}
// columns set
@@ -329,7 +325,8 @@ func NewCsvService() (svc2 interfaces.ExportService) {
cache := ttlcache.NewCache()
cache.SetTTL(time.Minute * 5)
svc := &CsvService{
cache: cache,
cache: cache,
Logger: utils.NewLogger("CsvService"),
}
return svc
}

View File

@@ -5,13 +5,11 @@ import (
"encoding/json"
"errors"
"github.com/ReneKroon/ttlcache"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"github.com/hashicorp/go-uuid"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"os"
@@ -21,12 +19,14 @@ import (
type JsonService struct {
cache *ttlcache.Cache
interfaces.Logger
}
func (svc *JsonService) GenerateId() (exportId string, err error) {
exportId, err = uuid.GenerateUUID()
if err != nil {
return "", trace.TraceError(err)
svc.Errorf("failed to generate export id: %v", err)
return "", err
}
return exportId, nil
}
@@ -64,7 +64,8 @@ func (svc *JsonService) GetExport(exportId string) (export interfaces.Export, er
// get export from cache
res, ok := svc.cache.Get(exportId)
if !ok {
return nil, trace.TraceError(errors.New("export not found"))
svc.Errorf("export not found (id: %s)", exportId)
return nil, err
}
export = res.(interfaces.Export)
return export, nil
@@ -76,8 +77,7 @@ func (svc *JsonService) export(export *entity.Export) {
err := errors.New("empty target")
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -90,8 +90,7 @@ func (svc *JsonService) export(export *entity.Export) {
if err != nil {
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -111,17 +110,17 @@ func (svc *JsonService) export(export *entity.Export) {
// check error
err := cur.Err()
if err != nil {
if err != mongo2.ErrNoDocuments {
if !errors.Is(err, mongo2.ErrNoDocuments) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
} else {
// no more data
export.Status = constants.TaskStatusFinished
export.EndTs = time.Now()
log.Infof("export finished (id: %s)", export.Id)
svc.Infof("export finished (id: %s)", export.Id)
}
svc.cache.Set(export.Id, export)
return
@@ -132,7 +131,7 @@ func (svc *JsonService) export(export *entity.Export) {
// no more data
export.Status = constants.TaskStatusFinished
export.EndTs = time.Now()
log.Infof("export finished (id: %s)", export.Id)
svc.Infof("export finished (id: %s)", export.Id)
svc.cache.Set(export.Id, export)
break
}
@@ -144,8 +143,7 @@ func (svc *JsonService) export(export *entity.Export) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -158,8 +156,7 @@ func (svc *JsonService) export(export *entity.Export) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -170,8 +167,7 @@ func (svc *JsonService) export(export *entity.Export) {
// error
export.Status = constants.TaskStatusError
export.EndTs = time.Now()
log.Errorf("export error (id: %s): %v", export.Id, err)
trace.PrintError(err)
svc.Errorf("export error (id: %s): %v", export.Id, err)
svc.cache.Set(export.Id, export)
return
}
@@ -208,7 +204,8 @@ func NewJsonService() (svc2 interfaces.ExportService) {
cache := ttlcache.NewCache()
cache.SetTTL(time.Minute * 5)
svc := &JsonService{
cache: cache,
cache: cache,
Logger: utils.NewLogger("JsonService"),
}
return svc
}

View File

@@ -1,25 +0,0 @@
package fs
import (
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/mitchellh/go-homedir"
"github.com/spf13/viper"
"path/filepath"
)
func init() {
rootDir, err := homedir.Dir()
if err != nil {
log.Warnf("cannot find home directory: %v", err)
return
}
DefaultWorkspacePath = filepath.Join(rootDir, "crawlab_workspace")
workspacePath := utils.GetWorkspace()
if workspacePath == "" {
viper.Set("workspace", DefaultWorkspacePath)
}
}
var DefaultWorkspacePath string

View File

@@ -4,7 +4,6 @@ import (
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"github.com/google/uuid"
"io"
"os"
@@ -15,6 +14,9 @@ type Service struct {
// settings
rootPath string
skipNames []string
// internals
interfaces.Logger
}
func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error) {
@@ -31,11 +33,13 @@ func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error)
// Use filepath.Walk to recursively traverse directories
err = filepath.Walk(fullPath, func(p string, info os.FileInfo, err error) error {
if err != nil {
svc.Errorf("failed to walk path: %v", err)
return err
}
relPath, err := filepath.Rel(svc.rootPath, p)
if err != nil {
svc.Errorf("failed to get relative path: %v", err)
return err
}
@@ -85,6 +89,7 @@ func (svc *Service) GetFile(path string) (data []byte, err error) {
func (svc *Service) GetFileInfo(path string) (file interfaces.FsFileInfo, err error) {
f, err := os.Stat(filepath.Join(svc.rootPath, path))
if err != nil {
svc.Errorf("failed to get file info: %v", err)
return nil, err
}
return &entity.FsFileInfo{
@@ -105,6 +110,7 @@ func (svc *Service) Save(path string, data []byte) (err error) {
dir := filepath.Dir(filepath.Join(svc.rootPath, path))
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0755); err != nil {
svc.Errorf("failed to create directory: %v", err)
return err
}
}
@@ -135,6 +141,7 @@ func (svc *Service) Copy(path, newPath string) (err error) {
// Get source info
srcInfo, err := os.Stat(srcPath)
if err != nil {
svc.Errorf("failed to get source info: %v", err)
return err
}
@@ -142,19 +149,25 @@ func (svc *Service) Copy(path, newPath string) (err error) {
if !srcInfo.IsDir() {
srcFile, err := os.Open(srcPath)
if err != nil {
svc.Errorf("failed to open source file: %v", err)
return err
}
defer srcFile.Close()
destFile, err := os.Create(destPath)
if err != nil {
svc.Errorf("failed to create destination file: %v", err)
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, srcFile)
if err != nil {
svc.Errorf("failed to copy file: %v", err)
return err
}
return err
return nil
} else {
// If source is directory, copy it recursively
return utils.CopyDir(srcPath, destPath)
@@ -164,7 +177,8 @@ func (svc *Service) Copy(path, newPath string) (err error) {
func (svc *Service) Export() (resultPath string, err error) {
zipFilePath := filepath.Join(os.TempDir(), uuid.New().String()+".zip")
if err := utils.ZipDirectory(svc.rootPath, zipFilePath); err != nil {
return "", trace.TraceError(err)
svc.Errorf("failed to zip directory: %v", err)
return "", err
}
return zipFilePath, nil
@@ -174,5 +188,6 @@ func NewFsService(path string) (svc interfaces.FsService) {
return &Service{
rootPath: path,
skipNames: []string{".git"},
Logger: utils.NewLogger("FsService"),
}
}

View File

@@ -30,7 +30,7 @@ type GrpcClient struct {
once sync.Once
stopped bool
stop chan struct{}
logger interfaces.Logger
interfaces.Logger
// clients
NodeClient grpc2.NodeServiceClient
@@ -56,7 +56,7 @@ func (c *GrpcClient) Start() {
// connect
err := c.connect()
if err != nil {
c.logger.Fatalf("failed to connect: %v", err)
c.Fatalf("failed to connect: %v", err)
return
}
@@ -69,7 +69,7 @@ func (c *GrpcClient) Stop() (err error) {
// set stopped flag
c.stopped = true
c.stop <- struct{}{}
c.logger.Infof("stopped")
c.Infof("stopped")
// skip if connection is nil
if c.conn == nil {
@@ -78,10 +78,10 @@ func (c *GrpcClient) Stop() (err error) {
// close connection
if err := c.conn.Close(); err != nil {
c.logger.Errorf("failed to close connection: %v", err)
c.Errorf("failed to close connection: %v", err)
return err
}
c.logger.Infof("disconnected from %s", c.address)
c.Infof("disconnected from %s", c.address)
return nil
}
@@ -93,11 +93,11 @@ func (c *GrpcClient) WaitForReady() {
select {
case <-ticker.C:
if c.IsReady() {
c.logger.Debugf("client is now ready")
c.Debugf("client is now ready")
return
}
case <-c.stop:
c.logger.Errorf("client has stopped")
c.Errorf("client has stopped")
}
}
}
@@ -142,7 +142,7 @@ func (c *GrpcClient) monitorState() {
if previous != current {
c.setState(current)
c.logger.Infof("state changed from %s to %s", previous, current)
c.Infof("state changed from %s to %s", previous, current)
// Trigger reconnect if connection is lost or becomes idle from ready state
if current == connectivity.TransientFailure ||
@@ -150,7 +150,7 @@ func (c *GrpcClient) monitorState() {
(previous == connectivity.Ready && current == connectivity.Idle) {
select {
case c.reconnect <- struct{}{}:
c.logger.Infof("triggering reconnection due to state change to %s", current)
c.Infof("triggering reconnection due to state change to %s", current)
default:
}
}
@@ -182,9 +182,9 @@ func (c *GrpcClient) connect() (err error) {
return
case <-c.reconnect:
if !c.stopped {
c.logger.Infof("attempting to reconnect to %s", c.address)
c.Infof("attempting to reconnect to %s", c.address)
if err := c.doConnect(); err != nil {
c.logger.Errorf("reconnection failed: %v", err)
c.Errorf("reconnection failed: %v", err)
}
}
}
@@ -206,12 +206,12 @@ func (c *GrpcClient) doConnect() (err error) {
// create new client connection
c.conn, err = grpc.NewClient(c.address, opts...)
if err != nil {
c.logger.Errorf("failed to connect to %s: %v", c.address, err)
c.Errorf("failed to connect to %s: %v", c.address, err)
return err
}
// connect
c.logger.Infof("connecting to %s", c.address)
c.Infof("connecting to %s", c.address)
c.conn.Connect()
// wait for connection to be ready
@@ -223,7 +223,7 @@ func (c *GrpcClient) doConnect() (err error) {
}
// success
c.logger.Infof("connected to %s", c.address)
c.Infof("connected to %s", c.address)
return nil
}
@@ -231,7 +231,7 @@ func (c *GrpcClient) doConnect() (err error) {
b.InitialInterval = 5 * time.Second
b.MaxElapsedTime = 10 * time.Minute
n := func(err error, duration time.Duration) {
c.logger.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration)
c.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration)
}
return backoff.RetryNotify(op, b, n)
}
@@ -241,7 +241,7 @@ func newGrpcClient() (c *GrpcClient) {
address: utils.GetGrpcAddress(),
timeout: 10 * time.Second,
stop: make(chan struct{}),
logger: utils.NewServiceLogger("GrpcClient"),
Logger: utils.NewLogger("GrpcClient"),
state: connectivity.Idle,
}
}

View File

@@ -5,11 +5,12 @@ import (
"errors"
"fmt"
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"io"
"sync"
"time"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
@@ -24,17 +25,18 @@ type DependencyServiceServer struct {
grpc.UnimplementedDependencyServiceServer
mu *sync.Mutex
streams map[string]*grpc.DependencyService_ConnectServer
interfaces.Logger
}
func (svr DependencyServiceServer) Connect(req *grpc.DependencyServiceConnectRequest, stream grpc.DependencyService_ConnectServer) (err error) {
svr.mu.Lock()
svr.streams[req.NodeKey] = &stream
svr.mu.Unlock()
log.Info("[DependencyServiceServer] connected: " + req.NodeKey)
svr.Info("[DependencyServiceServer] connected: " + req.NodeKey)
// Keep this scope alive because once this scope exits - the stream is closed
<-stream.Context().Done()
log.Info("[DependencyServiceServer] disconnected: " + req.NodeKey)
svr.Info("[DependencyServiceServer] disconnected: " + req.NodeKey)
return nil
}
@@ -54,7 +56,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende
}, nil)
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
log.Errorf("[DependencyService] get dependencies from db error: %v", err)
svr.Errorf("[DependencyService] get dependencies from db error: %v", err)
return nil, err
}
}
@@ -117,7 +119,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende
"_id": bson.M{"$in": depIdsToDelete},
})
if err != nil {
log.Errorf("[DependencyServiceServer] delete dependencies in db error: %v", err)
svr.Errorf("[DependencyServiceServer] delete dependencies in db error: %v", err)
return err
}
}
@@ -126,7 +128,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende
if len(depsToInsert) > 0 {
_, err = service.NewModelService[models.Dependency]().InsertMany(depsToInsert)
if err != nil {
log.Errorf("[DependencyServiceServer] insert dependencies in db error: %v", err)
svr.Errorf("[DependencyServiceServer] insert dependencies in db error: %v", err)
return err
}
}
@@ -135,7 +137,7 @@ func (svr DependencyServiceServer) Sync(_ context.Context, request *grpc.Depende
for _, d := range depsToUpdate {
err = service.NewModelService[models.Dependency]().ReplaceById(d.Id, d)
if err != nil {
log.Errorf("[DependencyServiceServer] update dependency in db error: %v", err)
svr.Errorf("[DependencyServiceServer] update dependency in db error: %v", err)
return err
}
}
@@ -161,7 +163,7 @@ func (svr DependencyServiceServer) UpdateLogs(stream grpc.DependencyService_Upda
// get id
id, err := primitive.ObjectIDFromHex(req.TargetId)
if err != nil {
log.Errorf("[DependencyServiceServer] convert dependency id error: %v", err)
svr.Errorf("[DependencyServiceServer] convert dependency id error: %v", err)
return err
}
@@ -176,7 +178,7 @@ func (svr DependencyServiceServer) UpdateLogs(stream grpc.DependencyService_Upda
}
_, err = service.NewModelService[models.DependencyLog]().InsertMany(depLogs)
if err != nil {
log.Errorf("[DependencyServiceServer] insert dependency logs error: %v", err)
svr.Errorf("[DependencyServiceServer] insert dependency logs error: %v", err)
return err
}
}
@@ -202,7 +204,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
}, nil)
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
log.Errorf("[DependencyService] get dependency config setup from db error: %v", err)
svr.Errorf("[DependencyService] get dependency config setup from db error: %v", err)
return nil, err
}
}
@@ -230,7 +232,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
}
_, err = service.NewModelService[models.DependencyConfigSetup]().InsertOne(*cs)
if err != nil {
log.Errorf("[DependencyService] insert dependency config setup error: %v", err)
svr.Errorf("[DependencyService] insert dependency config setup error: %v", err)
return nil, err
}
} else {
@@ -243,7 +245,7 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
cs.Drivers = drivers
err = service.NewModelService[models.DependencyConfigSetup]().ReplaceById(cs.Id, *cs)
if err != nil {
log.Errorf("[DependencyService] update dependency config setup error: %v", err)
svr.Errorf("[DependencyService] update dependency config setup error: %v", err)
return nil, err
}
}
@@ -257,7 +259,7 @@ func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.Depen
return err
}, b)
if err != nil {
log.Errorf("get stream error: %v", err)
svr.Errorf("get stream error: %v", err)
return nil, err
}
return stream, nil
@@ -277,6 +279,7 @@ func newDependencyServer() *DependencyServiceServer {
return &DependencyServiceServer{
mu: new(sync.Mutex),
streams: make(map[string]*grpc.DependencyService_ConnectServer),
Logger: utils.NewLogger("DependencyServiceServer"),
}
}

View File

@@ -2,9 +2,10 @@ package server
import (
"context"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"go.mongodb.org/mongo-driver/bson"
"sync"
@@ -13,13 +14,14 @@ import (
type MetricServiceServer struct {
grpc.UnimplementedMetricServiceServer
interfaces.Logger
}
func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSendRequest) (res *grpc.Response, err error) {
log.Debugf("[MetricServiceServer] received metric from node: " + req.NodeKey)
svr.Debugf("received metric from node: " + req.NodeKey)
n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil)
if err != nil {
log.Errorf("[MetricServiceServer] error getting node: %v", err)
svr.Errorf("failed to get node: %v", err)
return HandleError(err)
}
metric := models.Metric{
@@ -42,14 +44,16 @@ func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSe
metric.CreatedAt = time.Unix(req.Timestamp, 0)
_, err = service.NewModelService[models.Metric]().InsertOne(metric)
if err != nil {
log.Errorf("[MetricServiceServer] error inserting metric: %v", err)
svr.Errorf("failed to insert metric: %v", err)
return HandleError(err)
}
return HandleSuccess()
}
func newMetricsServer() *MetricServiceServer {
return &MetricServiceServer{}
return &MetricServiceServer{
Logger: utils.NewLogger("MetricServiceServer"),
}
}
var metricsServer *MetricServiceServer

View File

@@ -2,7 +2,6 @@ package server
import (
"context"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/errors"
"github.com/crawlab-team/crawlab/core/interfaces"
@@ -30,6 +29,7 @@ type NodeServiceServer struct {
// internals
subs map[primitive.ObjectID]grpc.NodeService_SubscribeServer
interfaces.Logger
}
// Register from handler/worker to master
@@ -51,7 +51,7 @@ func (svr NodeServiceServer) Register(_ context.Context, req *grpc.NodeServiceRe
if err != nil {
return HandleError(err)
}
log.Infof("[NodeServiceServer] updated worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())
svr.Infof("[NodeServiceServer] updated worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())
} else if errors2.Is(err, mongo.ErrNoDocuments) {
// register new
node = &models.Node{
@@ -69,13 +69,13 @@ func (svr NodeServiceServer) Register(_ context.Context, req *grpc.NodeServiceRe
if err != nil {
return HandleError(err)
}
log.Infof("[NodeServiceServer] added worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())
svr.Infof("[NodeServiceServer] added worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())
} else {
// error
return HandleError(err)
}
log.Infof("[NodeServiceServer] master registered worker[%s]", req.NodeKey)
svr.Infof("[NodeServiceServer] master registered worker[%s]", req.NodeKey)
return HandleSuccessWithData(node)
}
@@ -113,12 +113,12 @@ func (svr NodeServiceServer) SendHeartbeat(_ context.Context, req *grpc.NodeServ
}
func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest, stream grpc.NodeService_SubscribeServer) (err error) {
log.Infof("[NodeServiceServer] master received subscribe request from node[%s]", request.NodeKey)
svr.Infof("[NodeServiceServer] master received subscribe request from node[%s]", request.NodeKey)
// find in db
node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": request.NodeKey}, nil)
if err != nil {
log.Errorf("[NodeServiceServer] error getting node: %v", err)
svr.Errorf("[NodeServiceServer] error getting node: %v", err)
return err
}
@@ -136,7 +136,7 @@ func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest
nodeServiceMutex.Lock()
delete(svr.subs, node.Id)
nodeServiceMutex.Unlock()
log.Infof("[NodeServiceServer] master unsubscribed from node[%s]", request.NodeKey)
svr.Infof("[NodeServiceServer] master unsubscribed from node[%s]", request.NodeKey)
return nil
}
@@ -152,6 +152,7 @@ func newNodeServiceServer() *NodeServiceServer {
return &NodeServiceServer{
cfgSvc: nodeconfig.GetNodeConfigService(),
subs: make(map[primitive.ObjectID]grpc.NodeService_SubscribeServer),
Logger: utils.NewLogger("GrpcNodeServiceServer"),
}
}

View File

@@ -2,8 +2,8 @@ package server
import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/grpc/middlewares"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
grpc2 "github.com/crawlab-team/crawlab/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -23,6 +23,7 @@ type GrpcServer struct {
svr *grpc.Server
l net.Listener
stopped bool
interfaces.Logger
// servers
NodeSvr *NodeServiceServer
@@ -40,10 +41,10 @@ func (svr *GrpcServer) Start() (err error) {
// listener
svr.l, err = net.Listen("tcp", svr.address)
if err != nil {
log.Errorf("[GrpcServer] failed to listen: %v", err)
svr.Errorf("failed to listen: %v", err)
return err
}
log.Infof("[GrpcServer] grpc server listens to %s", svr.address)
svr.Infof("server listens to %s", svr.address)
// start grpc server
go func() {
@@ -51,7 +52,7 @@ func (svr *GrpcServer) Start() (err error) {
if errors2.Is(err, grpc.ErrServerStopped) {
return
}
log.Errorf("[GrpcServer] failed to serve: %v", err)
svr.Errorf("failed to serve: %v", err)
}
}()
@@ -65,18 +66,18 @@ func (svr *GrpcServer) Stop() (err error) {
}
// graceful stop
log.Infof("[GrpcServer] grpc server stopping...")
svr.Infof("server stopping...")
svr.svr.Stop()
// close listener
log.Infof("[GrpcServer] grpc server closing listener...")
svr.Infof("server closing listener...")
_ = svr.l.Close()
// mark as stopped
svr.stopped = true
// log
log.Infof("[GrpcServer] grpc server stopped")
svr.Infof("server stopped")
return nil
}
@@ -90,7 +91,7 @@ func (svr *GrpcServer) register() {
}
func (svr *GrpcServer) recoveryHandlerFunc(p interface{}) (err error) {
log.Errorf("[GrpcServer] recovered from panic: %v", p)
svr.Errorf("recovered from panic: %v", p)
return fmt.Errorf("recovered from panic: %v", p)
}
@@ -98,6 +99,7 @@ func newGrpcServer() *GrpcServer {
// server
svr := &GrpcServer{
address: utils.GetGrpcServerAddress(),
Logger: utils.NewLogger("GrpcServer"),
}
// services servers

View File

@@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
@@ -19,7 +19,6 @@ import (
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
@@ -36,6 +35,7 @@ type TaskServiceServer struct {
// internals
subs map[primitive.ObjectID]grpc.TaskService_SubscribeServer
interfaces.Logger
}
func (svr TaskServiceServer) Subscribe(req *grpc.TaskServiceSubscribeRequest, stream grpc.TaskService_SubscribeServer) (err error) {
@@ -62,7 +62,7 @@ func (svr TaskServiceServer) Subscribe(req *grpc.TaskServiceSubscribeRequest, st
taskServiceMutex.Lock()
delete(svr.subs, taskId)
taskServiceMutex.Unlock()
log.Infof("[TaskServiceServer] task stream closed: %s", taskId.Hex())
svr.Infof("[TaskServiceServer] task stream closed: %s", taskId.Hex())
return nil
}
@@ -89,7 +89,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err
return nil
}
// log other stream receive errors and continue
log.Errorf("error receiving stream message: %v", err)
svr.Errorf("error receiving stream message: %v", err)
continue
}
@@ -97,7 +97,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err
if taskId.IsZero() {
taskId, err = primitive.ObjectIDFromHex(msg.TaskId)
if err != nil {
log.Errorf("invalid task id: %s", msg.TaskId)
svr.Errorf("invalid task id: %s", msg.TaskId)
continue
}
}
@@ -107,7 +107,7 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err
if spiderId.IsZero() {
t, err := service.NewModelService[models.Task]().GetById(taskId)
if err != nil {
log.Errorf("error getting spider[%s]: %v", taskId.Hex(), err)
svr.Errorf("error getting spider[%s]: %v", taskId.Hex(), err)
continue
}
spiderId = t.SpiderId
@@ -123,12 +123,12 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err
err = svr.handleInsertLogs(taskId, msg)
default:
// invalid message code received
log.Errorf("invalid stream message code: %d", msg.Code)
svr.Errorf("invalid stream message code: %d", msg.Code)
continue
}
if err != nil {
// log any errors from handlers
log.Errorf("grpc error[%d]: %v", msg.Code, err)
svr.Errorf("grpc error[%d]: %v", msg.Code, err)
}
}
}
@@ -137,11 +137,14 @@ func (svr TaskServiceServer) Connect(stream grpc.TaskService_ConnectServer) (err
func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskServiceFetchTaskRequest) (response *grpc.TaskServiceFetchTaskResponse, err error) {
nodeKey := request.GetNodeKey()
if nodeKey == "" {
return nil, errors.New("invalid node key")
err = fmt.Errorf("invalid node key")
svr.Errorf("error fetching task: %v", err)
return nil, err
}
n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
if err != nil {
return nil, trace.TraceError(err)
svr.Errorf("error getting node[%s]: %v", nodeKey, err)
return nil, err
}
var tid primitive.ObjectID
opts := &mongo.FindOptions{
@@ -162,7 +165,7 @@ func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskSe
t.Status = constants.TaskStatusAssigned
return svr.saveTask(t)
} else if !errors.Is(err, mongo2.ErrNoDocuments) {
log.Errorf("error fetching task for node[%s]: %v", nodeKey, err)
svr.Errorf("error fetching task for node[%s]: %v", nodeKey, err)
return err
}
@@ -177,7 +180,7 @@ func (svr TaskServiceServer) FetchTask(ctx context.Context, request *grpc.TaskSe
t.Status = constants.TaskStatusAssigned
return svr.saveTask(t)
} else if !errors.Is(err, mongo2.ErrNoDocuments) {
log.Errorf("error fetching task for any node: %v", err)
svr.Errorf("error fetching task for any node: %v", err)
return err
}
@@ -198,7 +201,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
// task id
taskId, err := primitive.ObjectIDFromHex(request.TaskId)
if err != nil {
log.Errorf("invalid task id: %s", request.TaskId)
svr.Errorf("invalid task id: %s", request.TaskId)
return nil, err
}
@@ -208,7 +211,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
// task
task, err := service.NewModelService[models.Task]().GetById(taskId)
if err != nil {
log.Errorf("error getting task[%s]: %v", request.TaskId, err)
svr.Errorf("error getting task[%s]: %v", request.TaskId, err)
return nil, err
}
args = append(args, task)
@@ -216,7 +219,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
// task stat
taskStat, err := service.NewModelService[models.TaskStat]().GetById(task.Id)
if err != nil {
log.Errorf("error getting task stat for task[%s]: %v", request.TaskId, err)
svr.Errorf("error getting task stat for task[%s]: %v", request.TaskId, err)
return nil, err
}
args = append(args, taskStat)
@@ -224,7 +227,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
// spider
spider, err := service.NewModelService[models.Spider]().GetById(task.SpiderId)
if err != nil {
log.Errorf("error getting spider[%s]: %v", task.SpiderId.Hex(), err)
svr.Errorf("error getting spider[%s]: %v", task.SpiderId.Hex(), err)
return nil, err
}
args = append(args, spider)
@@ -232,7 +235,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
// node
node, err := service.NewModelService[models.Node]().GetById(task.NodeId)
if err != nil {
log.Errorf("error getting node[%s]: %v", task.NodeId.Hex(), err)
svr.Errorf("error getting node[%s]: %v", task.NodeId.Hex(), err)
return nil, err
}
args = append(args, node)
@@ -242,7 +245,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
if !task.ScheduleId.IsZero() {
schedule, err = service.NewModelService[models.Schedule]().GetById(task.ScheduleId)
if err != nil {
log.Errorf("error getting schedule[%s]: %v", task.ScheduleId.Hex(), err)
svr.Errorf("error getting schedule[%s]: %v", task.ScheduleId.Hex(), err)
return nil, err
}
args = append(args, schedule)
@@ -256,7 +259,7 @@ func (svr TaskServiceServer) SendNotification(_ context.Context, request *grpc.T
},
}, nil)
if err != nil {
log.Errorf("error getting notification settings: %v", err)
svr.Errorf("error getting notification settings: %v", err)
return nil, err
}
@@ -303,7 +306,7 @@ func (svr TaskServiceServer) handleInsertData(taskId, spiderId primitive.ObjectI
var records []map[string]interface{}
err = json.Unmarshal(msg.Data, &records)
if err != nil {
log.Errorf("error unmarshalling data: %v", err)
svr.Errorf("error unmarshalling data: %v", err)
return err
}
for i := range records {
@@ -317,7 +320,7 @@ func (svr TaskServiceServer) handleInsertLogs(taskId primitive.ObjectID, msg *gr
var logs []string
err = json.Unmarshal(msg.Data, &logs)
if err != nil {
log.Errorf("error unmarshalling logs: %v", err)
svr.Errorf("error unmarshalling logs: %v", err)
return err
}
return svr.statsSvc.InsertLogs(taskId, logs...)
@@ -333,6 +336,7 @@ func newTaskServiceServer() *TaskServiceServer {
cfgSvc: nodeconfig.GetNodeConfigService(),
subs: make(map[primitive.ObjectID]grpc.TaskService_SubscribeServer),
statsSvc: stats.GetTaskStatsService(),
Logger: utils.NewLogger("GrpcTaskServiceServer"),
}
}

View File

@@ -3,12 +3,14 @@ package common
import (
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"go.mongodb.org/mongo-driver/bson"
mongo2 "go.mongodb.org/mongo-driver/mongo"
)
var indexUtilsLogger = utils.NewLogger("IndexUtils")
// getIndexKeyString converts index keys to a consistent string representation
func getIndexKeyString(key interface{}) string {
switch v := key.(type) {
@@ -64,10 +66,10 @@ func normalizeIndexKey(key interface{}) string {
return pairsStr
}
func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) {
func CreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) {
existingIndexes, err := col.ListIndexes()
if err != nil {
log.Errorf("error listing indexes: %v", err)
indexUtilsLogger.Errorf("error listing indexes: %v", err)
return
}
@@ -99,15 +101,15 @@ func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) {
// Drop all existing indexes (except _id)
err := col.DeleteAllIndexes()
if err != nil {
log.Errorf("error dropping indexes: %v", err)
indexUtilsLogger.Errorf("error dropping indexes: %v", err)
}
// Create new indexes
err = col.CreateIndexes(desiredIndexes)
if err != nil {
log.Errorf("error creating indexes: %v", err)
indexUtilsLogger.Errorf("error creating indexes: %v", err)
return
}
log.Infof("recreated indexes for collection: %s", col.GetCollection().Name())
indexUtilsLogger.Infof("created indexes for collection: %s", col.GetCollection().Name())
}
}

View File

@@ -104,8 +104,8 @@ func TestRecreateIndexes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Execute RecreateIndexes
RecreateIndexes(testCol, tt.desiredIndexes)
// Execute CreateIndexes
CreateIndexes(testCol, tt.desiredIndexes)
// Verify indexes
indexes, err := testCol.ListIndexes()

View File

@@ -11,7 +11,7 @@ import (
func InitIndexes() {
// nodes
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Node{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Node{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "key", Value: 1}}},
{Keys: bson.D{{Key: "name", Value: 1}}},
{Keys: bson.D{{Key: "is_master", Value: 1}}},
@@ -21,12 +21,12 @@ func InitIndexes() {
})
// projects
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Project{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Project{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "name", Value: 1}}},
})
// spiders
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Spider{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Spider{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "name", Value: 1}}},
{Keys: bson.D{{Key: "type", Value: 1}}},
{Keys: bson.D{{Key: "col_id", Value: 1}}},
@@ -34,7 +34,7 @@ func InitIndexes() {
})
// tasks
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Task{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Task{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "spider_id", Value: 1}}},
{Keys: bson.D{{Key: "status", Value: 1}}},
{Keys: bson.D{{Key: "node_id", Value: 1}}},
@@ -49,19 +49,19 @@ func InitIndexes() {
})
// task stats
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.TaskStat{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.TaskStat{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "created_ts", Value: -1}}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)},
})
// schedules
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Schedule{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Schedule{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "name", Value: 1}}},
{Keys: bson.D{{Key: "spider_id", Value: 1}}},
{Keys: bson.D{{Key: "enabled", Value: 1}}},
})
// users
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.User{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.User{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "username", Value: 1}}, Options: (&options.IndexOptions{}).SetUnique(true)},
{Keys: bson.D{{Key: "role", Value: 1}}},
{Keys: bson.D{{Key: "role_id", Value: 1}}},
@@ -69,12 +69,12 @@ func InitIndexes() {
})
// settings
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Setting{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Setting{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "key", Value: 1}}, Options: options.Index().SetUnique(true)},
})
// tokens
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Token{})), []mongo2.IndexModel{
CreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models.Token{})), []mongo2.IndexModel{
{Keys: bson.D{{Key: "name", Value: 1}}},
})
}

View File

@@ -2,11 +2,9 @@ package config
import (
"encoding/json"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"os"
"path/filepath"
"sync"
@@ -14,6 +12,7 @@ import (
type Service struct {
cfg *entity.NodeInfo
interfaces.Logger
}
func (svc *Service) Init() (err error) {
@@ -23,7 +22,8 @@ func (svc *Service) Init() (err error) {
configDirPath := filepath.Dir(metadataConfigPath)
if !utils.Exists(configDirPath) {
if err := os.MkdirAll(configDirPath, os.FileMode(0766)); err != nil {
return trace.TraceError(err)
svc.Errorf("create config directory error: %v", err)
return err
}
}
@@ -32,22 +32,22 @@ func (svc *Service) Init() (err error) {
svc.cfg = newConfig()
data, err := json.Marshal(svc.cfg)
if err != nil {
log.Errorf("marshal config error: %v", err)
svc.Errorf("marshal config error: %v", err)
return err
}
if err := os.WriteFile(metadataConfigPath, data, os.FileMode(0766)); err != nil {
log.Errorf("write config file error: %v", err)
svc.Errorf("write config file error: %v", err)
return err
}
} else {
// exists, read and set to config
data, err := os.ReadFile(metadataConfigPath)
if err != nil {
log.Errorf("read config file error: %v", err)
svc.Errorf("read config file error: %v", err)
return err
}
if err := json.Unmarshal(data, svc.cfg); err != nil {
log.Errorf("unmarshal config error: %v", err)
svc.Errorf("unmarshal config error: %v", err)
return err
}
}
@@ -93,7 +93,8 @@ func (svc *Service) GetMaxRunners() (res int) {
func newNodeConfigService() (svc2 interfaces.NodeConfigService, err error) {
// config service
svc := &Service{
cfg: newConfig(),
cfg: newConfig(),
Logger: utils.NewLogger("NodeConfigService"),
}
// init

View File

@@ -38,7 +38,7 @@ type MasterService struct {
monitorInterval time.Duration
// internals
logger interfaces.Logger
interfaces.Logger
}
func (svc *MasterService) Start() {
@@ -81,11 +81,11 @@ func (svc *MasterService) Wait() {
func (svc *MasterService) Stop() {
_ = svc.server.Stop()
svc.taskHandlerSvc.Stop()
svc.logger.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
svc.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
}
func (svc *MasterService) startMonitoring() {
svc.logger.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
svc.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
// ticker
ticker := time.NewTicker(svc.monitorInterval)
@@ -94,7 +94,7 @@ func (svc *MasterService) startMonitoring() {
// monitor
err := svc.monitor()
if err != nil {
svc.logger.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err)
svc.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err)
}
// wait
@@ -109,7 +109,7 @@ func (svc *MasterService) Register() (err error) {
node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() {
// not exists
svc.logger.Infof("master[%s] does not exist in db", nodeKey)
svc.Infof("master[%s] does not exist in db", nodeKey)
node := models.Node{
Key: nodeKey,
Name: nodeName,
@@ -124,23 +124,23 @@ func (svc *MasterService) Register() (err error) {
node.SetUpdated(primitive.NilObjectID)
_, err := service.NewModelService[models.Node]().InsertOne(node)
if err != nil {
svc.logger.Errorf("save master[%s] to db error: %v", nodeKey, err)
svc.Errorf("save master[%s] to db error: %v", nodeKey, err)
return err
}
svc.logger.Infof("added master[%s] to db", nodeKey)
svc.Infof("added master[%s] to db", nodeKey)
return nil
} else if err == nil {
// exists
svc.logger.Infof("master[%s] exists in db", nodeKey)
svc.Infof("master[%s] exists in db", nodeKey)
node.Status = constants.NodeStatusOnline
node.Active = true
node.ActiveAt = time.Now()
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
if err != nil {
svc.logger.Errorf("update master[%s] in db error: %v", nodeKey, err)
svc.Errorf("update master[%s] in db error: %v", nodeKey, err)
return err
}
svc.logger.Infof("updated master[%s] in db", nodeKey)
svc.Infof("updated master[%s] in db", nodeKey)
return nil
} else {
// error
@@ -204,7 +204,7 @@ func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) {
if errors.Is(err, mongo2.ErrNoDocuments) {
return nil, nil
}
svc.logger.Errorf("get all worker nodes error: %v", err)
svc.Errorf("get all worker nodes error: %v", err)
return nil, err
}
return nodes, nil
@@ -257,14 +257,14 @@ func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) {
func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) {
stream, ok := svc.server.NodeSvr.GetSubscribeStream(n.Id)
if !ok {
svc.logger.Errorf("cannot get worker node client[%s]", n.Key)
svc.Errorf("cannot get worker node client[%s]", n.Key)
return false
}
err := stream.Send(&grpc.NodeServiceSubscribeResponse{
Code: grpc.NodeServiceSubscribeCode_PING,
})
if err != nil {
svc.logger.Errorf("failed to ping worker node client[%s]: %v", n.Key, err)
svc.Errorf("failed to ping worker node client[%s]: %v", n.Key, err)
return false
}
return true
@@ -277,13 +277,13 @@ func (svc *MasterService) updateNodeRunners(node *models.Node) (err error) {
}
runningTasksCount, err := service.NewModelService[models.Task]().Count(query)
if err != nil {
svc.logger.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err)
svc.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err)
return err
}
node.CurrentRunners = runningTasksCount
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
if err != nil {
svc.logger.Errorf("failed to update node runners for node[%s]: %v", node.Key, err)
svc.Errorf("failed to update node runners for node[%s]: %v", node.Key, err)
return err
}
return nil
@@ -305,7 +305,7 @@ func newMasterService() *MasterService {
taskHandlerSvc: handler.GetTaskHandlerService(),
scheduleSvc: schedule.GetScheduleService(),
systemSvc: system.GetSystemService(),
logger: utils.NewServiceLogger("MasterService"),
Logger: utils.NewLogger("MasterService"),
}
}

View File

@@ -38,7 +38,7 @@ type WorkerService struct {
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
logger interfaces.Logger
interfaces.Logger
}
func (svc *WorkerService) Start() {
@@ -78,7 +78,7 @@ func (svc *WorkerService) Stop() {
svc.stopped = true
_ = client.GetGrpcClient().Stop()
svc.handlerSvc.Stop()
svc.logger.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey())
svc.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey())
}
func (svc *WorkerService) register() {
@@ -99,17 +99,17 @@ func (svc *WorkerService) register() {
err = fmt.Errorf("failed to get node: %v", err)
return err
}
svc.logger.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex())
svc.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex())
return nil
}
b := backoff.NewExponentialBackOff()
n := func(err error, duration time.Duration) {
svc.logger.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err)
svc.logger.Infof("retry in %.1f seconds", duration.Seconds())
svc.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err)
svc.Infof("retry in %.1f seconds", duration.Seconds())
}
err := backoff.RetryNotify(op, b, n)
if err != nil {
svc.logger.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err)
svc.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err)
panic(err)
}
}
@@ -154,7 +154,7 @@ func (svc *WorkerService) subscribe() {
NodeKey: svc.cfgSvc.GetNodeKey(),
})
if err != nil {
svc.logger.Errorf("failed to subscribe to master: %v", err)
svc.Errorf("failed to subscribe to master: %v", err)
return err
}
@@ -167,10 +167,10 @@ func (svc *WorkerService) subscribe() {
msg, err := stream.Recv()
if err != nil {
if client.GetGrpcClient().IsClosed() {
svc.logger.Errorf("connection to master is closed: %v", err)
svc.Errorf("connection to master is closed: %v", err)
return err
}
svc.logger.Errorf("failed to receive message from master: %v", err)
svc.Errorf("failed to receive message from master: %v", err)
return err
}
@@ -184,7 +184,7 @@ func (svc *WorkerService) subscribe() {
// Execute with backoff
err := backoff.Retry(operation, b)
if err != nil {
svc.logger.Errorf("subscription failed after max retries: %v", err)
svc.Errorf("subscription failed after max retries: %v", err)
return
}
@@ -200,7 +200,7 @@ func (svc *WorkerService) sendHeartbeat() {
NodeKey: svc.cfgSvc.GetNodeKey(),
})
if err != nil {
svc.logger.Errorf("failed to send heartbeat to master: %v", err)
svc.Errorf("failed to send heartbeat to master: %v", err)
}
}
@@ -220,9 +220,9 @@ func (svc *WorkerService) startHealthServer() {
// serve
if err := http.Serve(ln, app); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
svc.logger.Errorf("run server error: %v", err)
svc.Errorf("run server error: %v", err)
} else {
svc.logger.Info("server graceful down")
svc.Info("server graceful down")
}
}
}
@@ -233,7 +233,7 @@ func newWorkerService() *WorkerService {
cfgSvc: nodeconfig.GetNodeConfigService(),
handlerSvc: handler.GetTaskHandlerService(),
isReady: false,
logger: utils.NewServiceLogger("WorkerService"),
Logger: utils.NewLogger("WorkerService"),
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/utils"
"io"
@@ -13,9 +14,7 @@ import (
"strings"
"time"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/trace"
)
type ResBody struct {
@@ -102,7 +101,8 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e
// perform request
resp, body, err := performRequest("POST", ch.WebhookUrl, data)
if err != nil {
return trace.TraceError(err)
log.Errorf("IM request error: %v", err)
return err
}
if resp.StatusCode != http.StatusOK {
@@ -112,11 +112,13 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e
// parse response
var resBody ResBody
if err := json.Unmarshal(body, &resBody); err != nil {
return trace.TraceError(err)
log.Errorf("Parsing IM response error: %v", err)
return err
}
// validate response code
if resBody.ErrCode != 0 {
log.Errorf("IM response error: %s", resBody.ErrMsg)
return errors.New(resBody.ErrMsg)
}
@@ -126,12 +128,12 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e
func performIMRequest(webhookUrl string, data RequestParam) ([]byte, error) {
resp, body, err := performRequest("POST", webhookUrl, data)
if err != nil {
log.Errorf("IM request error: %v", err)
logger.Errorf("IM request error: %v", err)
return nil, err
}
if resp.StatusCode >= 400 {
log.Errorf("IM response status code: %d", resp.StatusCode)
logger.Errorf("IM response status code: %d", resp.StatusCode)
return nil, fmt.Errorf("IM error response %d: %s", resp.StatusCode, string(body))
}
@@ -146,8 +148,8 @@ func performIMRequestWithJson[T any](webhookUrl string, data RequestParam) (resB
// parse response
if err := json.Unmarshal(body, &resBody); err != nil {
log.Warnf("Parsing IM response error: %v", err)
log.Infof("IM response: %s", string(body))
logger.Warnf("Parsing IM response error: %v", err)
logger.Infof("IM response: %s", string(body))
return resBody, nil
}

View File

@@ -0,0 +1,5 @@
package notification
import "github.com/crawlab-team/crawlab/core/utils"
var logger = utils.NewLogger("NotificationUtils")

View File

@@ -3,9 +3,7 @@ package notification
import (
"errors"
"github.com/PuerkitoBio/goquery"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/trace"
"gopkg.in/gomail.v2"
"net/mail"
"regexp"
@@ -69,8 +67,7 @@ func isHtml(content string) bool {
func convertHtmlToText(content string) string {
doc, err := goquery.NewDocumentFromReader(strings.NewReader(content))
if err != nil {
log.Errorf("failed to convert html to text: %v", err)
trace.PrintError(err)
logger.Errorf("failed to convert html to text: %v", err)
return ""
}
return doc.Text()

View File

@@ -3,7 +3,6 @@ package notification
import (
"context"
"encoding/base64"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/trace"
"golang.org/x/oauth2/google"
@@ -18,7 +17,7 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication
// 使用服务账户 JSON 密钥文件创建 JWT 配置
config, err := google.JWTConfigFromJSON(b, gmail.GmailSendScope)
if err != nil {
log.Errorf("Unable to parse service account key file to config: %v", err)
logger.Errorf("Unable to parse service account key file to config: %v", err)
return trace.TraceError(err)
}
@@ -29,8 +28,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication
client := config.Client(context.Background())
srv, err := gmail.New(client)
if err != nil {
log.Errorf("Unable to create Gmail client: %v", err)
return trace.TraceError(err)
logger.Errorf("Unable to create Gmail client: %v", err)
return err
}
// 创建 MIME 邮件
@@ -41,8 +40,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication
var buf strings.Builder
if _, err := m.WriteTo(&buf); err != nil {
log.Errorf("Unable to write message: %v", err)
return trace.TraceError(err)
logger.Errorf("Unable to write message: %v", err)
return err
}
// 将邮件内容进行 base64 编码
@@ -53,8 +52,8 @@ func sendMailGmail(ch *models.NotificationChannel, smtpConfig smtpAuthentication
// 发送邮件
_, err = srv.Users.Messages.Send("me", gmsg).Do()
if err != nil {
log.Errorf("Unable to send email: %v", err)
return trace.TraceError(err)
logger.Errorf("Unable to send email: %v", err)
return err
}
return nil

View File

@@ -2,7 +2,6 @@ package notification
import (
"context"
"github.com/apex/log"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"net/smtp"
@@ -20,14 +19,14 @@ func getGmailOAuth2Token(oauth2Json string) (token *oauth2.Token, err error) {
// 使用服务账户 JSON 密钥文件创建 JWT 配置
config, err := google.JWTConfigFromJSON(b, "https://mail.google.com/")
if err != nil {
log.Errorf("Unable to parse service account key file to config: %v", err)
logger.Errorf("Unable to parse service account key file to config: %v", err)
return nil, err
}
// 使用服务账户的电子邮件和访问令牌
token, err = config.TokenSource(ctx).Token()
if err != nil {
log.Errorf("Unable to generate token: %v", err)
logger.Errorf("Unable to generate token: %v", err)
return nil, err
}
return token, nil

View File

@@ -2,23 +2,24 @@ package notification
import (
"fmt"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"regexp"
"strings"
"sync"
"time"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/trace"
"github.com/gomarkdown/markdown"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Service is the notification service. It embeds interfaces.Logger so that
// its methods can log through svc.Errorf instead of a package-global logger.
type Service struct {
interfaces.Logger
}
func (svc *Service) Send(s *models.NotificationSetting, args ...any) {
@@ -31,7 +32,7 @@ func (svc *Service) Send(s *models.NotificationSetting, args ...any) {
defer wg.Done()
ch, err := service.NewModelService[models.NotificationChannel]().GetById(chId)
if err != nil {
log.Errorf("[NotificationService] get channel error: %v", err)
svc.Errorf("[NotificationService] get channel error: %v", err)
return
}
content := svc.getContent(s, ch, args...)
@@ -57,7 +58,7 @@ func (svc *Service) SendMail(s *models.NotificationSetting, ch *models.Notificat
// send mail
err := SendMail(s, ch, mailTo, mailCc, mailBcc, title, content)
if err != nil {
log.Errorf("[NotificationService] send mail error: %v", err)
svc.Errorf("[NotificationService] send mail error: %v", err)
}
// save request
@@ -71,7 +72,7 @@ func (svc *Service) SendIM(ch *models.NotificationChannel, title, content string
// send mobile notification
err := SendIMNotification(ch, title, content)
if err != nil {
log.Errorf("[NotificationService] send mobile notification error: %v", err)
svc.Errorf("[NotificationService] send mobile notification error: %v", err)
}
// save request
@@ -107,7 +108,7 @@ func (svc *Service) SendTestMessage(locale string, ch *models.NotificationChanne
// For email
err = SendMail(nil, ch, toMail, nil, nil, title, content)
if err != nil {
log.Errorf("failed to send test email: %v", err)
svc.Errorf("failed to send test email: %v", err)
}
case TypeIM:
@@ -117,7 +118,7 @@ func (svc *Service) SendTestMessage(locale string, ch *models.NotificationChanne
// For instant messaging
err = SendIMNotification(ch, title, content)
if err != nil {
log.Errorf("failed to send test IM notification: %v", err)
svc.Errorf("failed to send test IM notification: %v", err)
}
default:
@@ -417,7 +418,7 @@ func (svc *Service) getUsernameById(id primitive.ObjectID) (username string) {
}
u, err := service.NewModelService[models.User]().GetById(id)
if err != nil {
log.Errorf("[NotificationService] get user error: %v", err)
svc.Errorf("[NotificationService] get user error: %v", err)
return ""
}
return u.Username
@@ -494,8 +495,7 @@ func (svc *Service) SendNodeNotification(node *models.Node) {
},
}, nil)
if err != nil {
log.Errorf("get notification settings error: %v", err)
trace.PrintError(err)
svc.Errorf("get notification settings error: %v", err)
return
}
@@ -537,7 +537,7 @@ func (svc *Service) createRequestMail(s *models.NotificationSetting, ch *models.
r.SetUpdatedAt(time.Now())
r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r)
if err != nil {
log.Errorf("[NotificationService] save request error: %v", err)
svc.Errorf("[NotificationService] save request error: %v", err)
return nil, err
}
return &r, nil
@@ -563,7 +563,7 @@ func (svc *Service) createRequestMailTest(ch *models.NotificationChannel, title,
r.SetUpdatedAt(time.Now())
r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r)
if err != nil {
log.Errorf("[NotificationService] save request error: %v", err)
svc.Errorf("[NotificationService] save request error: %v", err)
return nil, err
}
return &r, nil
@@ -581,7 +581,7 @@ func (svc *Service) createRequestIM(ch *models.NotificationChannel, title, conte
r.SetUpdatedAt(time.Now())
r.Id, err = service.NewModelService[models.NotificationRequest]().InsertOne(r)
if err != nil {
log.Errorf("[NotificationService] save request error: %v", err)
svc.Errorf("[NotificationService] save request error: %v", err)
return nil, err
}
return &r, nil
@@ -601,12 +601,14 @@ func (svc *Service) saveRequest(r *models.NotificationRequest, err error) {
r.SetUpdatedAt(time.Now())
err = service.NewModelService[models.NotificationRequest]().ReplaceById(r.Id, *r)
if err != nil {
log.Errorf("[NotificationService] save request error: %v", err)
svc.Errorf("[NotificationService] save request error: %v", err)
}
}
// newNotificationService builds a notification Service whose embedded Logger
// is created with the name "NotificationService".
//
// The Logger must be set here: Service methods call svc.Errorf directly, and
// a Service constructed as a bare &Service{} would carry a nil embedded
// interface.
func newNotificationService() *Service {
	return &Service{
		Logger: utils.NewLogger("NotificationService"),
	}
}
var _service *Service

View File

@@ -1,28 +1,27 @@
package schedule
import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/trace"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/robfig/cron/v3"
"strings"
)
type Logger struct {
type CronLogger struct {
interfaces.Logger
}
func (l *Logger) Info(msg string, keysAndValues ...interface{}) {
func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {
p := l.getPlaceholder(len(keysAndValues))
log.Infof(fmt.Sprintf("cron: %s %s", msg, p), keysAndValues...)
l.Infof("cron: %s %s", msg, p)
}
func (l *Logger) Error(err error, msg string, keysAndValues ...interface{}) {
func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {
p := l.getPlaceholder(len(keysAndValues))
log.Errorf(fmt.Sprintf("cron: %s %s", msg, p), keysAndValues...)
trace.PrintError(err)
l.Errorf("cron: %s %v %s", msg, err, p)
}
func (l *Logger) getPlaceholder(n int) (s string) {
func (l *CronLogger) getPlaceholder(n int) (s string) {
var arr []string
for i := 0; i < n; i++ {
arr = append(arr, "%v")
@@ -30,6 +29,8 @@ func (l *Logger) getPlaceholder(n int) (s string) {
return strings.Join(arr, " ")
}
func NewLogger() cron.Logger {
return &Logger{}
func NewCronLogger() cron.Logger {
return &CronLogger{
Logger: utils.NewLogger("CronLogger"),
}
}

View File

@@ -1,13 +1,11 @@
package schedule
import (
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/spider/admin"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"github.com/robfig/cron/v3"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -32,6 +30,7 @@ type Service struct {
schedules []models.Schedule
stopped bool
mu sync.Mutex
interfaces.Logger
}
func (svc *Service) GetLocation() (loc *time.Location) {
@@ -67,7 +66,12 @@ func (svc *Service) SetUpdateInterval(interval time.Duration) {
}
// Init initializes the schedule service by calling svc.fetch. If fetch fails,
// the error is reported through svc.Fatalf and then returned.
//
// NOTE(review): if Fatalf terminates the process (as apex/log's Fatalf did),
// the return below is never reached — confirm against the Logger
// implementation.
func (svc *Service) Init() (err error) {
	if err = svc.fetch(); err != nil {
		svc.Fatalf("failed to initialize schedule service: %v", err)
		return err
	}
	return nil
}
func (svc *Service) Start() {
@@ -91,7 +95,8 @@ func (svc *Service) Enable(s models.Schedule, by primitive.ObjectID) (err error)
id, err := svc.cron.AddFunc(s.Cron, svc.schedule(s.Id))
if err != nil {
return trace.TraceError(err)
svc.Errorf("failed to add cron job: %v", err)
return err
}
s.Enabled = true
s.EntryId = id
@@ -129,7 +134,7 @@ func (svc *Service) GetCron() (c *cron.Cron) {
func (svc *Service) update() {
// fetch enabled schedules
if err := svc.fetch(); err != nil {
trace.PrintError(err)
svc.Errorf("failed to fetch schedules: %v", err)
return
}
@@ -145,7 +150,7 @@ func (svc *Service) update() {
if !s.Enabled {
err := svc.Enable(s, s.GetCreatedBy())
if err != nil {
trace.PrintError(err)
svc.Errorf("failed to enable schedule: %v", err)
continue
}
}
@@ -184,14 +189,14 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
// schedule
s, err := svc.modelSvc.GetById(id)
if err != nil {
trace.PrintError(err)
svc.Errorf("failed to get schedule: %v", err)
return
}
// spider
spider, err := service.NewModelService[models.Spider]().GetById(s.SpiderId)
if err != nil {
trace.PrintError(err)
svc.Errorf("failed to get spider: %v", err)
return
}
@@ -229,7 +234,8 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
// schedule or assign a task in the task queue
if _, err := svc.adminSvc.Schedule(s.SpiderId, opts); err != nil {
trace.PrintError(err)
svc.Errorf("failed to schedule spider: %v", err)
return
}
}
}
@@ -244,10 +250,11 @@ func newScheduleService() *Service {
updateInterval: 1 * time.Minute,
adminSvc: admin.GetSpiderAdminService(),
modelSvc: service.NewModelService[models.Schedule](),
Logger: utils.NewLogger("ScheduleService"),
}
// logger
svc.logger = NewLogger()
svc.logger = NewCronLogger()
// cron
svc.cron = cron.New(
@@ -258,7 +265,6 @@ func newScheduleService() *Service {
// initialize
if err := svc.Init(); err != nil {
log.Fatalf("failed to initialize schedule service: %v", err)
panic(err)
}

View File

@@ -20,7 +20,6 @@ import (
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
client2 "github.com/crawlab-team/crawlab/core/grpc/client"
@@ -29,7 +28,6 @@ import (
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@@ -54,6 +52,7 @@ type Runner struct {
err error // captures any process execution errors
cwd string // current working directory for task
conn grpc.TaskService_ConnectClient // gRPC stream connection for task service
interfaces.Logger
// log handling
scannerStdout *bufio.Reader // reader for process stdout
@@ -95,7 +94,7 @@ func (r *Runner) Init() (err error) {
// and status monitoring. Returns an error if the task execution fails.
func (r *Runner) Run() (err error) {
// log task started
log.Infof("task[%s] started", r.tid.Hex())
r.Infof("task[%s] started", r.tid.Hex())
// configure working directory
r.configureCwd()
@@ -164,10 +163,10 @@ func (r *Runner) Cancel(force bool) (err error) {
// Kill process
err = utils.KillProcess(r.cmd, force)
if err != nil {
log.Errorf("kill process error: %v", err)
r.Errorf("kill process error: %v", err)
return err
}
log.Debugf("attempt to kill process[%d]", r.pid)
r.Debugf("attempt to kill process[%d]", r.pid)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetCancelTimeout())
@@ -219,7 +218,7 @@ func (r *Runner) configureCmd() (err error) {
// get cmd instance
r.cmd, err = utils.BuildCmd(cmdStr)
if err != nil {
log.Errorf("error building command: %v", err)
r.Errorf("error building command: %v", err)
return err
}
@@ -229,14 +228,14 @@ func (r *Runner) configureCmd() (err error) {
// Configure pipes for IPC
r.stdinPipe, err = r.cmd.StdinPipe()
if err != nil {
log.Errorf("error creating stdin pipe: %v", err)
r.Errorf("error creating stdin pipe: %v", err)
return err
}
// Add stdout pipe for IPC
r.stdoutPipe, err = r.cmd.StdoutPipe()
if err != nil {
log.Errorf("error creating stdout pipe: %v", err)
r.Errorf("error creating stdout pipe: %v", err)
return err
}
@@ -297,7 +296,7 @@ func (r *Runner) configureEnv() {
// Global environment variables
envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil)
if err != nil {
trace.PrintError(err)
r.Errorf("error getting environment variables: %v", err)
return
}
for _, env := range envs {
@@ -350,20 +349,20 @@ func (r *Runner) syncFiles() (err error) {
// get file list from master
resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir)
if err != nil {
log.Errorf("error getting file list from master: %v", err)
r.Errorf("error getting file list from master: %v", err)
return err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorf("error reading response body: %v", err)
r.Errorf("error reading response body: %v", err)
return err
}
var masterFiles map[string]entity.FsFileInfo
err = json.Unmarshal(body, &masterFiles)
if err != nil {
log.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String())
log.Errorf("error details: %v", err)
r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String())
r.Errorf("error details: %v", err)
return err
}
@@ -376,7 +375,7 @@ func (r *Runner) syncFiles() (err error) {
// create working directory if not exists
if _, err := os.Stat(r.cwd); os.IsNotExist(err) {
if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil {
log.Errorf("error creating worker directory: %v", err)
r.Errorf("error creating worker directory: %v", err)
return err
}
}
@@ -384,17 +383,17 @@ func (r *Runner) syncFiles() (err error) {
// get file list from worker
workerFiles, err := utils.ScanDirectory(r.cwd)
if err != nil {
log.Errorf("error scanning worker directory: %v", err)
return trace.TraceError(err)
r.Errorf("error scanning worker directory: %v", err)
return err
}
// delete files that are deleted on master node
for path, workerFile := range workerFiles {
if _, exists := masterFilesMap[path]; !exists {
log.Infof("deleting file: %s", path)
r.Infof("deleting file: %s", path)
err := os.Remove(workerFile.FullPath)
if err != nil {
log.Errorf("error deleting file: %v", err)
r.Errorf("error deleting file: %v", err)
}
}
}
@@ -417,17 +416,17 @@ func (r *Runner) syncFiles() (err error) {
defer wg.Done()
if masterFile.IsDir {
log.Infof("directory needs to be synchronized: %s", path)
r.Infof("directory needs to be synchronized: %s", path)
_err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode)
if _err != nil {
log.Errorf("error creating directory: %v", _err)
r.Errorf("error creating directory: %v", _err)
err = errors.Join(err, _err)
}
} else {
log.Infof("file needs to be synchronized: %s", path)
r.Infof("file needs to be synchronized: %s", path)
_err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile)
if _err != nil {
log.Errorf("error downloading file: %v", _err)
r.Errorf("error downloading file: %v", _err)
err = errors.Join(err, _err)
}
}
@@ -449,11 +448,11 @@ func (r *Runner) syncFiles() (err error) {
func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error {
resp, err := r.createHttpRequest("GET", "/download?path="+path)
if err != nil {
log.Errorf("error getting file response: %v", err)
r.Errorf("error getting file response: %v", err)
return err
}
if resp.StatusCode != http.StatusOK {
log.Errorf("error downloading file: %s", resp.Status)
r.Errorf("error downloading file: %s", resp.Status)
return errors.New(resp.Status)
}
defer resp.Body.Close()
@@ -463,14 +462,14 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF
utils.Exists(dirPath)
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
log.Errorf("error creating directory: %v", err)
r.Errorf("error creating directory: %v", err)
return err
}
// create local file
out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode)
if err != nil {
log.Errorf("error creating file: %v", err)
r.Errorf("error creating file: %v", err)
return err
}
defer out.Close()
@@ -478,7 +477,7 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF
// copy file content to local file
_, err = io.Copy(out, resp.Body)
if err != nil {
log.Errorf("error copying file: %v", err)
r.Errorf("error copying file: %v", err)
return err
}
return nil
@@ -498,33 +497,33 @@ func (r *Runner) getHttpRequestHeaders() (headers map[string]string) {
func (r *Runner) wait() (err error) {
// start a goroutine to wait for process to finish
go func() {
log.Debugf("waiting for process[%d] to finish", r.pid)
r.Debugf("waiting for process[%d] to finish", r.pid)
err = r.cmd.Wait()
if err != nil {
var exitError *exec.ExitError
if !errors.As(err, &exitError) {
r.ch <- constants.TaskSignalError
log.Debugf("process[%d] exited with error: %v", r.pid, err)
r.Debugf("process[%d] exited with error: %v", r.pid, err)
return
}
exitCode := exitError.ExitCode()
if exitCode == -1 {
// cancel error
r.ch <- constants.TaskSignalCancel
log.Debugf("process[%d] cancelled", r.pid)
r.Debugf("process[%d] cancelled", r.pid)
return
}
// standard error
r.err = err
r.ch <- constants.TaskSignalError
log.Debugf("process[%d] exited with error: %v", r.pid, err)
r.Debugf("process[%d] exited with error: %v", r.pid, err)
return
}
// success
r.ch <- constants.TaskSignalFinish
log.Debugf("process[%d] exited successfully", r.pid)
r.Debugf("process[%d] exited successfully", r.pid)
}()
// declare task status
@@ -552,7 +551,7 @@ func (r *Runner) wait() (err error) {
// update task status
if err := r.updateTask(status, err); err != nil {
log.Errorf("error updating task status: %v", err)
r.Errorf("error updating task status: %v", err)
return err
}
@@ -601,7 +600,7 @@ func (r *Runner) updateTask(status string, e error) (err error) {
func (r *Runner) initConnection() (err error) {
r.conn, err = client2.GetGrpcClient().TaskClient.Connect(context.Background())
if err != nil {
log.Errorf("error connecting to task service: %v", err)
r.Errorf("error connecting to task service: %v", err)
return err
}
return nil
@@ -611,7 +610,7 @@ func (r *Runner) initConnection() (err error) {
func (r *Runner) writeLogLines(lines []string) {
linesBytes, err := json.Marshal(lines)
if err != nil {
log.Errorf("error marshaling log lines: %v", err)
r.Errorf("error marshaling log lines: %v", err)
return
}
msg := &grpc.TaskServiceConnectRequest{
@@ -620,7 +619,7 @@ func (r *Runner) writeLogLines(lines []string) {
Data: linesBytes,
}
if err := r.conn.Send(msg); err != nil {
log.Errorf("error sending log lines: %v", err)
r.Errorf("error sending log lines: %v", err)
return
}
}
@@ -631,7 +630,7 @@ func (r *Runner) writeLogLines(lines []string) {
func (r *Runner) _updateTaskStat(status string) {
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
if err != nil {
trace.PrintError(err)
r.Errorf("error getting task stat: %v", err)
return
}
switch status {
@@ -652,13 +651,13 @@ func (r *Runner) _updateTaskStat(status string) {
if r.svc.GetNodeConfigService().IsMaster() {
err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
if err != nil {
trace.PrintError(err)
r.Errorf("error updating task stat: %v", err)
return
}
} else {
err = client.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
if err != nil {
trace.PrintError(err)
r.Errorf("error updating task stat: %v", err)
return
}
}
@@ -672,8 +671,7 @@ func (r *Runner) sendNotification() {
}
_, err := client2.GetGrpcClient().TaskClient.SendNotification(context.Background(), req)
if err != nil {
log.Errorf("error sending notification: %v", err)
trace.PrintError(err)
r.Errorf("error sending notification: %v", err)
return
}
}
@@ -686,7 +684,7 @@ func (r *Runner) _updateSpiderStat(status string) {
// task stat
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
if err != nil {
trace.PrintError(err)
r.Errorf("error getting task stat: %v", err)
return
}
@@ -715,8 +713,7 @@ func (r *Runner) _updateSpiderStat(status string) {
},
}
default:
log.Errorf("Invalid task status: %s", status)
trace.PrintError(errors.New("invalid task status"))
r.Errorf("Invalid task status: %s", status)
return
}
@@ -724,13 +721,13 @@ func (r *Runner) _updateSpiderStat(status string) {
if r.svc.GetNodeConfigService().IsMaster() {
err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
if err != nil {
trace.PrintError(err)
r.Errorf("error updating spider stat: %v", err)
return
}
} else {
err = client.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
if err != nil {
trace.PrintError(err)
r.Errorf("error updating spider stat: %v", err)
return
}
}
@@ -755,14 +752,14 @@ func (r *Runner) handleIPC() {
// Convert message to JSON
jsonData, err := json.Marshal(msg)
if err != nil {
log.Errorf("error marshaling IPC message: %v", err)
r.Errorf("error marshaling IPC message: %v", err)
continue
}
// Write to child process's stdin
_, err = fmt.Fprintln(r.stdinPipe, string(jsonData))
if err != nil {
log.Errorf("error writing to child process: %v", err)
r.Errorf("error writing to child process: %v", err)
}
}
}
@@ -800,7 +797,7 @@ func (r *Runner) startIPCReader() {
if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData {
r.handleIPCInsertDataMessage(ipcMsg)
} else {
log.Warnf("no IPC handler set for message: %v", ipcMsg)
r.Warnf("no IPC handler set for message: %v", ipcMsg)
}
}
} else {
@@ -815,7 +812,7 @@ func (r *Runner) startIPCReader() {
func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
// Validate message
if ipcMsg.Payload == nil {
log.Errorf("empty payload in IPC message")
r.Errorf("empty payload in IPC message")
return
}
@@ -829,7 +826,7 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
if itemMap, ok := item.(map[string]interface{}); ok {
records = append(records, itemMap)
} else {
log.Errorf("invalid record at index %d: %v", i, item)
r.Errorf("invalid record at index %d: %v", i, item)
continue
}
}
@@ -841,30 +838,30 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
if itemMap, ok := payload.(map[string]interface{}); ok {
records = []map[string]interface{}{itemMap}
} else {
log.Errorf("invalid payload type: %T", payload)
r.Errorf("invalid payload type: %T", payload)
return
}
default:
log.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload)
r.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload)
return
}
// Validate records
if len(records) == 0 {
log.Warnf("no valid records to insert")
r.Warnf("no valid records to insert")
return
}
// Marshal data with error handling
data, err := json.Marshal(records)
if err != nil {
log.Errorf("error marshaling records: %v", err)
r.Errorf("error marshaling records: %v", err)
return
}
// Validate connection
if r.conn == nil {
log.Errorf("gRPC connection not initialized")
r.Errorf("gRPC connection not initialized")
return
}
@@ -882,11 +879,11 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
// Use context for sending
select {
case <-ctx.Done():
log.Errorf("timeout sending IPC message")
r.Errorf("timeout sending IPC message")
return
default:
if err := r.conn.Send(grpcMsg); err != nil {
log.Errorf("error sending IPC message: %v", err)
r.Errorf("error sending IPC message: %v", err)
return
}
}
@@ -898,7 +895,6 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
// validate options
if id.IsZero() {
err = fmt.Errorf("invalid task id: %s", id.Hex())
log.Errorf("error creating task runner: %v", err)
return nil, err
}
@@ -910,6 +906,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
tid: id,
ch: make(chan constants.TaskSignal),
logBatchSize: 20,
Logger: utils.NewLogger("TaskRunner"),
}
// multi error
@@ -936,7 +933,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
// initialize task runner
if err := r.Init(); err != nil {
log.Errorf("error initializing task runner: %v", err)
r.Errorf("error initializing task runner: %v", err)
errs.Errors = append(errs.Errors, err)
}

View File

@@ -4,17 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
errors2 "github.com/crawlab-team/crawlab/core/errors"
grpcclient "github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/client"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"io"
@@ -38,6 +36,7 @@ type Service struct {
mu sync.Mutex
runners sync.Map // pool of task runners started
syncLocks sync.Map // files sync locks map of task runners
interfaces.Logger
}
func (svc *Service) Start() {
@@ -123,7 +122,7 @@ func (svc *Service) reportStatus() {
case <-ticker.C:
// update node status
if err := svc.updateNodeStatus(); err != nil {
log.Errorf("failed to report status: %v", err)
svc.Errorf("failed to report status: %v", err)
}
}
}
@@ -161,7 +160,7 @@ func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models.Task, err erro
t, err = client.NewModelService[models.Task]().GetById(id)
}
if err != nil {
log.Errorf("failed to get task by id: %v", err)
svc.Errorf("failed to get task by id: %v", err)
return nil, err
}
@@ -188,7 +187,7 @@ func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err
s, err = client.NewModelService[models.Spider]().GetById(id)
}
if err != nil {
log.Errorf("failed to get spider by id: %v", err)
svc.Errorf("failed to get spider by id: %v", err)
return nil, err
}
@@ -198,7 +197,7 @@ func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err
func (svc *Service) getRunnerCount() (count int) {
n, err := svc.GetCurrentNode()
if err != nil {
log.Errorf("failed to get current node: %v", err)
svc.Errorf("failed to get current node: %v", err)
return
}
query := bson.M{
@@ -210,13 +209,13 @@ func (svc *Service) getRunnerCount() (count int) {
if svc.cfgSvc.IsMaster() {
count, err = service.NewModelService[models.Task]().Count(query)
if err != nil {
log.Errorf("failed to count tasks: %v", err)
svc.Errorf("failed to count tasks: %v", err)
return
}
} else {
count, err = client.NewModelService[models.Task]().Count(query)
if err != nil {
log.Errorf("failed to count tasks: %v", err)
svc.Errorf("failed to count tasks: %v", err)
return
}
}
@@ -224,27 +223,31 @@ func (svc *Service) getRunnerCount() (count int) {
}
// getRunner looks up the task runner stored in svc.runners under taskId.
//
// It returns an error, after logging it, when no entry exists for taskId or
// when the stored value does not implement interfaces.TaskRunner.
func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) {
	svc.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId)

	v, ok := svc.runners.Load(taskId)
	if !ok {
		err = fmt.Errorf("task[%s] not exists", taskId.Hex())
		svc.Errorf("get runner error: %v", err)
		return nil, err
	}

	// sync.Map stores untyped values, so verify the entry really is a runner.
	r, ok = v.(interfaces.TaskRunner)
	if !ok {
		err = fmt.Errorf("invalid type: %T", v)
		svc.Errorf("get runner error: %v", err)
		return nil, err
	}

	return r, nil
}
// addRunner stores r in svc.runners under taskId, replacing any value
// previously stored for that key.
func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) {
	svc.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex())
	svc.runners.Store(taskId, r)
}
// deleteRunner removes the entry stored under taskId from svc.runners.
func (svc *Service) deleteRunner(taskId primitive.ObjectID) {
	svc.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId)
	svc.runners.Delete(taskId)
}
@@ -294,7 +297,7 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) {
_, ok := svc.runners.Load(taskId)
if ok {
err = fmt.Errorf("task[%s] already exists", taskId.Hex())
log.Errorf("run task error: %v", err)
svc.Errorf("run task error: %v", err)
return err
}
@@ -302,7 +305,7 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) {
r, err := newTaskRunner(taskId, svc)
if err != nil {
err = fmt.Errorf("failed to create task runner: %v", err)
log.Errorf("run task error: %v", err)
svc.Errorf("run task error: %v", err)
return err
}
@@ -318,22 +321,22 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) {
// create a goroutine to handle stream messages
go svc.handleStreamMessages(r.GetTaskId(), stream, stopCh)
} else {
log.Errorf("failed to subscribe task[%s]: %v", r.GetTaskId().Hex(), err)
log.Warnf("task[%s] will not be able to receive stream messages", r.GetTaskId().Hex())
svc.Errorf("failed to subscribe task[%s]: %v", r.GetTaskId().Hex(), err)
svc.Warnf("task[%s] will not be able to receive stream messages", r.GetTaskId().Hex())
}
// run task process (blocking) error or finish after task runner ends
if err := r.Run(); err != nil {
switch {
case errors.Is(err, constants.ErrTaskError):
log.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err)
svc.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err)
case errors.Is(err, constants.ErrTaskCancelled):
log.Errorf("task[%s] cancelled", r.GetTaskId().Hex())
svc.Errorf("task[%s] cancelled", r.GetTaskId().Hex())
default:
log.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err)
svc.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err)
}
}
log.Infof("task[%s] finished", r.GetTaskId().Hex())
svc.Infof("task[%s] finished", r.GetTaskId().Hex())
// send stopCh signal to stream message handler
stopCh <- struct{}{}
@@ -353,7 +356,7 @@ func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskSe
}
stream, err = svc.c.TaskClient.Subscribe(ctx, req)
if err != nil {
log.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err)
svc.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err)
return nil, err
}
return stream, nil
@@ -365,7 +368,7 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.
case <-stopCh:
err := stream.CloseSend()
if err != nil {
log.Errorf("task[%s] failed to close stream: %v", taskId.Hex(), err)
svc.Errorf("task[%s] failed to close stream: %v", taskId.Hex(), err)
return
}
return
@@ -373,15 +376,15 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.
msg, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
log.Infof("task[%s] received EOF, stream closed", taskId.Hex())
svc.Infof("task[%s] received EOF, stream closed", taskId.Hex())
return
}
log.Errorf("task[%s] stream error: %v", taskId.Hex(), err)
svc.Errorf("task[%s] stream error: %v", taskId.Hex(), err)
continue
}
switch msg.Code {
case grpc.TaskServiceSubscribeCode_CANCEL:
log.Infof("task[%s] received cancel signal", taskId.Hex())
svc.Infof("task[%s] received cancel signal", taskId.Hex())
go svc.handleCancel(msg, taskId)
}
}
@@ -391,28 +394,28 @@ func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.
// handleCancel reacts to a cancel message received for a task's subscribe
// stream. It ignores messages whose TaskId does not match taskId, cancels the
// task through cancelTask (passing msg.Force along), then loads the task and
// saves it with status TaskStatusCancelled.
//
// Nothing is returned: each failure is logged and stops the remaining steps.
func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) {
// validate task id
if msg.TaskId != taskId.Hex() {
log.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId)
svc.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId)
return
}
// cancel task
err := svc.cancelTask(taskId, msg.Force)
if err != nil {
log.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err)
svc.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err)
return
}
log.Infof("task[%s] cancelled", taskId.Hex())
svc.Infof("task[%s] cancelled", taskId.Hex())
// set task status as "cancelled"
t, err := svc.GetTaskById(taskId)
if err != nil {
log.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err)
svc.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err)
return
}
t.Status = constants.TaskStatusCancelled
err = svc.UpdateTask(t)
if err != nil {
// the cancel itself already succeeded; only the status update failed
log.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err)
svc.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err)
}
}
@@ -436,6 +439,7 @@ func newTaskHandlerService() *Service {
cancelTimeout: 60 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
Logger: utils.NewLogger("TaskHandlerService"),
}
// dependency injection

View File

@@ -3,10 +3,9 @@ package log
import (
"bufio"
"bytes"
"errors"
"github.com/apex/log"
"fmt"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"github.com/spf13/viper"
"io"
"os"
@@ -24,6 +23,7 @@ type FileLogDriver struct {
// internals
mu sync.Mutex
interfaces.Logger
}
func (d *FileLogDriver) Init() {
@@ -43,18 +43,21 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) {
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.FileMode(0760))
if err != nil {
return trace.TraceError(err)
d.Errorf("open file error: %v", err)
return err
}
defer func(f *os.File) {
err := f.Close()
if err != nil {
log.Errorf("close file error: %s", err.Error())
d.Errorf("close file error: %v", err)
return
}
}(f)
_, err = f.WriteString(line + "\n")
if err != nil {
return trace.TraceError(err)
d.Errorf("write file error: %v", err)
return err
}
return nil
@@ -63,6 +66,7 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) {
func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) {
linesString := strings.Join(lines, "\n")
if err := d.WriteLine(id, linesString); err != nil {
d.Errorf("write line error: %v", err)
return err
}
return nil
@@ -70,7 +74,9 @@ func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) {
func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (lines []string, err error) {
if pattern != "" {
return lines, errors.New("not implemented")
err = fmt.Errorf("find with pattern not implemented")
d.Errorf("%v", err)
return lines, err
}
if !utils.Exists(d.getLogFilePath(id, d.logFileName)) {
return nil, nil
@@ -78,7 +84,8 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li
f, err := os.Open(d.getLogFilePath(id, d.logFileName))
if err != nil {
return nil, trace.TraceError(err)
d.Errorf("failed to open file: %v", err)
return nil, err
}
defer f.Close()
@@ -110,7 +117,9 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li
func (d *FileLogDriver) Count(id string, pattern string) (n int, err error) {
if pattern != "" {
return n, errors.New("not implemented")
err = fmt.Errorf("count with pattern not implemented")
d.Errorf("%v", err)
return n, err
}
if !utils.Exists(d.getLogFilePath(id, d.logFileName)) {
return 0, nil
@@ -118,7 +127,8 @@ func (d *FileLogDriver) Count(id string, pattern string) (n int, err error) {
f, err := os.Open(d.getLogFilePath(id, d.logFileName))
if err != nil {
return n, trace.TraceError(err)
d.Errorf("failed to open file: %v", err)
return n, err
}
return d.lineCounter(f)
}
@@ -142,7 +152,7 @@ func (d *FileLogDriver) getLogFilePath(id, fileName string) (filePath string) {
func (d *FileLogDriver) getLogFiles(id string) (files []os.FileInfo) {
files, err := utils.ListDir(d.getBasePath(id))
if err != nil {
log.Errorf("failed to list log files: %s", err.Error())
d.Errorf("failed to list log files: %v", err)
return nil
}
return
@@ -151,7 +161,8 @@ func (d *FileLogDriver) getLogFiles(id string) (files []os.FileInfo) {
// initDir makes sure the base directory for the given id exists. When
// utils.Exists reports it missing, the directory (and any missing parents) is
// created with mode 0770. A creation failure is only reported; no error is
// returned to the caller.
func (d *FileLogDriver) initDir(id string) {
if !utils.Exists(d.getBasePath(id)) {
if err := os.MkdirAll(d.getBasePath(id), os.FileMode(0770)); err != nil {
trace.PrintError(err)
d.Errorf("failed to create log directory: %s", d.getBasePath(id))
return
}
}
}
@@ -218,7 +229,7 @@ func (d *FileLogDriver) getTtl() time.Duration {
func (d *FileLogDriver) cleanup() {
// check if log path is set
if utils.GetTaskLogPath() == "" {
log.Errorf("log path is not set")
d.Errorf("log path is not set")
return
}
@@ -226,7 +237,7 @@ func (d *FileLogDriver) cleanup() {
if !utils.Exists(utils.GetTaskLogPath()) {
// create log directory if not exists
if err := os.MkdirAll(utils.GetTaskLogPath(), os.FileMode(0770)); err != nil {
log.Errorf("failed to create log directory: %s", utils.GetTaskLogPath())
d.Errorf("failed to create log directory: %s", utils.GetTaskLogPath())
return
}
}
@@ -238,16 +249,16 @@ func (d *FileLogDriver) cleanup() {
case <-ticker.C:
dirs, err := utils.ListDir(utils.GetTaskLogPath())
if err != nil {
log.Errorf("failed to list log directory: %s", utils.GetTaskLogPath())
d.Errorf("failed to list log directory: %s", utils.GetTaskLogPath())
continue
}
for _, dir := range dirs {
if time.Now().After(dir.ModTime().Add(d.getTtl())) {
if err := os.RemoveAll(d.getBasePath(dir.Name())); err != nil {
log.Errorf("failed to remove outdated log directory: %s", d.getBasePath(dir.Name()))
d.Errorf("failed to remove outdated log directory: %s", d.getBasePath(dir.Name()))
continue
}
log.Infof("removed outdated log directory: %s", d.getBasePath(dir.Name()))
d.Infof("removed outdated log directory: %s", d.getBasePath(dir.Name()))
}
}
}
@@ -259,6 +270,7 @@ func newFileLogDriver() Driver {
driver := &FileLogDriver{
logFileName: "log.txt",
mu: sync.Mutex{},
Logger: utils.NewLogger("FileLogDriver"),
}
// init

View File

@@ -3,16 +3,14 @@ package scheduler
import (
errors2 "errors"
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/errors"
"github.com/crawlab-team/crawlab/core/grpc/server"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/task/handler"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
@@ -27,6 +25,9 @@ type Service struct {
// settings
interval time.Duration
// internals
interfaces.Logger
}
func (svc *Service) Start() {
@@ -57,7 +58,8 @@ func (svc *Service) Enqueue(t *models.Task, by primitive.ObjectID) (t2 *models.T
// add task stat
_, err = service.NewModelService[models.TaskStat]().InsertOne(ts)
if err != nil {
return nil, trace.TraceError(err)
svc.Errorf("failed to add task stat: %s", t.Id.Hex())
return nil, err
}
// success
@@ -68,7 +70,7 @@ func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) {
// task
t, err := service.NewModelService[models.Task]().GetById(id)
if err != nil {
log.Errorf("task not found: %s", id.Hex())
svc.Errorf("task not found: %s", id.Hex())
return err
}
@@ -101,7 +103,7 @@ func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) {
func (svc *Service) cancelOnMaster(t *models.Task, by primitive.ObjectID, force bool) (err error) {
if err := svc.handlerSvc.Cancel(t.Id, force); err != nil {
log.Errorf("failed to cancel task on master: %s", t.Id.Hex())
svc.Errorf("failed to cancel task on master: %s", t.Id.Hex())
return err
}
@@ -115,7 +117,7 @@ func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force
stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id)
if !ok {
err := fmt.Errorf("stream not found for task: %s", t.Id.Hex())
log.Errorf(err.Error())
svc.Errorf(err.Error())
t.Status = constants.TaskStatusAbnormal
t.Error = err.Error()
return svc.SaveTask(t, by)
@@ -128,7 +130,7 @@ func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force
Force: force,
})
if err != nil {
log.Errorf("failed to send cancel request to worker: %s", t.Id.Hex())
svc.Errorf("failed to send cancel request to worker: %s", t.Id.Hex())
return err
}
@@ -167,14 +169,15 @@ func (svc *Service) initTaskStatus() {
if errors2.Is(err, mongo2.ErrNoDocuments) {
return
}
log.Errorf("failed to get running tasks: %v", err)
svc.Errorf("failed to get running tasks: %v", err)
return
}
for _, t := range runningTasks {
go func(t *models.Task) {
t.Status = constants.TaskStatusAbnormal
if err := svc.SaveTask(t, primitive.NilObjectID); err != nil {
trace.PrintError(err)
svc.Errorf("failed to set task status as TaskStatusAbnormal: %s", t.Id.Hex())
return
}
}(&t)
}
@@ -182,14 +185,18 @@ func (svc *Service) initTaskStatus() {
// isMasterNode reports whether the node assigned to task t is the master
// node, using the IsMaster flag of the node looked up by t.NodeId.
//
// It returns false and an error when the task has no node id (zero NodeId) or
// when the node lookup fails; a missing node (mongo ErrNoDocuments) is
// distinguished from other lookup failures.
func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) {
if t.NodeId.IsZero() {
return false, trace.TraceError(errors.ErrorTaskNoNodeId)
err = fmt.Errorf("task %s has no node id", t.Id.Hex())
svc.Errorf("%v", err)
return false, err
}
n, err := service.NewModelService[models.Node]().GetById(t.NodeId)
if err != nil {
if errors2.Is(err, mongo2.ErrNoDocuments) {
return false, trace.TraceError(errors.ErrorTaskNodeNotFound)
svc.Errorf("node not found: %s", t.NodeId.Hex())
return false, err
}
return false, trace.TraceError(err)
svc.Errorf("failed to get node: %s", t.NodeId.Hex())
return false, err
}
return n.IsMaster, nil
}
@@ -218,14 +225,14 @@ func (svc *Service) cleanupTasks() {
if err := service.NewModelService[models.Task]().DeleteMany(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
svc.Warnf("failed to remove tasks: %v", err)
}
// remove task stats
if err := service.NewModelService[models.TaskStat]().DeleteMany(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
svc.Warnf("failed to remove task stats: %v", err)
}
}

View File

@@ -1,7 +1,6 @@
package stats
import (
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/database"
interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces"
@@ -12,7 +11,6 @@ import (
"github.com/crawlab-team/crawlab/core/task/log"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"sync"
@@ -37,6 +35,7 @@ type Service struct {
databaseServiceItems map[string]*databaseServiceItem
databaseServiceTll time.Duration
logDriver log.Driver
interfaces.Logger
}
func (svc *Service) Init() (err error) {
@@ -57,7 +56,7 @@ func (svc *Service) InsertData(taskId primitive.ObjectID, records ...map[string]
if utils.IsPro() && dbSvc != nil {
for _, record := range records {
if err := dbSvc.CreateRow(dbId, "", tableName, svc.normalizeRecord(item, record)); err != nil {
log2.Errorf("failed to insert data: %v", err)
svc.Errorf("failed to insert data: %v", err)
continue
}
count++
@@ -69,7 +68,7 @@ func (svc *Service) InsertData(taskId primitive.ObjectID, records ...map[string]
}
_, err = mongo.GetMongoCol(tableName).InsertMany(recordsToInsert)
if err != nil {
log2.Errorf("failed to insert data: %v", err)
svc.Errorf("failed to insert data: %v", err)
return err
}
count = len(records)
@@ -143,7 +142,8 @@ func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) {
},
})
if err != nil {
trace.PrintError(err)
svc.Errorf("failed to update task stats: %v", err)
return
}
}
@@ -182,6 +182,7 @@ func NewTaskStatsService() *Service {
mu: sync.Mutex{},
databaseServiceItems: map[string]*databaseServiceItem{},
databaseServiceTll: 10 * time.Minute,
Logger: utils.NewLogger("TaskStatsService"),
}
svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()

View File

@@ -2,13 +2,12 @@ package user
import (
errors2 "errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/errors"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"github.com/golang-jwt/jwt/v5"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -20,6 +19,7 @@ import (
type Service struct {
jwtSecret string
jwtSigningMethod jwt.SigningMethod
interfaces.Logger
}
func (svc *Service) Init() (err error) {
@@ -70,7 +70,7 @@ func (svc *Service) initPro() (err error) {
u.SetUpdatedAt(time.Now())
err = service.NewModelService[models.User]().ReplaceById(u.Id, *u)
if err != nil {
log.Errorf("failed to update user: %v", err)
svc.Errorf("failed to update user: %v", err)
}
return
}
@@ -227,12 +227,13 @@ func newUserService() (svc *Service, err error) {
svc = &Service{
jwtSecret: "crawlab",
jwtSigningMethod: jwt.SigningMethodHS256,
Logger: utils.NewLogger("UserService"),
}
// initialize
if err := svc.Init(); err != nil {
log.Errorf("failed to initialize user service: %v", err)
return nil, trace.TraceError(err)
svc.Errorf("failed to initialize user service: %v", err)
return nil, err
}
return svc, nil

View File

@@ -1,15 +0,0 @@
package utils
import (
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab/trace"
"time"
)
// BackoffErrorNotify builds a backoff.Notify callback for retry loops.
// On every failed attempt the callback logs the error together with the
// delay (in seconds) before the next attempt, labelled with prefix, and then
// prints the error through trace.PrintError.
func BackoffErrorNotify(prefix string) backoff.Notify {
	notify := func(cause error, wait time.Duration) {
		seconds := wait.Seconds()
		log.Errorf("%s error: %v. reattempt in %.1f seconds...", prefix, cause, seconds)
		trace.PrintError(cause)
	}
	return notify
}

View File

@@ -2,7 +2,6 @@ package utils
import (
"fmt"
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/mitchellh/go-homedir"
"github.com/spf13/viper"
@@ -88,7 +87,7 @@ func IsPro() bool {
func GetWorkspace() string {
homedirPath, err := homedir.Dir()
if err != nil {
log.Warnf("cannot find home directory: %v", err)
logger.Warnf("cannot find home directory: %v", err)
return DefaultWorkspace
}
if res := viper.GetString("workspace"); res != "" {
@@ -232,8 +231,8 @@ func GetNodeMaxRunners() int {
func GetMetadataConfigPath() string {
var homeDirPath, err = homedir.Dir()
if err != nil {
log.Errorf("failed to get home directory: %v", err)
log.Errorf("please set metadata directory path using either CRAWLAB_METADATA environment variable or the metadata path in the configuration file")
logger.Errorf("failed to get home directory: %v", err)
logger.Errorf("please set metadata directory path using either CRAWLAB_METADATA environment variable or the metadata path in the configuration file")
panic(err)
}

View File

@@ -5,21 +5,18 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/entity"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"runtime/debug"
)
func OpenFile(fileName string) *os.File {
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
log.Errorf("create file error: %s, file_name: %s", err.Error(), fileName)
debug.PrintStack()
logger.Errorf("create file error: %s, file_name: %s", err.Error(), fileName)
return nil
}
return file
@@ -45,8 +42,7 @@ func IsDir(path string) bool {
func ListDir(path string) ([]fs.FileInfo, error) {
list, err := os.ReadDir(path)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
logger.Errorf("read dir error: %v, path: %s", err, path)
return nil, err
}
@@ -54,8 +50,7 @@ func ListDir(path string) ([]fs.FileInfo, error) {
for _, item := range list {
info, err := item.Info()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
logger.Errorf("get file info error: %v, path: %s", err, item.Name())
return nil, err
}
res = append(res, info)

View File

@@ -2,7 +2,6 @@ package utils
import (
"fmt"
"github.com/apex/log"
"net/http"
)
@@ -10,8 +9,8 @@ func HandleHealthFn(healthFn func() bool, healthPort int) {
addr := fmt.Sprintf(":%d", healthPort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Errorf("health check server failed: %v", err)
logger.Errorf("health check server failed: %v", err)
}
}()
log.Infof("health check server started on port %d", healthPort)
logger.Infof("health check server started on port %d", healthPort)
}

View File

@@ -2,21 +2,48 @@ package utils
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/apex/log"
"github.com/apex/log/handlers/json"
"github.com/apex/log/handlers/text"
)
var logger = NewLogger("Utils")
// ServiceLogger represents a logger with a specific service prefix.
// Its methods (e.g. Debug) run the message through getFormat, which adds the
// prefix, and then forward it to the package-level apex/log functions.
type ServiceLogger struct {
// prefix is the service name inserted into each formatted message.
prefix string
}
// NewServiceLogger creates a new logger with the given service name as a prefix.
func NewServiceLogger(serviceName string) *ServiceLogger {
return &ServiceLogger{
prefix: serviceName,
// ServiceLoggerOption is a functional option that NewLogger applies to the
// ServiceLogger it is constructing.
type ServiceLoggerOption func(logger *ServiceLogger)
// WithHandler returns an option that selects the log handler by handlerType
// and, for handler types that take one, the file to write to. See SetHandler
// for the accepted handlerType values.
//
// The option does not modify the ServiceLogger it is applied to: it calls
// SetHandler, which replaces the handler of the package-level apex/log logger.
func WithHandler(handlerType string, output *os.File) ServiceLoggerOption {
	return func(_ *ServiceLogger) {
		SetHandler(handlerType, output)
	}
}
// NewLogger creates a ServiceLogger whose messages are tagged with prefix.
//
// When no options are supplied the console handler is installed via
// SetConsoleHandler; otherwise every option is applied, in order, to the new
// logger.
func NewLogger(prefix string, opts ...ServiceLoggerOption) *ServiceLogger {
	l := &ServiceLogger{prefix: prefix}

	// No options: fall back to the default console handler.
	if len(opts) == 0 {
		SetConsoleHandler()
		return l
	}

	for _, apply := range opts {
		apply(l)
	}
	return l
}
// Debug logs a debug message.
func (l *ServiceLogger) Debug(message string) {
log.Debug(l.getFormat(message))
@@ -68,5 +95,90 @@ func (l *ServiceLogger) Fatalf(format string, args ...interface{}) {
}
// getFormat decorates a message or format string with the logger's prefix
// before it is handed to the apex/log functions. The input is returned
// embedded verbatim, so any formatting verbs it contains are preserved.
func (l *ServiceLogger) getFormat(format string) string {
return fmt.Sprintf("[%s] %s", l.prefix, format)
// timestamp is rendered in the local time zone with second resolution
timestamp := time.Now().Local().Format("2006-01-02 15:04:05")
return fmt.Sprintf("[%s] [%s] %s", timestamp, l.prefix, format)
}
// LogHandler wraps a mutex.
// NOTE(review): no methods or uses of this type are visible in this section;
// MultiHandler and ConsoleHandler carry their own mutex — confirm whether
// this type is still needed.
type LogHandler struct {
mu sync.Mutex
}
// handleLog renders one log entry to w as a single line: the level label
// (colored with ANSI escapes and right-aligned to 6 columns), the message
// (left-aligned, padded to 25 columns), then each field as " name=value"
// with the name in the level color, terminated by a newline.
//
// Write errors are not checked; the function always returns nil.
func handleLog(w io.Writer, e *log.Entry) error {
	var (
		levelColor = text.Colors[e.Level]
		levelLabel = text.Strings[e.Level]
	)

	fmt.Fprintf(w, "\033[%dm%6s\033[0m %-25s", levelColor, levelLabel, e.Message)

	for _, field := range e.Fields.Names() {
		fmt.Fprintf(w, " \033[%dm%s\033[0m=%v", levelColor, field, e.Fields.Get(field))
	}

	fmt.Fprintln(w)
	return nil
}
// MultiHandler is a log handler that routes entries to stdout or stderr based
// on their level: warn, error and fatal go to stderr, debug and info go to
// stdout.
type MultiHandler struct {
	// mu serializes HandleLog calls so lines written to the two streams by
	// concurrent callers are not interleaved.
	mu sync.Mutex
}

// NewMultiHandler creates a handler that routes logs to stdout/stderr based on level.
func NewMultiHandler() *MultiHandler {
	return &MultiHandler{}
}

// HandleLog implements the log.Handler interface.
//
// The previous condition (e.Level <= log.WarnLevel -> stdout) sent warnings
// to stdout, contradicting the documented routing; levels at or above warn
// are now written to stderr.
func (h *MultiHandler) HandleLog(e *log.Entry) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Route to stderr for warn, error, and fatal
	if e.Level >= log.WarnLevel {
		return handleLog(os.Stderr, e)
	}

	// Route to stdout for debug and info
	return handleLog(os.Stdout, e)
}
// ConsoleHandler is a log handler that writes every entry, regardless of
// level, to stdout.
type ConsoleHandler struct {
	// mu serializes HandleLog calls.
	mu sync.Mutex
}

// NewConsoleHandler returns a ready-to-use ConsoleHandler.
func NewConsoleHandler() *ConsoleHandler {
	return new(ConsoleHandler)
}

// HandleLog formats the entry with handleLog and writes it to stdout while
// holding the handler's mutex.
func (h *ConsoleHandler) HandleLog(e *log.Entry) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	var out io.Writer = os.Stdout
	return handleLog(out, e)
}
// SetHandler installs the handler of the package-level apex/log logger
// according to handlerType:
//
//   - "json":  apex JSON handler writing to output
//   - "text":  apex text handler writing to output
//   - "split": MultiHandler, which routes to stdout/stderr by level (output is ignored)
//   - any other value: ConsoleHandler (output is ignored)
func SetHandler(handlerType string, output *os.File) {
	var selected log.Handler

	switch handlerType {
	case "json":
		selected = json.New(output)
	case "text":
		selected = text.New(output)
	case "split":
		selected = NewMultiHandler()
	default:
		selected = NewConsoleHandler()
	}

	log.SetHandler(selected)
}
// SetMultiHandler installs a new MultiHandler as the handler of the
// package-level apex/log logger.
func SetMultiHandler() {
	var handler log.Handler = NewMultiHandler()
	log.SetHandler(handler)
}
// SetConsoleHandler installs a new ConsoleHandler as the handler of the
// package-level apex/log logger.
func SetConsoleHandler() {
	var handler log.Handler = NewConsoleHandler()
	log.SetHandler(handler)
}

View File

@@ -2,7 +2,6 @@ package utils
import (
"errors"
"github.com/apex/log"
"github.com/shirou/gopsutil/process"
"os/exec"
"runtime"
@@ -28,7 +27,7 @@ func ProcessIdExists(pid int) (exists bool) {
// processIdExistsWindows reports whether a process with the given pid exists,
// as determined by process.PidExists. A lookup error is logged and the value
// returned by PidExists is passed through unchanged.
func processIdExistsWindows(pid int) (exists bool) {
exists, err := process.PidExists(int32(pid))
if err != nil {
log.Errorf("error checking if process exists: %v", err)
logger.Errorf("error checking if process exists: %v", err)
}
return exists
}
@@ -36,7 +35,7 @@ func processIdExistsWindows(pid int) (exists bool) {
// processIdExistsLinuxMac reports whether a process with the given pid
// exists, as determined by process.PidExists. A lookup error is logged and
// the value returned by PidExists is passed through unchanged.
func processIdExistsLinuxMac(pid int) (exists bool) {
exists, err := process.PidExists(int32(pid))
if err != nil {
log.Errorf("error checking if process exists: %v", err)
logger.Errorf("error checking if process exists: %v", err)
}
return exists
}
@@ -44,7 +43,7 @@ func processIdExistsLinuxMac(pid int) (exists bool) {
func GetProcesses() (processes []*process.Process, err error) {
processes, err = process.Processes()
if err != nil {
log.Errorf("error getting processes: %v", err)
logger.Errorf("error getting processes: %v", err)
return nil, err
}
return processes, nil
@@ -58,7 +57,7 @@ func KillProcess(cmd *exec.Cmd, force bool) error {
// process
p, err := process.NewProcess(int32(cmd.Process.Pid))
if err != nil {
log.Errorf("failed to get process: %v", err)
logger.Errorf("failed to get process: %v", err)
return err
}
@@ -71,7 +70,7 @@ func killProcessRecursive(p *process.Process, force bool) (err error) {
cps, err := p.Children()
if err != nil {
if !errors.Is(err, process.ErrorNoChildren) {
log.Errorf("failed to get children processes: %v", err)
logger.Errorf("failed to get children processes: %v", err)
} else if errors.Is(err, process.ErrorProcessNotRunning) {
return nil
}
@@ -95,7 +94,7 @@ func killProcess(p *process.Process, force bool) (err error) {
err = p.Terminate()
}
if err != nil {
log.Errorf("failed to kill process (force: %v): %v", force, err)
logger.Errorf("failed to kill process (force: %v): %v", force, err)
return err
}
return nil

View File

@@ -2,7 +2,6 @@ package utils
import (
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/trace"
"regexp"
"strconv"
@@ -27,13 +26,13 @@ func GetTimeUnitParts(timeUnit string) (num int, unit string, err error) {
groups := re.FindStringSubmatch(timeUnit)
if len(groups) < 3 {
err = errors.New("failed to parse duration text")
log.Errorf("failed to parse duration text: %v", err)
logger.Errorf("failed to parse duration text: %v", err)
trace.PrintError(err)
return 0, "", err
}
num, err = strconv.Atoi(groups[1])
if err != nil {
log.Errorf("failed to convert string to int: %v", err)
logger.Errorf("failed to convert string to int: %v", err)
trace.PrintError(err)
return 0, "", err
}
@@ -44,7 +43,7 @@ func GetTimeUnitParts(timeUnit string) (num int, unit string, err error) {
func GetTimeDuration(num string, unit string) (d time.Duration, err error) {
numInt, err := strconv.Atoi(num)
if err != nil {
log.Errorf("failed to convert string to int: %v", err)
logger.Errorf("failed to convert string to int: %v", err)
trace.PrintError(err)
return d, err
}
@@ -65,7 +64,7 @@ func GetTimeDuration(num string, unit string) (d time.Duration, err error) {
d = time.Duration(numInt) * 365 * 24 * time.Hour
default:
err = errors.New("invalid time unit")
log.Errorf("invalid time unit: %v", unit)
logger.Errorf("invalid time unit: %v", unit)
trace.PrintError(err)
return d, err
}