From 9ffdd3f1cd2eb31efcab31cebf858ba4a64c544c Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 23 Jul 2024 12:22:59 +0800 Subject: [PATCH] refactor: Remove unused code and update models and functions for notification channels and settings --- .../server/model_base_service_v2_server.go | 1 + .../models/v2/notification_request_v2.go | 12 + core/models/models/v2/schedule_v2.go | 1 - core/models/models/v2/task_v2.go | 2 - core/notification/im.go | 51 ++++ core/notification/mail.go | 5 +- core/notification/mail_theme.go | 8 - core/notification/mail_theme_flat.go | 287 ------------------ core/notification/payload.go | 8 - core/notification/service_v2.go | 107 ++++++- core/spider/admin/service_v2.go | 3 - core/task/base.go | 97 ------ core/task/scheduler/service.go | 264 ---------------- core/task/stats/service.go | 155 ---------- 14 files changed, 169 insertions(+), 832 deletions(-) create mode 100644 core/models/models/v2/notification_request_v2.go delete mode 100644 core/notification/mail_theme.go delete mode 100644 core/notification/mail_theme_flat.go delete mode 100644 core/notification/payload.go delete mode 100644 core/task/base.go delete mode 100644 core/task/scheduler/service.go delete mode 100644 core/task/stats/service.go diff --git a/core/grpc/server/model_base_service_v2_server.go b/core/grpc/server/model_base_service_v2_server.go index 59ef7d11..90eec10d 100644 --- a/core/grpc/server/model_base_service_v2_server.go +++ b/core/grpc/server/model_base_service_v2_server.go @@ -28,6 +28,7 @@ var ( *new(models2.MetricV2), *new(models2.NodeV2), *new(models2.NotificationChannelV2), + *new(models2.NotificationRequestV2), *new(models2.NotificationSettingV2), *new(models2.PermissionV2), *new(models2.ProjectV2), diff --git a/core/models/models/v2/notification_request_v2.go b/core/models/models/v2/notification_request_v2.go new file mode 100644 index 00000000..1130cc92 --- /dev/null +++ b/core/models/models/v2/notification_request_v2.go @@ -0,0 +1,12 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type NotificationRequestV2 struct { + any `collection:"notification_requests"` + BaseModelV2[NotificationRequestV2] `bson:",inline"` + Status string `json:"status" bson:"status"` + Error string `json:"error,omitempty" bson:"error,omitempty"` + SettingId primitive.ObjectID `json:"setting_id" bson:"setting_id"` + ChannelId primitive.ObjectID `json:"channel_id" bson:"channel_id"` +} diff --git a/core/models/models/v2/schedule_v2.go b/core/models/models/v2/schedule_v2.go index a52f1b92..4d8cf42c 100644 --- a/core/models/models/v2/schedule_v2.go +++ b/core/models/models/v2/schedule_v2.go @@ -19,5 +19,4 @@ type ScheduleV2 struct { NodeIds []primitive.ObjectID `json:"node_ids" bson:"node_ids"` Priority int `json:"priority" bson:"priority"` Enabled bool `json:"enabled" bson:"enabled"` - UserId primitive.ObjectID `json:"user_id" bson:"user_id"` } diff --git a/core/models/models/v2/task_v2.go b/core/models/models/v2/task_v2.go index 3473b6db..d3f2b178 100644 --- a/core/models/models/v2/task_v2.go +++ b/core/models/models/v2/task_v2.go @@ -2,7 +2,6 @@ package models import ( "go.mongodb.org/mongo-driver/bson/primitive" - "time" ) type TaskV2 struct { @@ -26,5 +25,4 @@ type TaskV2 struct { SubTasks []TaskV2 `json:"sub_tasks,omitempty" bson:"-"` Spider *SpiderV2 `json:"spider,omitempty" bson:"-"` UserId primitive.ObjectID `json:"-" bson:"-"` - CreateTs time.Time `json:"create_ts" bson:"create_ts"` } diff --git a/core/notification/im.go b/core/notification/im.go index 85d9cac5..74856f81 100644 --- a/core/notification/im.go +++ b/core/notification/im.go @@ -2,6 +2,7 @@ package notification import ( "errors" + "github.com/apex/log" "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/trace" "github.com/imroc/req" @@ -15,6 +16,10 @@ type ResBody struct { func SendIMNotification(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, content string) error { // TODO: compatibility with different IM providers + switch ch.Provider { + case ChannelIMProviderLark: + return sendImLark(ch, content) + } // request header header := req.Header{ @@ -63,3 +68,49 @@ func SendIMNotification(s *models.NotificationSettingV2, ch *models.Notification return nil } + +func getIMRequestHeader() req.Header { + return req.Header{ + "Content-Type": "application/json; charset=utf-8", + } +} + +func performIMRequest(webhookUrl string, data req.Param) error { + // perform request + res, err := req.Post(webhookUrl, getIMRequestHeader(), req.BodyJSON(&data)) + if err != nil { + log.Errorf("IM request error: %v", err) + return err + } + + // parse response + var resBody ResBody + if err := res.ToJSON(&resBody); err != nil { + log.Errorf("Parsing IM response error: %v", err) + return err + } + + // validate response code + if resBody.ErrCode != 0 { + log.Errorf("IM response error: %v", resBody.ErrMsg) + return errors.New(resBody.ErrMsg) + } + + return nil +} + +func sendImLark(ch *models.NotificationChannelV2, content string) error { + // request header + data := req.Param{ + "msg_type": "interactive", + "card": req.Param{ + "elements": []req.Param{ + { + "tag": "markdown", + "content": content, + }, + }, + }, + } + return performIMRequest(ch.WebhookUrl, data) +} diff --git a/core/notification/mail.go b/core/notification/mail.go index 5a6eeb19..549cc0e6 100644 --- a/core/notification/mail.go +++ b/core/notification/mail.go @@ -9,7 +9,6 @@ import ( "gopkg.in/gomail.v2" "net/mail" "regexp" - "runtime/debug" "strings" ) @@ -44,8 +43,8 @@ func SendMail(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, // send the email if err := send(smtpConfig, options, content, text); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() + log.Errorf("failed to send email: %v", err) + trace.PrintError(err) return err } diff --git a/core/notification/mail_theme.go b/core/notification/mail_theme.go deleted file mode 100644 index 9266ddd8..00000000 --- a/core/notification/mail_theme.go +++ /dev/null @@ -1,8 +0,0 @@ -package notification - -import "github.com/matcornic/hermes/v2" - -type MailTheme interface { - hermes.Theme - GetStyle() string -} diff --git a/core/notification/mail_theme_flat.go b/core/notification/mail_theme_flat.go deleted file mode 100644 index 6edb480f..00000000 --- a/core/notification/mail_theme_flat.go +++ /dev/null @@ -1,287 +0,0 @@ -package notification - -// MailThemeFlat is a theme -type MailThemeFlat struct{} - -// Name returns the name of the flat theme -func (dt *MailThemeFlat) Name() string { - return "flat" -} - -// HTMLTemplate returns a Golang template that will generate an HTML email. -func (dt *MailThemeFlat) HTMLTemplate() string { - return ` - - - - - - - - - - - - -
- - - - - - - - - - - - - - -
- - -` -} - -// PlainTextTemplate returns a Golang template that will generate an plain text email. -func (dt *MailThemeFlat) PlainTextTemplate() string { - return `{{ with .Email.Body.Intros }} - {{ range $line := . }} -

{{ $line }}

- {{ end }} -{{ end }} -{{ if (ne .Email.Body.FreeMarkdown "") }} - {{ .Email.Body.FreeMarkdown.ToHTML }} -{{ else }} - {{ with .Email.Body.Dictionary }} - - {{ end }} - {{ with .Email.Body.Table }} - {{ $data := .Data }} - {{ $columns := .Columns }} - {{ if gt (len $data) 0 }} - - - {{ $col := index $data 0 }} - {{ range $entry := $col }} - - {{ end }} - - {{ range $row := $data }} - - {{ range $cell := $row }} - - {{ end }} - - {{ end }} -
{{ $entry.Key }}
- {{ $cell.Value }} -
- {{ end }} - {{ end }} - {{ with .Email.Body.Actions }} - {{ range $action := . }} -

{{ $action.Instructions }} {{ $action.Button.Link }}

- {{ end }} - {{ end }} -{{ end }} -{{ with .Email.Body.Outros }} - {{ range $line := . }} -

{{ $line }}

- {{ end }} -{{ end }} -

{{.Email.Body.Signature}},
{{.Hermes.Product.Name}} - {{.Hermes.Product.Link}}

- -

{{.Hermes.Product.Copyright}}

-` -} - -func (dt *MailThemeFlat) GetStyle() string { - return ` - -` -} diff --git a/core/notification/payload.go b/core/notification/payload.go deleted file mode 100644 index 7337ebe0..00000000 --- a/core/notification/payload.go +++ /dev/null @@ -1,8 +0,0 @@ -package notification - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type SendPayload struct { - TaskId primitive.ObjectID `json:"task_id"` - Data string `json:"data"` -} diff --git a/core/notification/service_v2.go b/core/notification/service_v2.go index ded20c93..2f2118ac 100644 --- a/core/notification/service_v2.go +++ b/core/notification/service_v2.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "time" ) type ServiceV2 struct { @@ -91,14 +92,108 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Id.Hex()) case "status": content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Status) - case "priority": - content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Priority)) - case "mode": - content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Mode) case "cmd": content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Cmd) case "param": content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Param) + case "error": + content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Error) + case "pid": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Pid)) + case "type": + content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Type) + case "mode": + content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Mode) + case "priority": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Priority)) + case "created_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Task.CreatedAt.Format(time.DateTime)) + case "updated_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Task.UpdatedAt.Format(time.DateTime)) + } + + case "spider": + switch v.Name { + case "id": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Id.Hex()) + case "name": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Name) + case "type": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Type) + case "description": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Description) + case "mode": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Mode) + case "cmd": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Cmd) + case "param": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Param) + case "priority": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Spider.Priority)) + case "created_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.CreatedAt.Format(time.DateTime)) + case "updated_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.UpdatedAt.Format(time.DateTime)) + } + + case "node": + switch v.Name { + case "id": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Id.Hex()) + case "key": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key) + case "name": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name) + case "ip": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip) + case "port": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Port) + case "hostname": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Hostname) + case "description": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Description) + case "status": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Status) + case "enabled": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.Enabled)) + case "active": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.Active)) + case "active_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.ActiveAt.Format("2006-01-02 15:04:05")) + case "available_runners": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.AvailableRunners)) + case "max_runners": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.MaxRunners)) + case "created_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.CreatedAt.Format(time.DateTime)) + case "updated_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Node.UpdatedAt.Format(time.DateTime)) + } + + case "schedule": + switch v.Name { + case "id": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Id.Hex()) + case "name": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Name) + case "description": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Description) + case "cron": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Cron) + case "cmd": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Cmd) + case "param": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Param) + case "mode": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Mode) + case "priority": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Schedule.Priority)) + case "enabled": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Schedule.Enabled)) + case "created_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.CreatedAt.Format(time.DateTime)) + case "updated_at": + content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.UpdatedAt.Format(time.DateTime)) } } } @@ -153,6 +248,10 @@ func (svc *ServiceV2) parseTemplateVariables(template string) (variables []entit return variables } +func (svc *ServiceV2) getUserById(id primitive.ObjectID) { + +} + func newNotificationServiceV2() *ServiceV2 { return &ServiceV2{} } diff --git a/core/spider/admin/service_v2.go b/core/spider/admin/service_v2.go index 543fe453..b3c7136a 100644 --- a/core/spider/admin/service_v2.go +++ b/core/spider/admin/service_v2.go @@ -15,7 +15,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "sync" - "time" ) type ServiceV2 struct { @@ -51,7 +50,6 @@ func (svc *ServiceV2) scheduleTasks(s *models2.SpiderV2, opts *interfaces.Spider ScheduleId: opts.ScheduleId, Priority: opts.Priority, UserId: opts.UserId, - CreateTs: time.Now(), } mainTask.SetId(primitive.NewObjectID()) @@ -88,7 +86,6 @@ func (svc *ServiceV2) scheduleTasks(s *models2.SpiderV2, opts *interfaces.Spider ScheduleId: opts.ScheduleId, Priority: opts.Priority, UserId: opts.UserId, - CreateTs: time.Now(), } t.SetId(primitive.NewObjectID()) t2, err := svc.schedulerSvc.Enqueue(t, opts.UserId) diff --git a/core/task/base.go b/core/task/base.go deleted file mode 100644 index 90de0ac2..00000000 --- a/core/task/base.go +++ /dev/null @@ -1,97 +0,0 @@ -package task - -import ( - "fmt" - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/container" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" -) - -type BaseService struct { - // dependencies - interfaces.WithConfigPath - modelSvc service.ModelService - - // internals - stopped bool -} - -func (svc *BaseService) Init() error { - // implement me - return nil -} - -func (svc *BaseService) Start() { - // implement me -} - -func (svc *BaseService) Wait() { - utils.DefaultWait() -} - -func (svc *BaseService) Stop() { - svc.stopped = true -} - -// SaveTask deprecated -func (svc *BaseService) SaveTask(t interfaces.Task, status string) (err error) { - // normalize status - if status == "" { - status = constants.TaskStatusPending - } - - // set task status - t.SetStatus(status) - - // attempt to get task from database - _, err = svc.modelSvc.GetTaskById(t.GetId()) - if err != nil { - // if task does not exist, add to database - if err == mongo.ErrNoDocuments { - if err := delegate.NewModelDelegate(t).Add(); err != nil { - return err - } - return nil - } else { - return err - } - } else { - // otherwise, update - if err := delegate.NewModelDelegate(t).Save(); err != nil { - return err - } - return nil - } -} - -func (svc *BaseService) IsStopped() (res bool) { - return svc.stopped -} - -func (svc *BaseService) GetQueue(nodeId primitive.ObjectID) (queue string) { - if nodeId.IsZero() { - return fmt.Sprintf("%s", constants.TaskListQueuePrefixPublic) - } else { - return fmt.Sprintf("%s:%s", constants.TaskListQueuePrefixNodes, nodeId.Hex()) - } -} - -func NewBaseService() (svc2 interfaces.TaskBaseService, err error) { - svc := &BaseService{} - - // dependency injection - if err := container.GetContainer().Invoke(func(cfgPath interfaces.WithConfigPath, modelSvc service.ModelService) { - svc.WithConfigPath = cfgPath - svc.modelSvc = modelSvc - }); err != nil { - return nil, trace.TraceError(err) - } - - return svc, nil -} diff --git a/core/task/scheduler/service.go b/core/task/scheduler/service.go deleted file mode 100644 index 3644aa17..00000000 --- a/core/task/scheduler/service.go +++ /dev/null @@ -1,264 +0,0 @@ -package scheduler - -import ( - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/container" - "github.com/crawlab-team/crawlab/core/errors" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/task" - "github.com/crawlab-team/crawlab/db/mongo" - grpc "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - mongo2 "go.mongodb.org/mongo-driver/mongo" - "time" -) - -type Service struct { - // dependencies - interfaces.TaskBaseService - nodeCfgSvc interfaces.NodeConfigService - modelSvc service.ModelService - svr interfaces.GrpcServer - handlerSvc interfaces.TaskHandlerService - - // settings - interval time.Duration -} - -func (svc *Service) Start() { - go svc.initTaskStatus() - go svc.cleanupTasks() - svc.Wait() - svc.Stop() -} - -func (svc *Service) Enqueue(t interfaces.Task) (t2 interfaces.Task, err error) { - // set task status - t.SetStatus(constants.TaskStatusPending) - - // user - var u *models.User - if !t.GetUserId().IsZero() { - u, _ = svc.modelSvc.GetUserById(t.GetUserId()) - } - - // add task - if err = delegate.NewModelDelegate(t, u).Add(); err != nil { - return nil, err - } - - // task queue item - tq := &models.TaskQueueItem{ - Id: t.GetId(), - Priority: t.GetPriority(), - NodeId: t.GetNodeId(), - } - - // task stat - ts := &models.TaskStat{ - Id: t.GetId(), - CreateTs: time.Now(), - } - - // enqueue task - _, err = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Insert(tq) - if err != nil { - return nil, trace.TraceError(err) - } - - // add task stat - _, err = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).Insert(ts) - if err != nil { - return nil, trace.TraceError(err) - } - - // success - return t, nil -} - -func (svc *Service) Cancel(id primitive.ObjectID, args ...interface{}) (err error) { - // task - t, err := svc.modelSvc.GetTaskById(id) - if err != nil { - return trace.TraceError(err) - } - - // initial status - initialStatus := t.Status - - // set task status as "cancelled" - _ = svc.SaveTask(t, constants.TaskStatusCancelled) - - // set status of pending tasks as "cancelled" and remove from task item queue - if initialStatus == constants.TaskStatusPending { - // remove from task item queue - if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(t.GetId()); err != nil { - return trace.TraceError(err) - } - return nil - } - - // whether task is running on master node - isMasterTask, err := svc.isMasterNode(t) - if err != nil { - // when error, force status being set as "cancelled" - return svc.SaveTask(t, constants.TaskStatusCancelled) - } - - // node - n, err := svc.modelSvc.GetNodeById(t.GetNodeId()) - if err != nil { - return trace.TraceError(err) - } - - if isMasterTask { - // cancel task on master - if err := svc.handlerSvc.Cancel(id); err != nil { - return trace.TraceError(err) - } - // cancel success - return nil - } else { - // send to cancel task on worker nodes - if err := svc.svr.SendStreamMessageWithData("node:"+n.GetKey(), grpc.StreamMessageCode_CANCEL_TASK, t); err != nil { - return trace.TraceError(err) - } - // cancel success - return nil - } -} - -func (svc *Service) SetInterval(interval time.Duration) { - svc.interval = interval -} - -// initTaskStatus initialize task status of existing tasks -func (svc *Service) initTaskStatus() { - // set status of running tasks as TaskStatusAbnormal - runningTasks, err := svc.modelSvc.GetTaskList(bson.M{ - "status": bson.M{ - "$in": []string{ - constants.TaskStatusPending, - constants.TaskStatusRunning, - }, - }, - }, nil) - if err != nil { - if err == mongo2.ErrNoDocuments { - return - } - trace.PrintError(err) - } - for _, t := range runningTasks { - go func(t *models.Task) { - if err := svc.SaveTask(t, constants.TaskStatusAbnormal); err != nil { - trace.PrintError(err) - } - }(&t) - } - if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTaskQueue).DeleteList(nil); err != nil { - return - } -} - -func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) { - if t.GetNodeId().IsZero() { - return false, trace.TraceError(errors.ErrorTaskNoNodeId) - } - n, err := svc.modelSvc.GetNodeById(t.GetNodeId()) - if err != nil { - if err == mongo2.ErrNoDocuments { - return false, trace.TraceError(errors.ErrorTaskNodeNotFound) - } - return false, trace.TraceError(err) - } - return n.IsMaster, nil -} - -func (svc *Service) cleanupTasks() { - for { - // task stats over 30 days ago - taskStats, err := svc.modelSvc.GetTaskStatList(bson.M{ - "create_ts": bson.M{ - "$lt": time.Now().Add(-30 * 24 * time.Hour), - }, - }, nil) - if err != nil { - time.Sleep(30 * time.Minute) - continue - } - - // task ids - var ids []primitive.ObjectID - for _, ts := range taskStats { - ids = append(ids, ts.Id) - } - - if len(ids) > 0 { - // remove tasks - if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTask).DeleteList(bson.M{ - "_id": bson.M{"$in": ids}, - }); err != nil { - trace.PrintError(err) - } - - // remove task stats - if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTaskStat).DeleteList(bson.M{ - "_id": bson.M{"$in": ids}, - }); err != nil { - trace.PrintError(err) - } - } - - time.Sleep(30 * time.Minute) - } -} - -func NewTaskSchedulerService() (svc2 interfaces.TaskSchedulerService, err error) { - // base service - baseSvc, err := task.NewBaseService() - if err != nil { - return nil, trace.TraceError(err) - } - - // service - svc := &Service{ - TaskBaseService: baseSvc, - interval: 5 * time.Second, - } - - // dependency injection - if err := container.GetContainer().Invoke(func( - nodeCfgSvc interfaces.NodeConfigService, - modelSvc service.ModelService, - svr interfaces.GrpcServer, - handlerSvc interfaces.TaskHandlerService, - ) { - svc.nodeCfgSvc = nodeCfgSvc - svc.modelSvc = modelSvc - svc.svr = svr - svc.handlerSvc = handlerSvc - }); err != nil { - return nil, err - } - - return svc, nil -} - -var svc interfaces.TaskSchedulerService - -func GetTaskSchedulerService() (svr interfaces.TaskSchedulerService, err error) { - if svc != nil { - return svc, nil - } - svc, err = NewTaskSchedulerService() - if err != nil { - return nil, err - } - return svc, nil -} diff --git a/core/task/stats/service.go b/core/task/stats/service.go deleted file mode 100644 index b6964c29..00000000 --- a/core/task/stats/service.go +++ /dev/null @@ -1,155 +0,0 @@ -package stats - -import ( - "github.com/crawlab-team/crawlab/core/container" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/result" - "github.com/crawlab-team/crawlab/core/task" - "github.com/crawlab-team/crawlab/core/task/log" - "github.com/crawlab-team/crawlab/db/mongo" - "github.com/crawlab-team/crawlab/trace" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "sync" - "time" -) - -type Service struct { - // dependencies - interfaces.TaskBaseService - nodeCfgSvc interfaces.NodeConfigService - modelSvc service.ModelService - - // internals - mu sync.Mutex - resultServices sync.Map - rsTtl time.Duration - logDriver log.Driver -} - -func (svc *Service) Init() (err error) { - go svc.cleanup() - return nil -} - -func (svc *Service) InsertData(id primitive.ObjectID, records ...interface{}) (err error) { - resultSvc, err := svc.getResultService(id) - if err != nil { - return err - } - if err := resultSvc.Insert(records...); err != nil { - return err - } - go svc.updateTaskStats(id, len(records)) - return nil -} - -func (svc *Service) InsertLogs(id primitive.ObjectID, logs ...string) (err error) { - return svc.logDriver.WriteLines(id.Hex(), logs) -} - -func (svc *Service) getResultService(id primitive.ObjectID) (resultSvc interfaces.ResultService, err error) { - // atomic operation - svc.mu.Lock() - defer svc.mu.Unlock() - - // attempt to get from cache - res, _ := svc.resultServices.Load(id.Hex()) - if res != nil { - // hit in cache - resultSvc, ok := res.(interfaces.ResultService) - resultSvc.SetTime(time.Now()) - if ok { - return resultSvc, nil - } - } - - // task - t, err := svc.modelSvc.GetTaskById(id) - if err != nil { - return nil, err - } - - // result service - resultSvc, err = result.GetResultService(t.SpiderId) - if err != nil { - return nil, err - } - - // store in cache - svc.resultServices.Store(id.Hex(), resultSvc) - - return resultSvc, nil -} - -func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) { - _ = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).UpdateId(id, bson.M{ - "$inc": bson.M{ - "result_count": resultCount, - }, - }) -} - -func (svc *Service) cleanup() { - for { - // atomic operation - svc.mu.Lock() - - svc.resultServices.Range(func(key, value interface{}) bool { - rs := value.(interfaces.ResultService) - if time.Now().After(rs.GetTime().Add(svc.rsTtl)) { - svc.resultServices.Delete(key) - } - return true - }) - - svc.mu.Unlock() - - time.Sleep(10 * time.Minute) - } -} - -func NewTaskStatsService() (svc2 interfaces.TaskStatsService, err error) { - // base service - baseSvc, err := task.NewBaseService() - if err != nil { - return nil, trace.TraceError(err) - } - - // service - svc := &Service{ - mu: sync.Mutex{}, - TaskBaseService: baseSvc, - resultServices: sync.Map{}, - } - - // dependency injection - if err := container.GetContainer().Invoke(func(nodeCfgSvc interfaces.NodeConfigService, modelSvc service.ModelService) { - svc.nodeCfgSvc = nodeCfgSvc - svc.modelSvc = modelSvc - }); err != nil { - return nil, trace.TraceError(err) - } - - // log driver - svc.logDriver, err = log.GetLogDriver(log.DriverTypeFile) - if err != nil { - return nil, err - } - - return svc, nil -} - -var _service interfaces.TaskStatsService - -func GetTaskStatsService() (svr interfaces.TaskStatsService, err error) { - if _service != nil { - return _service, nil - } - _service, err = NewTaskStatsService() - if err != nil { - return nil, err - } - return _service, nil -}