From d0611b4567348f819cc93bcd3c73fb21fa9669b3 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 12 Jul 2024 18:00:19 +0800 Subject: [PATCH] refactor: removed unnecessary code --- core/apps/utils.go | 91 ------- core/grpc/server/task_server.go | 238 ----------------- core/node/service/master_service.go | 368 -------------------------- core/notification/models.go | 32 --- core/notification/service.go | 395 ---------------------------- core/notification/service_test.go | 19 -- 6 files changed, 1143 deletions(-) delete mode 100644 core/apps/utils.go delete mode 100644 core/grpc/server/task_server.go delete mode 100644 core/node/service/master_service.go delete mode 100644 core/notification/models.go delete mode 100644 core/notification/service.go delete mode 100644 core/notification/service_test.go diff --git a/core/apps/utils.go b/core/apps/utils.go deleted file mode 100644 index f0d279e3..00000000 --- a/core/apps/utils.go +++ /dev/null @@ -1,91 +0,0 @@ -package apps - -import ( - "fmt" - "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/color" - "github.com/crawlab-team/crawlab/core/config" - "github.com/crawlab-team/crawlab/core/container" - grpcclient "github.com/crawlab-team/crawlab/core/grpc/client" - grpcserver "github.com/crawlab-team/crawlab/core/grpc/server" - modelsclient "github.com/crawlab-team/crawlab/core/models/client" - modelsservice "github.com/crawlab-team/crawlab/core/models/service" - nodeconfig "github.com/crawlab-team/crawlab/core/node/config" - "github.com/crawlab-team/crawlab/core/schedule" - "github.com/crawlab-team/crawlab/core/spider/admin" - "github.com/crawlab-team/crawlab/core/stats" - "github.com/crawlab-team/crawlab/core/task/handler" - "github.com/crawlab-team/crawlab/core/task/scheduler" - taskstats "github.com/crawlab-team/crawlab/core/task/stats" - "github.com/crawlab-team/crawlab/core/user" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" -) - -func Start(app App) { - start(app) -} - -func start(app App) { - app.Init() - go app.Start() - app.Wait() - app.Stop() -} - -func DefaultWait() { - utils.DefaultWait() -} - -func initModule(name string, fn func() error) (err error) { - if err := fn(); err != nil { - log.Error(fmt.Sprintf("init %s error: %s", name, err.Error())) - _ = trace.TraceError(err) - panic(err) - } - log.Info(fmt.Sprintf("initialized %s successfully", name)) - return nil -} - -func initApp(name string, app App) { - _ = initModule(name, func() error { - app.Init() - return nil - }) -} - -var injectors = []interface{}{ - modelsservice.GetService, - modelsclient.NewServiceDelegate, - modelsclient.NewNodeServiceDelegate, - modelsclient.NewSpiderServiceDelegate, - modelsclient.NewTaskServiceDelegate, - modelsclient.NewTaskStatServiceDelegate, - modelsclient.NewEnvironmentServiceDelegate, - grpcclient.NewClient, - grpcclient.NewPool, - grpcserver.NewModelDelegateServer, - grpcserver.NewModelBaseServiceServer, - grpcserver.NewNodeServer, - grpcserver.NewTaskServer, - grpcserver.NewMessageServer, - config.NewConfigPathService, - user.GetUserService, - schedule.GetScheduleService, - admin.GetSpiderAdminService, - stats.GetStatsService, - nodeconfig.GetNodeConfigService, - taskstats.GetTaskStatsService, - color.NewService, - scheduler.GetTaskSchedulerService, - handler.GetTaskHandlerService, -} - -func injectModules() { - c := container.GetContainer() - for _, injector := range injectors { - if err := c.Provide(injector); err != nil { - panic(err) - } - } -} diff --git a/core/grpc/server/task_server.go b/core/grpc/server/task_server.go deleted file mode 100644 index b6d08592..00000000 --- a/core/grpc/server/task_server.go +++ /dev/null @@ -1,238 +0,0 @@ -package server - -import ( - "context" - "encoding/json" - "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/container" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/errors" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/notification" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/db/mongo" - grpc "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - mongo2 "go.mongodb.org/mongo-driver/mongo" - "io" - "strings" -) - -type TaskServer struct { - grpc.UnimplementedTaskServiceServer - - // dependencies - modelSvc service.ModelService - cfgSvc interfaces.NodeConfigService - statsSvc interfaces.TaskStatsService - - // internals - server interfaces.GrpcServer -} - -// Subscribe to task stream when a task runner in a node starts -func (svr TaskServer) Subscribe(stream grpc.TaskService_SubscribeServer) (err error) { - for { - msg, err := stream.Recv() - utils.LogDebug(msg.String()) - if err == io.EOF { - return nil - } - if err != nil { - if strings.HasSuffix(err.Error(), "context canceled") { - return nil - } - trace.PrintError(err) - continue - } - switch msg.Code { - case grpc.StreamMessageCode_INSERT_DATA: - err = svr.handleInsertData(msg) - case grpc.StreamMessageCode_INSERT_LOGS: - err = svr.handleInsertLogs(msg) - default: - err = errors.ErrorGrpcInvalidCode - log.Errorf("invalid stream message code: %d", msg.Code) - continue - } - if err != nil { - log.Errorf("grpc error[%d]: %v", msg.Code, err) - } - } -} - -// Fetch tasks to be executed by a task handler -func (svr TaskServer) Fetch(ctx context.Context, request *grpc.Request) (response *grpc.Response, err error) { - nodeKey := request.GetNodeKey() - if nodeKey == "" { - return nil, trace.TraceError(errors.ErrorGrpcInvalidNodeKey) - } - n, err := svr.modelSvc.GetNodeByKey(nodeKey, nil) - if err != nil { - return nil, trace.TraceError(err) - } - var tid primitive.ObjectID - opts := &mongo.FindOptions{ - Sort: bson.D{ - {"p", 1}, - {"_id", 1}, - }, - Limit: 1, - } - if err := mongo.RunTransactionWithContext(ctx, func(sc mongo2.SessionContext) (err error) { - // get task queue item assigned to this node - tid, err = svr.getTaskQueueItemIdAndDequeue(bson.M{"nid": n.Id}, opts, n.Id) - if err != nil { - return err - } - if !tid.IsZero() { - return nil - } - - // get task queue item assigned to any node (random mode) - tid, err = svr.getTaskQueueItemIdAndDequeue(bson.M{"nid": nil}, opts, n.Id) - if !tid.IsZero() { - return nil - } - if err != nil { - return err - } - return nil - }); err != nil { - return nil, err - } - return HandleSuccessWithData(tid) -} - -func (svr TaskServer) SendNotification(ctx context.Context, request *grpc.Request) (response *grpc.Response, err error) { - svc := notification.GetService() - var t = new(models.Task) - if err := json.Unmarshal(request.Data, t); err != nil { - return nil, trace.TraceError(err) - } - t, err = svr.modelSvc.GetTaskById(t.Id) - if err != nil { - return nil, trace.TraceError(err) - } - td, err := json.Marshal(t) - if err != nil { - return nil, trace.TraceError(err) - } - var e bson.M - if err := json.Unmarshal(td, &e); err != nil { - return nil, trace.TraceError(err) - } - ts, err := svr.modelSvc.GetTaskStatById(t.Id) - if err != nil { - return nil, trace.TraceError(err) - } - settings, _, err := svc.GetSettingList(bson.M{ - "enabled": true, - }, nil, nil) - if err != nil { - return nil, trace.TraceError(err) - } - for _, s := range settings { - switch s.TaskTrigger { - case constants.NotificationTriggerTaskFinish: - if t.Status != constants.TaskStatusPending && t.Status != constants.TaskStatusRunning { - _ = svc.Send(s, e) - } - case constants.NotificationTriggerTaskError: - if t.Status == constants.TaskStatusError || t.Status == constants.TaskStatusAbnormal { - _ = svc.Send(s, e) - } - case constants.NotificationTriggerTaskEmptyResults: - if t.Status != constants.TaskStatusPending && t.Status != constants.TaskStatusRunning { - if ts.ResultCount == 0 { - _ = svc.Send(s, e) - } - } - case constants.NotificationTriggerTaskNever: - } - } - return nil, nil -} - -func (svr TaskServer) handleInsertData(msg *grpc.StreamMessage) (err error) { - data, err := svr.deserialize(msg) - if err != nil { - return err - } - var records []interface{} - for _, d := range data.Records { - res, ok := d[constants.TaskKey] - if ok { - switch res.(type) { - case string: - id, err := primitive.ObjectIDFromHex(res.(string)) - if err == nil { - d[constants.TaskKey] = id - } - } - } - records = append(records, d) - } - return svr.statsSvc.InsertData(data.TaskId, records...) -} - -func (svr TaskServer) handleInsertLogs(msg *grpc.StreamMessage) (err error) { - data, err := svr.deserialize(msg) - if err != nil { - return err - } - return svr.statsSvc.InsertLogs(data.TaskId, data.Logs...) -} - -func (svr TaskServer) getTaskQueueItemIdAndDequeue(query bson.M, opts *mongo.FindOptions, nid primitive.ObjectID) (tid primitive.ObjectID, err error) { - var tq models.TaskQueueItem - if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Find(query, opts).One(&tq); err != nil { - if err == mongo2.ErrNoDocuments { - return tid, nil - } - return tid, trace.TraceError(err) - } - t, err := svr.modelSvc.GetTaskById(tq.Id) - if err == nil { - t.NodeId = nid - _ = delegate.NewModelDelegate(t).Save() - } - _ = delegate.NewModelDelegate(&tq).Delete() - return tq.Id, nil -} - -func (svr TaskServer) deserialize(msg *grpc.StreamMessage) (data entity.StreamMessageTaskData, err error) { - if err := json.Unmarshal(msg.Data, &data); err != nil { - return data, trace.TraceError(err) - } - if data.TaskId.IsZero() { - return data, trace.TraceError(errors.ErrorGrpcInvalidType) - } - return data, nil -} - -func NewTaskServer() (res *TaskServer, err error) { - // task server - svr := &TaskServer{} - - // dependency injection - if err := container.GetContainer().Invoke(func( - modelSvc service.ModelService, - statsSvc interfaces.TaskStatsService, - cfgSvc interfaces.NodeConfigService, - ) { - svr.modelSvc = modelSvc - svr.statsSvc = statsSvc - svr.cfgSvc = cfgSvc - }); err != nil { - return nil, err - } - - return svr, nil -} diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go deleted file mode 100644 index 3ade5da1..00000000 --- a/core/node/service/master_service.go +++ /dev/null @@ -1,368 +0,0 @@ -package service - -import ( - "github.com/apex/log" - config2 "github.com/crawlab-team/crawlab/core/config" - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/container" - "github.com/crawlab-team/crawlab/core/errors" - "github.com/crawlab-team/crawlab/core/grpc/server" - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/common" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/node/config" - "github.com/crawlab-team/crawlab/core/notification" - "github.com/crawlab-team/crawlab/core/system" - "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/db/mongo" - grpc "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" - "github.com/spf13/viper" - "go.mongodb.org/mongo-driver/bson" - mongo2 "go.mongodb.org/mongo-driver/mongo" - "time" -) - -type MasterService struct { - // dependencies - modelSvc service.ModelService - cfgSvc interfaces.NodeConfigService - server interfaces.GrpcServer - schedulerSvc interfaces.TaskSchedulerService - handlerSvc interfaces.TaskHandlerService - scheduleSvc interfaces.ScheduleService - notificationSvc *notification.Service - spiderAdminSvc interfaces.SpiderAdminService - systemSvc *system.Service - - // settings - cfgPath string - address interfaces.Address - monitorInterval time.Duration - stopOnError bool -} - -func (svc *MasterService) Init() (err error) { - // do nothing - return nil -} - -func (svc *MasterService) Start() { - // create indexes - common.CreateIndexes() - - // start grpc server - if err := svc.server.Start(); err != nil { - panic(err) - } - - // register to db - if err := svc.Register(); err != nil { - panic(err) - } - - // start monitoring worker nodes - go svc.Monitor() - - // start task handler - go svc.handlerSvc.Start() - - // start task scheduler - go svc.schedulerSvc.Start() - - // start schedule service - go svc.scheduleSvc.Start() - - // start notification service - go svc.notificationSvc.Start() - - // start spider admin service - go svc.spiderAdminSvc.Start() - - // wait for quit signal - svc.Wait() - - // stop - svc.Stop() -} - -func (svc *MasterService) Wait() { - utils.DefaultWait() -} - -func (svc *MasterService) Stop() { - _ = svc.server.Stop() - log.Infof("master[%s] service has stopped", svc.GetConfigService().GetNodeKey()) -} - -func (svc *MasterService) Monitor() { - log.Infof("master[%s] monitoring started", svc.GetConfigService().GetNodeKey()) - for { - if err := svc.monitor(); err != nil { - trace.PrintError(err) - if svc.stopOnError { - log.Errorf("master[%s] monitor error, now stopping...", svc.GetConfigService().GetNodeKey()) - svc.Stop() - return - } - } - - time.Sleep(svc.monitorInterval) - } -} - -func (svc *MasterService) GetConfigService() (cfgSvc interfaces.NodeConfigService) { - return svc.cfgSvc -} - -func (svc *MasterService) GetConfigPath() (path string) { - return svc.cfgPath -} - -func (svc *MasterService) SetConfigPath(path string) { - svc.cfgPath = path -} - -func (svc *MasterService) GetAddress() (address interfaces.Address) { - return svc.address -} - -func (svc *MasterService) SetAddress(address interfaces.Address) { - svc.address = address -} - -func (svc *MasterService) SetMonitorInterval(duration time.Duration) { - svc.monitorInterval = duration -} - -func (svc *MasterService) Register() (err error) { - nodeKey := svc.GetConfigService().GetNodeKey() - nodeName := svc.GetConfigService().GetNodeName() - node, err := svc.modelSvc.GetNodeByKey(nodeKey, nil) - if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() { - // not exists - log.Infof("master[%s] does not exist in db", nodeKey) - node := &models.Node{ - Key: nodeKey, - Name: nodeName, - MaxRunners: config.DefaultConfigOptions.MaxRunners, - IsMaster: true, - Status: constants.NodeStatusOnline, - Enabled: true, - Active: true, - ActiveTs: time.Now(), - } - if viper.GetInt("task.handler.maxRunners") > 0 { - node.MaxRunners = viper.GetInt("task.handler.maxRunners") - } - nodeD := delegate.NewModelNodeDelegate(node) - if err := nodeD.Add(); err != nil { - return err - } - log.Infof("added master[%s] in db. id: %s", nodeKey, nodeD.GetModel().GetId().Hex()) - return nil - } else if err == nil { - // exists - log.Infof("master[%s] exists in db", nodeKey) - nodeD := delegate.NewModelNodeDelegate(node) - if err := nodeD.UpdateStatusOnline(); err != nil { - return err - } - log.Infof("updated master[%s] in db. id: %s", nodeKey, nodeD.GetModel().GetId().Hex()) - return nil - } else { - // error - return err - } -} - -func (svc *MasterService) StopOnError() { - svc.stopOnError = true -} - -func (svc *MasterService) GetServer() (svr interfaces.GrpcServer) { - return svc.server -} - -func (svc *MasterService) monitor() (err error) { - // update master node status in db - if err := svc.updateMasterNodeStatus(); err != nil { - if err.Error() == mongo2.ErrNoDocuments.Error() { - return nil - } - return err - } - - // all worker nodes - nodes, err := svc.getAllWorkerNodes() - if err != nil { - return err - } - - // error flag - isErr := false - - // iterate all nodes - for _, n := range nodes { - // subscribe - if err := svc.subscribeNode(&n); err != nil { - isErr = true - continue - } - - // ping client - if err := svc.pingNodeClient(&n); err != nil { - isErr = true - continue - } - - // update node available runners - if err := svc.updateNodeAvailableRunners(&n); err != nil { - isErr = true - continue - } - } - - if isErr { - return trace.TraceError(errors.ErrorNodeMonitorError) - } - - return nil -} - -func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) { - query := bson.M{ - "key": bson.M{"$ne": svc.cfgSvc.GetNodeKey()}, // not self - "active": true, // active - } - nodes, err = svc.modelSvc.GetNodeList(query, nil) - if err != nil { - if err == mongo2.ErrNoDocuments { - return nil, nil - } - return nil, trace.TraceError(err) - } - return nodes, nil -} - -func (svc *MasterService) updateMasterNodeStatus() (err error) { - nodeKey := svc.GetConfigService().GetNodeKey() - node, err := svc.modelSvc.GetNodeByKey(nodeKey, nil) - if err != nil { - return err - } - nodeD := delegate.NewModelNodeDelegate(node) - return nodeD.UpdateStatusOnline() -} - -func (svc *MasterService) setWorkerNodeOffline(n interfaces.Node) (err error) { - return delegate.NewModelNodeDelegate(n).UpdateStatusOffline() -} - -func (svc *MasterService) subscribeNode(n interfaces.Node) (err error) { - _, err = svc.server.GetSubscribe("node:" + n.GetKey()) - if err != nil { - log.Errorf("cannot subscribe worker node[%s]: %v", n.GetKey(), err) - if err := svc.setWorkerNodeOffline(n); err != nil { - return trace.TraceError(err) - } - return trace.TraceError(err) - } - return nil -} - -func (svc *MasterService) pingNodeClient(n interfaces.Node) (err error) { - if err := svc.server.SendStreamMessage("node:"+n.GetKey(), grpc.StreamMessageCode_PING); err != nil { - log.Errorf("cannot ping worker node client[%s]: %v", n.GetKey(), err) - if err := svc.setWorkerNodeOffline(n); err != nil { - return trace.TraceError(err) - } - return trace.TraceError(err) - } - return nil -} - -func (svc *MasterService) updateNodeAvailableRunners(n interfaces.Node) (err error) { - query := bson.M{ - "node_id": n.GetId(), - "status": constants.TaskStatusRunning, - } - runningTasksCount, err := mongo.GetMongoCol(interfaces.ModelColNameTask).Count(query) - if err != nil { - return trace.TraceError(err) - } - n.SetAvailableRunners(n.GetMaxRunners() - runningTasksCount) - return delegate.NewModelDelegate(n).Save() -} - -func NewMasterService(opts ...Option) (res interfaces.NodeMasterService, err error) { - // master service - svc := &MasterService{ - cfgPath: config2.GetConfigPath(), - monitorInterval: 15 * time.Second, - stopOnError: false, - } - - // apply options - for _, opt := range opts { - opt(svc) - } - - // server options - var serverOpts []server.Option - if svc.address != nil { - serverOpts = append(serverOpts, server.WithAddress(svc.address)) - } - - // dependency injection - if err := container.GetContainer().Invoke(func( - cfgSvc interfaces.NodeConfigService, - modelSvc service.ModelService, - server interfaces.GrpcServer, - schedulerSvc interfaces.TaskSchedulerService, - handlerSvc interfaces.TaskHandlerService, - scheduleSvc interfaces.ScheduleService, - spiderAdminSvc interfaces.SpiderAdminService, - ) { - svc.cfgSvc = cfgSvc - svc.modelSvc = modelSvc - svc.server = server - svc.schedulerSvc = schedulerSvc - svc.handlerSvc = handlerSvc - svc.scheduleSvc = scheduleSvc - svc.spiderAdminSvc = spiderAdminSvc - }); err != nil { - return nil, err - } - - // notification service - svc.notificationSvc = notification.GetService() - - // system service - svc.systemSvc = system.GetService() - - // init - if err := svc.Init(); err != nil { - return nil, err - } - - return svc, nil -} - -func ProvideMasterService(path string, opts ...Option) func() (interfaces.NodeMasterService, error) { - // path - if path == "" || path == config2.GetConfigPath() { - if viper.GetString("config.path") != "" { - path = viper.GetString("config.path") - } else { - path = config2.GetConfigPath() - } - } - opts = append(opts, WithConfigPath(path)) - - return func() (interfaces.NodeMasterService, error) { - return NewMasterService(opts...) - } -} diff --git a/core/notification/models.go b/core/notification/models.go deleted file mode 100644 index efb55448..00000000 --- a/core/notification/models.go +++ /dev/null @@ -1,32 +0,0 @@ -package notification - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type NotificationSetting struct { - Id primitive.ObjectID `json:"_id" bson:"_id"` - Type string `json:"type" bson:"type"` - Name string `json:"name" bson:"name"` - Description string `json:"description" bson:"description"` - Enabled bool `json:"enabled" bson:"enabled"` - Global bool `json:"global" bson:"global"` - Title string `json:"title,omitempty" bson:"title,omitempty"` - Template string `json:"template,omitempty" bson:"template,omitempty"` - TaskTrigger string `json:"task_trigger" bson:"task_trigger"` - Mail NotificationSettingMail `json:"mail,omitempty" bson:"mail,omitempty"` - Mobile NotificationSettingMobile `json:"mobile,omitempty" bson:"mobile,omitempty"` -} - -type NotificationSettingMail struct { - Server string `json:"server" bson:"server"` - Port string `json:"port,omitempty" bson:"port,omitempty"` - User string `json:"user,omitempty" bson:"user,omitempty"` - Password string `json:"password,omitempty" bson:"password,omitempty"` - SenderEmail string `json:"sender_email,omitempty" bson:"sender_email,omitempty"` - SenderIdentity string `json:"sender_identity,omitempty" bson:"sender_identity,omitempty"` - To string `json:"to,omitempty" bson:"to,omitempty"` - Cc string `json:"cc,omitempty" bson:"cc,omitempty"` -} - -type NotificationSettingMobile struct { - Webhook string `json:"webhook" bson:"webhook"` -} diff --git a/core/notification/service.go b/core/notification/service.go deleted file mode 100644 index 0f3bf7bf..00000000 --- a/core/notification/service.go +++ /dev/null @@ -1,395 +0,0 @@ -package notification - -import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/models/models/v2" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/utils" - mongo2 "github.com/crawlab-team/crawlab/db/mongo" - parser "github.com/crawlab-team/crawlab/template-parser" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" -) - -type Service struct { - col *mongo2.Col // notification settings - modelSvc service.ModelService -} - -func (svc *Service) Init() (err error) { - if !utils.IsPro() { - return nil - } - - return nil -} - -func (svc *Service) Start() (err error) { - // initialize data - if err := svc.initData(); err != nil { - return err - } - - return nil -} - -func (svc *Service) Stop() (err error) { - return nil -} - -func (svc *Service) initData() (err error) { - total, err := svc.col.Count(nil) - if err != nil { - return err - } - if total > 0 { - return nil - } - - // data to initialize - settings := []NotificationSetting{ - { - Id: primitive.NewObjectID(), - Type: TypeMail, - Enabled: true, - Name: "任务通知(邮件)", - Description: "这是默认的邮件通知。您可以使用您自己的设置进行编辑。", - TaskTrigger: constants.NotificationTriggerTaskFinish, - Title: "[Crawlab] 爬虫任务更新: {{$.status}}", - Template: `尊敬的 {{$.user.username}}, - -请查看下面的任务数据。 - -|键|值| -|:-:|:--| -|任务状态|{{$.status}}| -|任务优先级|{{$.priority}}| -|任务模式|{{$.mode}}| -|执行命令|{{$.cmd}}| -|执行参数|{{$.param}}| -|错误信息|{{$.error}}| -|节点|{{$.node.name}}| -|爬虫|{{$.spider.name}}| -|项目|{{$.spider.project.name}}| -|定时任务|{{$.schedule.name}}| -|结果数|{{$.:task_stat.result_count}}| -|等待时间(秒)|{#{{$.:task_stat.wait_duration}}/1000#}| -|运行时间(秒)|{#{{$.:task_stat.runtime_duration}}/1000#}| -|总时间(秒)|{#{{$.:task_stat.total_duration}}/1000#}| -|平均结果数/秒|{#{{$.:task_stat.result_count}}/({{$.:task_stat.total_duration}}/1000)#}| -`, - Mail: NotificationSettingMail{ - Server: "smtp.163.com", - Port: "465", - To: "{{$.user[create].email}}", - }, - }, - { - Id: primitive.NewObjectID(), - Type: TypeMail, - Enabled: true, - Name: "Task Change (Mail)", - Description: "This is the default mail notification. You can edit it with your own settings", - TaskTrigger: constants.NotificationTriggerTaskFinish, - Title: "[Crawlab] Task Update: {{$.status}}", - Template: `Dear {{$.user.username}}, - -Please find the task data as below. - -|Key|Value| -|:-:|:--| -|Task Status|{{$.status}}| -|Task Priority|{{$.priority}}| -|Task Mode|{{$.mode}}| -|Task Command|{{$.cmd}}| -|Task Params|{{$.param}}| -|Error Message|{{$.error}}| -|Node|{{$.node.name}}| -|Spider|{{$.spider.name}}| -|Project|{{$.spider.project.name}}| -|Schedule|{{$.schedule.name}}| -|Result Count|{{$.:task_stat.result_count}}| -|Wait Duration (sec)|{#{{$.:task_stat.wait_duration}}/1000#}| -|Runtime Duration (sec)|{#{{$.:task_stat.runtime_duration}}/1000#}| -|Total Duration (sec)|{#{{$.:task_stat.total_duration}}/1000#}| -|Avg Results / Sec|{#{{$.:task_stat.result_count}}/({{$.:task_stat.total_duration}}/1000)#}| -`, - Mail: NotificationSettingMail{ - Server: "smtp.163.com", - Port: "465", - To: "{{$.user[create].email}}", - }, - }, - { - Id: primitive.NewObjectID(), - Type: TypeMobile, - Enabled: true, - Name: "任务通知(移动端)", - Description: "这是默认的手机通知。您可以使用您自己的设置进行编辑。", - TaskTrigger: constants.NotificationTriggerTaskFinish, - Title: "[Crawlab] 任务更新: {{$.status}}", - Template: `尊敬的 {{$.user.username}}, - -请查看下面的任务数据。 - -- **任务状态**: {{$.status}} -- **任务优先级**: {{$.priority}} -- **任务模式**: {{$.mode}} -- **执行命令**: {{$.cmd}} -- **执行参数**: {{$.param}} -- **错误信息**: {{$.error}} -- **节点**: {{$.node.name}} -- **爬虫**: {{$.spider.name}} -- **项目**: {{$.spider.project.name}} -- **定时任务**: {{$.schedule.name}} -- **结果数**: {{$.:task_stat.result_count}} -- **等待时间(秒)**: {#{{$.:task_stat.wait_duration}}/1000#} -- **运行时间(秒)**: {#{{$.:task_stat.runtime_duration}}/1000#} -- **总时间(秒)**: {#{{$.:task_stat.total_duration}}/1000#} -- **平均结果数/秒**: {#{{$.:task_stat.result_count}}/({{$.:task_stat.total_duration}}/1000)#}`, - Mobile: NotificationSettingMobile{}, - }, - { - Id: primitive.NewObjectID(), - Type: TypeMobile, - Enabled: true, - Name: "Task Change (Mobile)", - Description: "This is the default mobile notification. You can edit it with your own settings", - TaskTrigger: constants.NotificationTriggerTaskError, - Title: "[Crawlab] Task Update: {{$.status}}", - Template: `Dear {{$.user.username}}, - -Please find the task data as below. - -- **Task Status**: {{$.status}} -- **Task Priority**: {{$.priority}} -- **Task Mode**: {{$.mode}} -- **Task Command**: {{$.cmd}} -- **Task Params**: {{$.param}} -- **Error Message**: {{$.error}} -- **Node**: {{$.node.name}} -- **Spider**: {{$.spider.name}} -- **Project**: {{$.spider.project.name}} -- **Schedule**: {{$.schedule.name}} -- **Result Count**: {{$.:task_stat.result_count}} -- **Wait Duration (sec)**: {#{{$.:task_stat.wait_duration}}/1000#} -- **Runtime Duration (sec)**: {#{{$.:task_stat.runtime_duration}}/1000#} -- **Total Duration (sec)**: {#{{$.:task_stat.total_duration}}/1000#} -- **Avg Results / Sec**: {#{{$.:task_stat.result_count}}/({{$.:task_stat.total_duration}}/1000)#}`, - Mobile: NotificationSettingMobile{}, - }, - } - var data []interface{} - for _, s := range settings { - data = append(data, s) - } - _, err = svc.col.InsertMany(data) - if err != nil { - return err - } - return nil -} - -func (svc *Service) Send(s NotificationSetting, entity bson.M) (err error) { - switch s.Type { - case TypeMail: - return svc.SendMail(s, entity) - case TypeMobile: - return svc.SendMobile(s, entity) - } - return nil -} - -func (svc *Service) SendMail(s NotificationSetting, entity bson.M) (err error) { - // to - to, err := parser.Parse(s.Mail.To, entity) - if err != nil { - log.Warnf("parsing 'to' error: %v", err) - } - if to == "" { - return nil - } - - // cc - cc, err := parser.Parse(s.Mail.Cc, entity) - if err != nil { - log.Warnf("parsing 'cc' error: %v", err) - } - - // title - title, err := parser.Parse(s.Title, entity) - if err != nil { - log.Warnf("parsing 'title' error: %v", err) - } - - // content - content, err := parser.Parse(s.Template, entity) - if err != nil { - log.Warnf("parsing 'content' error: %v", err) - } - - // send mail - if err := SendMail(&models.NotificationSettingMail{ - Server: s.Mail.Server, - Port: s.Mail.Port, - User: s.Mail.User, - Password: s.Mail.Password, - SenderEmail: s.Mail.SenderEmail, - SenderIdentity: s.Mail.SenderIdentity, - To: s.Mail.To, - Cc: s.Mail.Cc, - }, to, cc, title, content); err != nil { - return err - } - - return nil -} - -func (svc *Service) SendMobile(s NotificationSetting, entity bson.M) (err error) { - // webhook - webhook, err := parser.Parse(s.Mobile.Webhook, entity) - if err != nil { - log.Warnf("parsing 'webhook' error: %v", err) - } - if webhook == "" { - return nil - } - - // title - title, err := parser.Parse(s.Title, entity) - if err != nil { - log.Warnf("parsing 'title' error: %v", err) - } - - // content - content, err := parser.Parse(s.Template, entity) - if err != nil { - log.Warnf("parsing 'content' error: %v", err) - } - - // send - if err := SendMobileNotification(webhook, title, content); err != nil { - return err - } - - return nil -} - -func (svc *Service) GetSettingList(query bson.M, pagination *entity.Pagination, sort bson.D) (res []NotificationSetting, total int, err error) { - // options - var options *mongo2.FindOptions - if pagination != nil || sort != nil { - options = new(mongo2.FindOptions) - if pagination != nil { - options.Skip = pagination.Size * (pagination.Page - 1) - options.Limit = pagination.Size - } - if sort != nil { - options.Sort = sort - } - } - - // get list - var list []NotificationSetting - if err := svc.col.Find(query, options).All(&list); err != nil { - if err.Error() == mongo.ErrNoDocuments.Error() { - return nil, 0, nil - } else { - return nil, 0, err - } - } - - // total count - total, err = svc.col.Count(query) - if err != nil { - return nil, 0, err - } - - return list, total, nil -} - -func (svc *Service) GetSetting(id primitive.ObjectID) (res *NotificationSetting, err error) { - var s NotificationSetting - if err := svc.col.FindId(id).One(&s); err != nil { - return nil, err - } - return &s, nil -} - -func (svc *Service) PosSetting(s *NotificationSetting) (err error) { - s.Id = primitive.NewObjectID() - if _, err := svc.col.Insert(s); err != nil { - return err - } - return nil -} - -func (svc *Service) PutSetting(id primitive.ObjectID, s NotificationSetting) (err error) { - if err := svc.col.ReplaceId(id, s); err != nil { - return err - } - - return nil -} - -func (svc *Service) DeleteSetting(id primitive.ObjectID) (err error) { - if err := svc.col.DeleteId(id); err != nil { - return err - } - - return nil -} - -func (svc *Service) EnableSetting(id primitive.ObjectID) (err error) { - return svc._toggleSettingFunc(true)(id) -} - -func (svc *Service) DisableSetting(id primitive.ObjectID) (err error) { - return svc._toggleSettingFunc(false)(id) -} - -func (svc *Service) _toggleSettingFunc(value bool) func(id primitive.ObjectID) error { - return func(id primitive.ObjectID) (err error) { - var s NotificationSetting - if err := svc.col.FindId(id).One(&s); err != nil { - return err - } - s.Enabled = value - if err := svc.col.ReplaceId(id, s); err != nil { - return err - } - return nil - } -} - -func NewService() *Service { - // service - svc := &Service{ - col: mongo2.GetMongoCol(SettingsColName), - } - - // model service - modelSvc, err := service.GetService() - if err != nil { - panic(err) - } - svc.modelSvc = modelSvc - - if err := svc.Init(); err != nil { - panic(err) - } - - return svc -} - -var _service *Service - -func GetService() *Service { - if _service == nil { - _service = NewService() - } - return _service -} diff --git a/core/notification/service_test.go b/core/notification/service_test.go deleted file mode 100644 index 20e09478..00000000 --- a/core/notification/service_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package notification - -import ( - "net/http" - "testing" - "time" -) - -func TestService_sendMobile(t *testing.T) { - T.Setup(t) - e := T.NewExpect(t) - time.Sleep(1 * time.Second) - - data := map[string]interface{}{ - "task_id": T.TestTask.GetId().Hex(), - } - e.POST("/send/mobile").WithJSON(data). - Expect().Status(http.StatusOK) -}