From 7b1fa48fd9385c0ba480521675de85a309a8eecb Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 24 Jul 2024 17:00:35 +0800 Subject: [PATCH] feat: support notification for node --- core/grpc/server/node_server_v2.go | 11 ++++ core/grpc/server/task_server_v2.go | 4 ++ core/node/service/master_service_v2.go | 28 +++++++-- core/notification/entity.go | 2 +- core/notification/service_v2.go | 78 ++++++++++++++++++-------- 5 files changed, 92 insertions(+), 31 deletions(-) diff --git a/core/grpc/server/node_server_v2.go b/core/grpc/server/node_server_v2.go index 269ca7ef..d1936e3e 100644 --- a/core/grpc/server/node_server_v2.go +++ b/core/grpc/server/node_server_v2.go @@ -10,6 +10,8 @@ import ( "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" nodeconfig "github.com/crawlab-team/crawlab/core/node/config" + "github.com/crawlab-team/crawlab/core/notification" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" errors2 "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -93,6 +95,7 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe } return HandleError(err) } + oldStatus := node.Status // validate status if node.Status == constants.NodeStatusUnregistered { @@ -107,6 +110,14 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe if err != nil { return HandleError(err) } + newStatus := node.Status + + // send notification if status changed + if utils.IsPro() { + if oldStatus != newStatus { + go notification.GetNotificationServiceV2().SendNodeNotification(node) + } + } return HandleSuccessWithData(node) } diff --git a/core/grpc/server/task_server_v2.go b/core/grpc/server/task_server_v2.go index 9cc28ed9..d90760fc 100644 --- a/core/grpc/server/task_server_v2.go +++ b/core/grpc/server/task_server_v2.go @@ -110,6 +110,10 @@ func (svr TaskServerV2) Fetch(ctx context.Context, request *grpc.Request) (respo } func (svr TaskServerV2) SendNotification(_ context.Context, request *grpc.TaskServiceSendNotificationRequest) (response *grpc.Response, err error) { + if !utils.IsPro() { + return nil, nil + } + // task id taskId, err := primitive.ObjectIDFromHex(request.TaskId) if err != nil { diff --git a/core/node/service/master_service_v2.go b/core/node/service/master_service_v2.go index 87f9c85d..580a4f57 100644 --- a/core/node/service/master_service_v2.go +++ b/core/node/service/master_service_v2.go @@ -12,6 +12,7 @@ import ( models2 "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/node/config" + "github.com/crawlab-team/crawlab/core/notification" "github.com/crawlab-team/crawlab/core/schedule" "github.com/crawlab-team/crawlab/core/system" "github.com/crawlab-team/crawlab/core/task/handler" @@ -207,11 +208,12 @@ func (svc *MasterServiceV2) monitor() (err error) { wg.Add(len(workerNodes)) for _, n := range workerNodes { go func(n *models2.NodeV2) { + defer wg.Done() + // subscribe ok := svc.subscribeNode(n) if !ok { go svc.setWorkerNodeOffline(n) - wg.Done() return } @@ -219,19 +221,14 @@ func (svc *MasterServiceV2) monitor() (err error) { ok = svc.pingNodeClient(n) if !ok { go svc.setWorkerNodeOffline(n) - wg.Done() return } // update node available runners if err := svc.updateNodeAvailableRunners(n); err != nil { trace.PrintError(err) - wg.Done() return } - - // done - wg.Done() }(&n) } @@ -261,13 +258,24 @@ func (svc *MasterServiceV2) updateMasterNodeStatus() (err error) { if err != nil { return err } + oldStatus := node.Status + node.Status = constants.NodeStatusOnline node.Active = true node.ActiveAt = time.Now() + newStatus := node.Status + err = service.NewModelServiceV2[models2.NodeV2]().ReplaceById(node.Id, *node) if err != nil { return err } + + if utils.IsPro() { + if oldStatus != newStatus { + go svc.sendNotification(node) + } + } + return nil } @@ -280,6 +288,7 @@ func (svc *MasterServiceV2) setWorkerNodeOffline(node *models2.NodeV2) { if err != nil { trace.PrintError(err) } + svc.sendNotification(node) } func (svc *MasterServiceV2) subscribeNode(n *models2.NodeV2) (ok bool) { @@ -316,6 +325,13 @@ func (svc *MasterServiceV2) updateNodeAvailableRunners(node *models2.NodeV2) (er return nil } +func (svc *MasterServiceV2) sendNotification(node *models2.NodeV2) { + if !utils.IsPro() { + return + } + go notification.GetNotificationServiceV2().SendNodeNotification(node) +} + func newMasterServiceV2() (res *MasterServiceV2, err error) { // master service svc := &MasterServiceV2{ diff --git a/core/notification/entity.go b/core/notification/entity.go index 40b69eec..b7e41027 100644 --- a/core/notification/entity.go +++ b/core/notification/entity.go @@ -2,7 +2,7 @@ package notification import "github.com/crawlab-team/crawlab/core/models/models/v2" -type VariableDataTask struct { +type VariableData struct { Task *models.TaskV2 `json:"task"` TaskStat *models.TaskStatV2 `json:"task_stat"` Spider *models.SpiderV2 `json:"spider"` diff --git a/core/notification/service_v2.go b/core/notification/service_v2.go index c41c3a26..fae1ee16 100644 --- a/core/notification/service_v2.go +++ b/core/notification/service_v2.go @@ -7,7 +7,9 @@ import ( "github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" + "github.com/crawlab-team/crawlab/trace" "github.com/gomarkdown/markdown" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "regexp" "strings" @@ -64,35 +66,28 @@ func (svc *ServiceV2) SendIM(s *models.NotificationSettingV2, ch *models.Notific } func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, args ...any) (content string) { - switch s.TriggerTarget { - case constants.NotificationTriggerTargetTask: - vd := svc.getTaskVariableData(args...) - switch s.TemplateMode { - case constants.NotificationTemplateModeMarkdown: - variables := svc.parseTemplateVariables(s.TemplateMarkdown) - content = svc.getTaskContent(s.TemplateMarkdown, variables, vd) - if ch.Type == TypeMail { - content = svc.convertMarkdownToHtml(content) - } - return content - case constants.NotificationTemplateModeRichText: - template := s.TemplateRichText - if ch.Type == TypeIM { - template = s.TemplateMarkdown - } - variables := svc.parseTemplateVariables(template) - return svc.getTaskContent(template, variables, vd) + vd := svc.getVariableData(args...) + switch s.TemplateMode { + case constants.NotificationTemplateModeMarkdown: + variables := svc.parseTemplateVariables(s.TemplateMarkdown) + content = svc.geContentWithVariables(s.TemplateMarkdown, variables, vd) + if ch.Type == TypeMail { + content = svc.convertMarkdownToHtml(content) } - - case constants.NotificationTriggerTargetNode: - // TODO: implement - + return content + case constants.NotificationTemplateModeRichText: + template := s.TemplateRichText + if ch.Type == TypeIM { + template = s.TemplateMarkdown + } + variables := svc.parseTemplateVariables(template) + return svc.geContentWithVariables(template, variables, vd) } return content } -func (svc *ServiceV2) getTaskContent(template string, variables []entity.NotificationVariable, vd VariableDataTask) (content string) { +func (svc *ServiceV2) geContentWithVariables(template string, variables []entity.NotificationVariable, vd VariableData) (content string) { content = template for _, v := range variables { switch v.Category { @@ -192,6 +187,8 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key) case "name": content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name) + case "is_master": + content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.IsMaster)) case "ip": content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip) case "mac": @@ -260,7 +257,7 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific return content } -func (svc *ServiceV2) getTaskVariableData(args ...any) (vd VariableDataTask) { +func (svc *ServiceV2) getVariableData(args ...any) (vd VariableData) { for _, arg := range args { switch arg.(type) { case *models.TaskV2: @@ -331,6 +328,39 @@ func (svc *ServiceV2) convertMarkdownToHtml(content string) (html string) { return string(markdown.ToHTML([]byte(content), nil, nil)) } +func (svc *ServiceV2) SendNodeNotification(node *models.NodeV2) { + // arguments + var args []any + args = append(args, node) + + // settings + settings, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetMany(bson.M{ + "enabled": true, + "trigger_target": constants.NotificationTriggerTargetNode, + }, nil) + if err != nil { + log.Errorf("get notification settings error: %v", err) + trace.PrintError(err) + return + } + + for _, s := range settings { + // send notification + switch s.Trigger { + case constants.NotificationTriggerNodeStatusChange: + go svc.Send(&s, args...) + case constants.NotificationTriggerNodeOnline: + if node.Status == constants.NodeStatusOnline { + go svc.Send(&s, args...) + } + case constants.NotificationTriggerNodeOffline: + if node.Status == constants.NodeStatusOffline { + go svc.Send(&s, args...) + } + } + } +} + func newNotificationServiceV2() *ServiceV2 { return &ServiceV2{} }