From 71194131cde49dae47b0b3569fff9dfd2fc95492 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 15 Jul 2024 17:34:04 +0800 Subject: [PATCH] refactor: Update SendNotification function to handle old and new settings triggers --- core/constants/notification.go | 7 +- core/entity/notification_variable.go | 6 + core/grpc/server/node_server_v2.go | 6 +- core/grpc/server/task_server_v2.go | 21 ++- core/node/service/master_service_v2.go | 11 +- core/notification/base_test.go | 73 --------- core/notification/mail.go | 2 +- core/notification/service_v2.go | 207 +++++++------------------ core/notification/service_v2_test.go | 39 +++++ 9 files changed, 131 insertions(+), 241 deletions(-) create mode 100644 core/entity/notification_variable.go delete mode 100644 core/notification/base_test.go create mode 100644 core/notification/service_v2_test.go diff --git a/core/constants/notification.go b/core/constants/notification.go index eb2b8f25..34cced2e 100644 --- a/core/constants/notification.go +++ b/core/constants/notification.go @@ -1,14 +1,17 @@ package constants +const ( + NotificationTriggerTargetTask = "task" + NotificationTriggerTargetNode = "node" +) + const ( NotificationTriggerTaskFinish = "task_finish" NotificationTriggerTaskError = "task_error" NotificationTriggerTaskEmptyResults = "task_empty_results" - NotificationTriggerTaskNever = "task_never" NotificationTriggerNodeStatusChange = "node_status_change" NotificationTriggerNodeOnline = "node_online" NotificationTriggerNodeOffline = "node_offline" - NotificationTriggerNodeNever = "node_never" ) const ( diff --git a/core/entity/notification_variable.go b/core/entity/notification_variable.go new file mode 100644 index 00000000..ad00c02f --- /dev/null +++ b/core/entity/notification_variable.go @@ -0,0 +1,6 @@ +package entity + +type NotificationVariable struct { + Category string `json:"category"` + Name string `json:"name"` +} diff --git a/core/grpc/server/node_server_v2.go b/core/grpc/server/node_server_v2.go index 4ef7de74..269ca7ef 100644 --- a/core/grpc/server/node_server_v2.go +++ b/core/grpc/server/node_server_v2.go @@ -30,7 +30,7 @@ type NodeServerV2 struct { } // Register from handler/worker to master -func (svr NodeServerV2) Register(ctx context.Context, req *grpc.NodeServiceRegisterRequest) (res *grpc.Response, err error) { +func (svr NodeServerV2) Register(_ context.Context, req *grpc.NodeServiceRegisterRequest) (res *grpc.Response, err error) { // unmarshall data if req.IsMaster { // error: cannot register master node @@ -84,7 +84,7 @@ func (svr NodeServerV2) Register(ctx context.Context, req *grpc.NodeServiceRegis } // SendHeartbeat from worker to master -func (svr NodeServerV2) SendHeartbeat(ctx context.Context, req *grpc.NodeServiceSendHeartbeatRequest) (res *grpc.Response, err error) { +func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSendHeartbeatRequest) (res *grpc.Response, err error) { // find in db node, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": req.Key}, nil) if err != nil { @@ -139,7 +139,7 @@ func (svr NodeServerV2) Subscribe(request *grpc.Request, stream grpc.NodeService } } -func (svr NodeServerV2) Unsubscribe(ctx context.Context, req *grpc.Request) (res *grpc.Response, err error) { +func (svr NodeServerV2) Unsubscribe(_ context.Context, req *grpc.Request) (res *grpc.Response, err error) { sub, err := svr.server.GetSubscribe("node:" + req.NodeKey) if err != nil { return nil, errors.ErrorGrpcSubscribeNotExists diff --git a/core/grpc/server/task_server_v2.go b/core/grpc/server/task_server_v2.go index 68cadaf9..51ccc198 100644 --- a/core/grpc/server/task_server_v2.go +++ b/core/grpc/server/task_server_v2.go @@ -109,8 +109,8 @@ func (svr TaskServerV2) Fetch(ctx context.Context, request *grpc.Request) (respo return HandleSuccessWithData(tid) } -func (svr TaskServerV2) SendNotification(ctx context.Context, request *grpc.Request) (response *grpc.Response, err error) { - svc := notification.GetNotificationServiceV2() +func (svr TaskServerV2) SendNotification(_ context.Context, request *grpc.Request) (response *grpc.Response, err error) { + // task var t = new(models2.TaskV2) if err := json.Unmarshal(request.Data, t); err != nil { return nil, trace.TraceError(err) @@ -119,6 +119,8 @@ func (svr TaskServerV2) SendNotification(ctx context.Context, request *grpc.Requ if err != nil { return nil, trace.TraceError(err) } + + // serialize task data td, err := json.Marshal(t) if err != nil { return nil, trace.TraceError(err) @@ -131,12 +133,19 @@ func (svr TaskServerV2) SendNotification(ctx context.Context, request *grpc.Requ if err != nil { return nil, trace.TraceError(err) } - settings, _, err := svc.GetSettingList(bson.M{ - "enabled": true, - }, nil, nil) + + // settings + settings, err := service.NewModelServiceV2[models2.NotificationSettingV2]().GetMany(bson.M{ + "enabled": true, + "trigger_target": constants.NotificationTriggerTargetTask, + }, nil) if err != nil { return nil, trace.TraceError(err) } + + // notification service + svc := notification.GetNotificationServiceV2() + for _, s := range settings { // compatible with old settings trigger := s.Trigger @@ -160,9 +169,9 @@ func (svr TaskServerV2) SendNotification(ctx context.Context, request *grpc.Requ _ = svc.Send(&s, e) } } - case constants.NotificationTriggerTaskNever: } } + return nil, nil } diff --git a/core/node/service/master_service_v2.go b/core/node/service/master_service_v2.go index 196a7448..1487547b 100644 --- a/core/node/service/master_service_v2.go +++ b/core/node/service/master_service_v2.go @@ -101,8 +101,14 @@ func (svc *MasterServiceV2) Stop() { func (svc *MasterServiceV2) Monitor() { log.Infof("master[%s] monitoring started", svc.GetConfigService().GetNodeKey()) + + // ticker + ticker := time.NewTicker(svc.monitorInterval) + for { - if err := svc.monitor(); err != nil { + // monitor + err := svc.monitor() + if err != nil { trace.PrintError(err) if svc.stopOnError { log.Errorf("master[%s] monitor error, now stopping...", svc.GetConfigService().GetNodeKey()) @@ -111,7 +117,8 @@ func (svc *MasterServiceV2) Monitor() { } } - time.Sleep(svc.monitorInterval) + // wait + <-ticker.C } } diff --git a/core/notification/base_test.go b/core/notification/base_test.go deleted file mode 100644 index 5b29810d..00000000 --- a/core/notification/base_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package notification - -import ( - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/gavv/httpexpect/v2" - "github.com/spf13/viper" - "go.mongodb.org/mongo-driver/bson/primitive" - "net/http/httptest" - "testing" -) - -func init() { - viper.Set("mongo.db", "crawlab_test") - var err error - T, err = NewTest() - if err != nil { - panic(err) - } -} - -type Test struct { - svc *Service - svr *httptest.Server - - // test data - TestNode interfaces.Node - TestSpider interfaces.Spider - TestTask interfaces.Task - TestTaskStat interfaces.TaskStat -} - -func (t *Test) Setup(t2 *testing.T) { - _ = t.svc.Start() - t2.Cleanup(t.Cleanup) -} - -func (t *Test) Cleanup() { - _ = t.svc.Stop() -} - -func (t *Test) NewExpect(t2 *testing.T) (e *httpexpect.Expect) { - e = httpexpect.New(t2, t.svr.URL) - return e -} - -var T *Test - -func NewTest() (res *Test, err error) { - // test - t := &Test{ - svc: NewService(), - } - - // test node - t.TestNode = &models.Node{Id: primitive.NewObjectID(), Name: "test-node"} - _ = delegate.NewModelDelegate(t.TestNode).Add() - - // test spider - t.TestSpider = &models.Spider{Id: primitive.NewObjectID(), Name: "test-spider"} - _ = delegate.NewModelDelegate(t.TestSpider).Add() - - // test task - t.TestTask = &models.Task{Id: primitive.NewObjectID(), SpiderId: t.TestSpider.GetId(), NodeId: t.TestNode.GetId()} - _ = delegate.NewModelDelegate(t.TestTask).Add() - - // test task stat - t.TestTaskStat = &models.TaskStat{Id: t.TestTask.GetId()} - _ = delegate.NewModelDelegate(t.TestTaskStat).Add() - - return t, nil -} diff --git a/core/notification/mail.go b/core/notification/mail.go index c7f71a0e..c2f63289 100644 --- a/core/notification/mail.go +++ b/core/notification/mail.go @@ -28,7 +28,7 @@ func SendMail(m *models.NotificationSettingMail, to, cc, title, content string) // config port, _ := strconv.Atoi(m.Port) - password := m.Password // test password: ALWVDPRHBEXOENXD + password := m.Password SMTPUser := m.User smtpConfig := smtpAuthentication{ Server: m.Server, diff --git a/core/notification/service_v2.go b/core/notification/service_v2.go index 3bf0b5b1..ed91f0bf 100644 --- a/core/notification/service_v2.go +++ b/core/notification/service_v2.go @@ -6,28 +6,19 @@ import ( "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" - mongo2 "github.com/crawlab-team/crawlab/db/mongo" - parser "github.com/crawlab-team/crawlab/template-parser" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" + "regexp" "sync" ) type ServiceV2 struct { } -func (svc *ServiceV2) Start() (err error) { +func (svc *ServiceV2) Start() { // initialize data if err := svc.initData(); err != nil { - return err + log.Errorf("[NotificationServiceV2] initializing data error: %v", err) + return } - - return nil -} - -func (svc *ServiceV2) Stop() (err error) { - return nil } func (svc *ServiceV2) initData() (err error) { @@ -175,187 +166,95 @@ Please find the task data as below. return nil } -func (svc *ServiceV2) Send(s *models.NotificationSettingV2, entity bson.M) (err error) { +func (svc *ServiceV2) Send(s *models.NotificationSettingV2, args ...any) (err error) { + content := svc.getContent(s, args...) switch s.Type { case TypeMail: - return svc.SendMail(s, entity) + return svc.SendMail(s, content) case TypeMobile: - return svc.SendMobile(s, entity) + return svc.SendMobile(s, content) } return nil } -func (svc *ServiceV2) SendMail(s *models.NotificationSettingV2, entity bson.M) (err error) { - // to - to, err := parser.Parse(s.Mail.To, entity) - if err != nil { - log.Warnf("parsing 'to' error: %v", err) - } - if to == "" { - return nil - } - - // cc - cc, err := parser.Parse(s.Mail.Cc, entity) - if err != nil { - log.Warnf("parsing 'cc' error: %v", err) - } - - // title - title, err := parser.Parse(s.Title, entity) - if err != nil { - log.Warnf("parsing 'title' error: %v", err) - } - - // content - content, err := parser.Parse(s.TemplateMarkdown, entity) - if err != nil { - log.Warnf("parsing 'content' error: %v", err) - } +func (svc *ServiceV2) SendMail(s *models.NotificationSettingV2, content string) (err error) { + // TODO: parse to/cc/bcc // send mail - if err := SendMail(&s.Mail, to, cc, title, content); err != nil { + if err := SendMail(&s.Mail, s.Mail.To, s.Mail.Cc, s.Title, content); err != nil { return err } return nil } -func (svc *ServiceV2) SendMobile(s *models.NotificationSettingV2, entity bson.M) (err error) { - // webhook - webhook, err := parser.Parse(s.Mobile.Webhook, entity) - if err != nil { - log.Warnf("parsing 'webhook' error: %v", err) - } - if webhook == "" { - return nil - } - - // title - title, err := parser.Parse(s.Title, entity) - if err != nil { - log.Warnf("parsing 'title' error: %v", err) - } - - // content - content, err := parser.Parse(s.TemplateMarkdown, entity) - if err != nil { - log.Warnf("parsing 'content' error: %v", err) - } - +func (svc *ServiceV2) SendMobile(s *models.NotificationSettingV2, content string) (err error) { // send - if err := SendMobileNotification(webhook, title, content); err != nil { + if err := SendMobileNotification(s.Mobile.Webhook, s.Title, content); err != nil { return err } return nil } -func (svc *ServiceV2) GetSettingList(query bson.M, pagination *entity.Pagination, sort bson.D) (res []models.NotificationSettingV2, total int, err error) { - // options - var options *mongo2.FindOptions - if pagination != nil || sort != nil { - options = new(mongo2.FindOptions) - if pagination != nil { - options.Skip = pagination.Size * (pagination.Page - 1) - options.Limit = pagination.Size - } - if sort != nil { - options.Sort = sort +func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, args ...any) (content string) { + switch s.TriggerTarget { + case constants.NotificationTriggerTargetTask: + //task := new(models.TaskV2) + //taskStat := new(models.TaskStatV2) + //spider := new(models.SpiderV2) + //node := new(models.NodeV2) + //for _, arg := range args { + // switch arg.(type) { + // case models.TaskV2: + // task = arg.(*models.TaskV2) + // case models.TaskStatV2: + // taskStat = arg.(*models.TaskStatV2) + // case models.SpiderV2: + // spider = arg.(*models.SpiderV2) + // case models.NodeV2: + // node = arg.(*models.NodeV2) + // } + //} + switch s.TemplateMode { + case constants.NotificationTemplateModeMarkdown: + // TODO: implement + case constants.NotificationTemplateModeRichText: + //s.TemplateRichText } + + case constants.NotificationTriggerTargetNode: + } - // get list - list, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetMany(query, options) - if err != nil { - if err.Error() == mongo.ErrNoDocuments.Error() { - return nil, 0, nil - } else { - return nil, 0, err - } - } - - // total count - total, err = service.NewModelServiceV2[models.NotificationSettingV2]().Count(query) - if err != nil { - return nil, 0, err - } - - return list, total, nil + return content } -func (svc *ServiceV2) GetSetting(id primitive.ObjectID) (res *models.NotificationSettingV2, err error) { - s, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetById(id) - if err != nil { - return nil, err - } - return s, nil -} +func (svc *ServiceV2) parseTemplateVariables(s *models.NotificationSettingV2) (variables []entity.NotificationVariable) { + regex := regexp.MustCompile("\\$\\{(\\w+):(\\w+)}") -func (svc *ServiceV2) PosSetting(s *models.NotificationSettingV2) (err error) { - s.Id = primitive.NewObjectID() - _, err = service.NewModelServiceV2[models.NotificationSettingV2]().InsertOne(*s) - if err != nil { - return err - } - return nil -} + // find all matches + matches := regex.FindAllStringSubmatch(s.Template, -1) -func (svc *ServiceV2) PutSetting(id primitive.ObjectID, s models.NotificationSettingV2) (err error) { - err = service.NewModelServiceV2[models.NotificationSettingV2]().ReplaceById(id, s) - if err != nil { - return err + // iterate over matches + for _, match := range matches { + variables = append(variables, entity.NotificationVariable{ + Category: match[1], + Name: match[2], + }) } - return nil -} - -func (svc *ServiceV2) DeleteSetting(id primitive.ObjectID) (err error) { - err = service.NewModelServiceV2[models.NotificationSettingV2]().DeleteById(id) - if err != nil { - return err - } - - return nil -} - -func (svc *ServiceV2) EnableSetting(id primitive.ObjectID) (err error) { - return svc._toggleSettingFunc(true)(id) -} - -func (svc *ServiceV2) DisableSetting(id primitive.ObjectID) (err error) { - return svc._toggleSettingFunc(false)(id) -} - -func (svc *ServiceV2) _toggleSettingFunc(value bool) func(id primitive.ObjectID) error { - return func(id primitive.ObjectID) (err error) { - s, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetById(id) - if err != nil { - return err - } - s.Enabled = value - err = service.NewModelServiceV2[models.NotificationSettingV2]().ReplaceById(id, *s) - if err != nil { - return err - } - return nil - } + return variables } func newNotificationServiceV2() *ServiceV2 { - // service - svc := &ServiceV2{} - - return svc + return &ServiceV2{} } var _serviceV2 *ServiceV2 var _serviceV2Once = new(sync.Once) func GetNotificationServiceV2() *ServiceV2 { - if _serviceV2 != nil { - return _serviceV2 - } _serviceV2Once.Do(func() { _serviceV2 = newNotificationServiceV2() }) diff --git a/core/notification/service_v2_test.go b/core/notification/service_v2_test.go new file mode 100644 index 00000000..11c9ca7d --- /dev/null +++ b/core/notification/service_v2_test.go @@ -0,0 +1,39 @@ +package notification + +import ( + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/models/models/v2" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseTemplateVariables_WithValidTemplate_ReturnsVariables(t *testing.T) { + svc := ServiceV2{} + template := "Dear ${user:name}, your task ${task:id} is ${task:status}." + expected := []entity.NotificationVariable{ + {Category: "user", Name: "name"}, + {Category: "task", Name: "id"}, + {Category: "task", Name: "status"}, + } + setting := models.NotificationSettingV2{Template: template} + + variables := svc.parseTemplateVariables(&setting) + + assert.Equal(t, expected, variables) +} + +func TestParseTemplateVariables_WithRepeatedVariables_ReturnsUniqueVariables(t *testing.T) { + svc := ServiceV2{} + template := "Dear ${user:name}, your task ${task:id} is ${task:status}. Again, ${user:name} and ${task:id}." + expected := []entity.NotificationVariable{ + {Category: "user", Name: "name"}, + {Category: "task", Name: "id"}, + {Category: "task", Name: "status"}, + } + setting := models.NotificationSettingV2{Template: template} + + variables := svc.parseTemplateVariables(&setting) + + assert.Equal(t, expected, variables) +}