feat: support notification for node

This commit is contained in:
Marvin Zhang
2024-07-24 17:00:35 +08:00
parent 923921c17a
commit 7b1fa48fd9
5 changed files with 92 additions and 31 deletions

View File

@@ -10,6 +10,8 @@ import (
"github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/models/service"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config" nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/notification"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc" "github.com/crawlab-team/crawlab/grpc"
errors2 "github.com/pkg/errors" errors2 "github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
@@ -93,6 +95,7 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe
} }
return HandleError(err) return HandleError(err)
} }
oldStatus := node.Status
// validate status // validate status
if node.Status == constants.NodeStatusUnregistered { if node.Status == constants.NodeStatusUnregistered {
@@ -107,6 +110,14 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe
if err != nil { if err != nil {
return HandleError(err) return HandleError(err)
} }
newStatus := node.Status
// send notification if status changed
if utils.IsPro() {
if oldStatus != newStatus {
go notification.GetNotificationServiceV2().SendNodeNotification(node)
}
}
return HandleSuccessWithData(node) return HandleSuccessWithData(node)
} }

View File

@@ -110,6 +110,10 @@ func (svr TaskServerV2) Fetch(ctx context.Context, request *grpc.Request) (respo
} }
func (svr TaskServerV2) SendNotification(_ context.Context, request *grpc.TaskServiceSendNotificationRequest) (response *grpc.Response, err error) { func (svr TaskServerV2) SendNotification(_ context.Context, request *grpc.TaskServiceSendNotificationRequest) (response *grpc.Response, err error) {
if !utils.IsPro() {
return nil, nil
}
// task id // task id
taskId, err := primitive.ObjectIDFromHex(request.TaskId) taskId, err := primitive.ObjectIDFromHex(request.TaskId)
if err != nil { if err != nil {

View File

@@ -12,6 +12,7 @@ import (
models2 "github.com/crawlab-team/crawlab/core/models/models/v2" models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/node/config" "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/notification"
"github.com/crawlab-team/crawlab/core/schedule" "github.com/crawlab-team/crawlab/core/schedule"
"github.com/crawlab-team/crawlab/core/system" "github.com/crawlab-team/crawlab/core/system"
"github.com/crawlab-team/crawlab/core/task/handler" "github.com/crawlab-team/crawlab/core/task/handler"
@@ -207,11 +208,12 @@ func (svc *MasterServiceV2) monitor() (err error) {
wg.Add(len(workerNodes)) wg.Add(len(workerNodes))
for _, n := range workerNodes { for _, n := range workerNodes {
go func(n *models2.NodeV2) { go func(n *models2.NodeV2) {
defer wg.Done()
// subscribe // subscribe
ok := svc.subscribeNode(n) ok := svc.subscribeNode(n)
if !ok { if !ok {
go svc.setWorkerNodeOffline(n) go svc.setWorkerNodeOffline(n)
wg.Done()
return return
} }
@@ -219,19 +221,14 @@ func (svc *MasterServiceV2) monitor() (err error) {
ok = svc.pingNodeClient(n) ok = svc.pingNodeClient(n)
if !ok { if !ok {
go svc.setWorkerNodeOffline(n) go svc.setWorkerNodeOffline(n)
wg.Done()
return return
} }
// update node available runners // update node available runners
if err := svc.updateNodeAvailableRunners(n); err != nil { if err := svc.updateNodeAvailableRunners(n); err != nil {
trace.PrintError(err) trace.PrintError(err)
wg.Done()
return return
} }
// done
wg.Done()
}(&n) }(&n)
} }
@@ -261,13 +258,24 @@ func (svc *MasterServiceV2) updateMasterNodeStatus() (err error) {
if err != nil { if err != nil {
return err return err
} }
oldStatus := node.Status
node.Status = constants.NodeStatusOnline node.Status = constants.NodeStatusOnline
node.Active = true node.Active = true
node.ActiveAt = time.Now() node.ActiveAt = time.Now()
newStatus := node.Status
err = service.NewModelServiceV2[models2.NodeV2]().ReplaceById(node.Id, *node) err = service.NewModelServiceV2[models2.NodeV2]().ReplaceById(node.Id, *node)
if err != nil { if err != nil {
return err return err
} }
if utils.IsPro() {
if oldStatus != newStatus {
go svc.sendNotification(node)
}
}
return nil return nil
} }
@@ -280,6 +288,7 @@ func (svc *MasterServiceV2) setWorkerNodeOffline(node *models2.NodeV2) {
if err != nil { if err != nil {
trace.PrintError(err) trace.PrintError(err)
} }
svc.sendNotification(node)
} }
func (svc *MasterServiceV2) subscribeNode(n *models2.NodeV2) (ok bool) { func (svc *MasterServiceV2) subscribeNode(n *models2.NodeV2) (ok bool) {
@@ -316,6 +325,13 @@ func (svc *MasterServiceV2) updateNodeAvailableRunners(node *models2.NodeV2) (er
return nil return nil
} }
func (svc *MasterServiceV2) sendNotification(node *models2.NodeV2) {
if !utils.IsPro() {
return
}
go notification.GetNotificationServiceV2().SendNodeNotification(node)
}
func newMasterServiceV2() (res *MasterServiceV2, err error) { func newMasterServiceV2() (res *MasterServiceV2, err error) {
// master service // master service
svc := &MasterServiceV2{ svc := &MasterServiceV2{

View File

@@ -2,7 +2,7 @@ package notification
import "github.com/crawlab-team/crawlab/core/models/models/v2" import "github.com/crawlab-team/crawlab/core/models/models/v2"
type VariableDataTask struct { type VariableData struct {
Task *models.TaskV2 `json:"task"` Task *models.TaskV2 `json:"task"`
TaskStat *models.TaskStatV2 `json:"task_stat"` TaskStat *models.TaskStatV2 `json:"task_stat"`
Spider *models.SpiderV2 `json:"spider"` Spider *models.SpiderV2 `json:"spider"`

View File

@@ -7,7 +7,9 @@ import (
"github.com/crawlab-team/crawlab/core/entity" "github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/trace"
"github.com/gomarkdown/markdown" "github.com/gomarkdown/markdown"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"regexp" "regexp"
"strings" "strings"
@@ -64,13 +66,11 @@ func (svc *ServiceV2) SendIM(s *models.NotificationSettingV2, ch *models.Notific
} }
func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, args ...any) (content string) { func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, args ...any) (content string) {
switch s.TriggerTarget { vd := svc.getVariableData(args...)
case constants.NotificationTriggerTargetTask:
vd := svc.getTaskVariableData(args...)
switch s.TemplateMode { switch s.TemplateMode {
case constants.NotificationTemplateModeMarkdown: case constants.NotificationTemplateModeMarkdown:
variables := svc.parseTemplateVariables(s.TemplateMarkdown) variables := svc.parseTemplateVariables(s.TemplateMarkdown)
content = svc.getTaskContent(s.TemplateMarkdown, variables, vd) content = svc.geContentWithVariables(s.TemplateMarkdown, variables, vd)
if ch.Type == TypeMail { if ch.Type == TypeMail {
content = svc.convertMarkdownToHtml(content) content = svc.convertMarkdownToHtml(content)
} }
@@ -81,18 +81,13 @@ func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, ch *models.Not
template = s.TemplateMarkdown template = s.TemplateMarkdown
} }
variables := svc.parseTemplateVariables(template) variables := svc.parseTemplateVariables(template)
return svc.getTaskContent(template, variables, vd) return svc.geContentWithVariables(template, variables, vd)
}
case constants.NotificationTriggerTargetNode:
// TODO: implement
} }
return content return content
} }
func (svc *ServiceV2) getTaskContent(template string, variables []entity.NotificationVariable, vd VariableDataTask) (content string) { func (svc *ServiceV2) geContentWithVariables(template string, variables []entity.NotificationVariable, vd VariableData) (content string) {
content = template content = template
for _, v := range variables { for _, v := range variables {
switch v.Category { switch v.Category {
@@ -192,6 +187,8 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key) content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key)
case "name": case "name":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name) content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name)
case "is_master":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.IsMaster))
case "ip": case "ip":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip) content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip)
case "mac": case "mac":
@@ -260,7 +257,7 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific
return content return content
} }
func (svc *ServiceV2) getTaskVariableData(args ...any) (vd VariableDataTask) { func (svc *ServiceV2) getVariableData(args ...any) (vd VariableData) {
for _, arg := range args { for _, arg := range args {
switch arg.(type) { switch arg.(type) {
case *models.TaskV2: case *models.TaskV2:
@@ -331,6 +328,39 @@ func (svc *ServiceV2) convertMarkdownToHtml(content string) (html string) {
return string(markdown.ToHTML([]byte(content), nil, nil)) return string(markdown.ToHTML([]byte(content), nil, nil))
} }
func (svc *ServiceV2) SendNodeNotification(node *models.NodeV2) {
// arguments
var args []any
args = append(args, node)
// settings
settings, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetMany(bson.M{
"enabled": true,
"trigger_target": constants.NotificationTriggerTargetNode,
}, nil)
if err != nil {
log.Errorf("get notification settings error: %v", err)
trace.PrintError(err)
return
}
for _, s := range settings {
// send notification
switch s.Trigger {
case constants.NotificationTriggerNodeStatusChange:
go svc.Send(&s, args...)
case constants.NotificationTriggerNodeOnline:
if node.Status == constants.NodeStatusOnline {
go svc.Send(&s, args...)
}
case constants.NotificationTriggerNodeOffline:
if node.Status == constants.NodeStatusOffline {
go svc.Send(&s, args...)
}
}
}
}
func newNotificationServiceV2() *ServiceV2 { func newNotificationServiceV2() *ServiceV2 {
return &ServiceV2{} return &ServiceV2{}
} }