refactor: Remove unused code and update models and functions for notification channels and settings

This commit is contained in:
Marvin Zhang
2024-07-23 12:22:59 +08:00
parent c893b5452e
commit 9ffdd3f1cd
14 changed files with 169 additions and 832 deletions

View File

@@ -28,6 +28,7 @@ var (
*new(models2.MetricV2),
*new(models2.NodeV2),
*new(models2.NotificationChannelV2),
*new(models2.NotificationRequestV2),
*new(models2.NotificationSettingV2),
*new(models2.PermissionV2),
*new(models2.ProjectV2),

View File

@@ -0,0 +1,12 @@
package models
import "go.mongodb.org/mongo-driver/bson/primitive"
type NotificationRequestV2 struct {
any `collection:"notification_requests"`
BaseModelV2[NotificationRequestV2] `bson:",inline"`
Status string `json:"status" bson:"status"`
Error string `json:"error,omitempty" bson:"error,omitempty"`
SettingId primitive.ObjectID `json:"setting_id" bson:"setting_id"`
ChannelId primitive.ObjectID `json:"channel_id" bson:"channel_id"`
}

View File

@@ -19,5 +19,4 @@ type ScheduleV2 struct {
NodeIds []primitive.ObjectID `json:"node_ids" bson:"node_ids"`
Priority int `json:"priority" bson:"priority"`
Enabled bool `json:"enabled" bson:"enabled"`
UserId primitive.ObjectID `json:"user_id" bson:"user_id"`
}

View File

@@ -2,7 +2,6 @@ package models
import (
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
type TaskV2 struct {
@@ -26,5 +25,4 @@ type TaskV2 struct {
SubTasks []TaskV2 `json:"sub_tasks,omitempty" bson:"-"`
Spider *SpiderV2 `json:"spider,omitempty" bson:"-"`
UserId primitive.ObjectID `json:"-" bson:"-"`
CreateTs time.Time `json:"create_ts" bson:"create_ts"`
}

View File

@@ -2,6 +2,7 @@ package notification
import (
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/trace"
"github.com/imroc/req"
@@ -15,6 +16,10 @@ type ResBody struct {
func SendIMNotification(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, content string) error {
// TODO: compatibility with different IM providers
switch ch.Provider {
case ChannelIMProviderLark:
return sendImLark(ch, content)
}
// request header
header := req.Header{
@@ -63,3 +68,49 @@ func SendIMNotification(s *models.NotificationSettingV2, ch *models.Notification
return nil
}
func getIMRequestHeader() req.Header {
return req.Header{
"Content-Type": "application/json; charset=utf-8",
}
}
func performIMRequest(webhookUrl string, data req.Param) error {
// perform request
res, err := req.Post(webhookUrl, getIMRequestHeader(), req.BodyJSON(&data))
if err != nil {
log.Errorf("IM request error: %v", err)
return err
}
// parse response
var resBody ResBody
if err := res.ToJSON(&resBody); err != nil {
log.Errorf("Parsing IM response error: %v", err)
return err
}
// validate response code
if resBody.ErrCode != 0 {
log.Errorf("IM response error: %v", resBody.ErrMsg)
return errors.New(resBody.ErrMsg)
}
return nil
}
func sendImLark(ch *models.NotificationChannelV2, content string) error {
// request header
data := req.Param{
"msg_type": "interactive",
"card": req.Param{
"elements": []req.Param{
{
"tag": "markdown",
"content": content,
},
},
},
}
return performIMRequest(ch.WebhookUrl, data)
}

View File

@@ -9,7 +9,6 @@ import (
"gopkg.in/gomail.v2"
"net/mail"
"regexp"
"runtime/debug"
"strings"
)
@@ -44,8 +43,8 @@ func SendMail(s *models.NotificationSettingV2, ch *models.NotificationChannelV2,
// send the email
if err := send(smtpConfig, options, content, text); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
log.Errorf("failed to send email: %v", err)
trace.PrintError(err)
return err
}

View File

@@ -1,8 +0,0 @@
package notification
import "github.com/matcornic/hermes/v2"
type MailTheme interface {
hermes.Theme
GetStyle() string
}

View File

@@ -1,287 +0,0 @@
package notification
// MailThemeFlat is a theme
type MailThemeFlat struct{}
// Name returns the name of the flat theme
func (dt *MailThemeFlat) Name() string {
return "flat"
}
// HTMLTemplate returns a Golang template that will generate an HTML email.
func (dt *MailThemeFlat) HTMLTemplate() string {
return `
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body dir="{{.Hermes.TextDirection}}">
<table class="email-wrapper" width="100%" cellpadding="0" cellspacing="0">
<tr>
<td class="content">
<table class="email-content" width="100%" cellpadding="0" cellspacing="0">
<!-- Logo -->
<tr>
<td class="email-masthead">
<a class="email-masthead_name" href="{{.Hermes.Product.Link}}" target="_blank">
{{ if .Hermes.Product.Logo }}
<img src="{{.Hermes.Product.Logo | url }}" class="email-logo" style="height: 48px"/>
<span style="font-size:36px;font-weight:600;margin-left:12px;color:#409eff">{{ .Hermes.Product.Name}} </span>
{{ else }}
{{ .Hermes.Product.Name }}
{{ end }}
</a>
</td>
</tr>
<!-- Email Body -->
<tr>
<td class="email-body" width="100%">
<table class="email-body_inner" align="center" width="570" cellpadding="0" cellspacing="0">
<!-- Body content -->
<tr>
<td class="content-cell">
{{ if (ne .Email.Body.FreeMarkdown "") }}
{{ .Email.Body.FreeMarkdown.ToHTML }}
{{ else }}
{{ with .Email.Body.Dictionary }}
{{ if gt (len .) 0 }}
<dl class="body-dictionary">
{{ range $entry := . }}
<dt>{{ $entry.Key }}:</dt>
<dd>{{ $entry.Value }}</dd>
{{ end }}
</dl>
{{ end }}
{{ end }}
<!-- Table -->
{{ with .Email.Body.Table }}
{{ $data := .Data }}
{{ $columns := .Columns }}
{{ if gt (len $data) 0 }}
<table class="data-wrapper" width="100%" cellpadding="0" cellspacing="0">
<tr>
<td colspan="2">
<table class="data-table" width="100%" cellpadding="0" cellspacing="0">
<tr>
{{ $col := index $data 0 }}
{{ range $entry := $col }}
<th
{{ with $columns }}
{{ $width := index .CustomWidth $entry.Key }}
{{ with $width }}
width="{{ . }}"
{{ end }}
{{ $align := index .CustomAlignment $entry.Key }}
{{ with $align }}
style="text-align:{{ . }}"
{{ end }}
{{ end }}
>
<p>{{ $entry.Key }}</p>
</th>
{{ end }}
</tr>
{{ range $row := $data }}
<tr>
{{ range $cell := $row }}
<td
{{ with $columns }}
{{ $align := index .CustomAlignment $cell.Key }}
{{ with $align }}
style="text-align:{{ . }}"
{{ end }}
{{ end }}
>
{{ $cell.Value }}
</td>
{{ end }}
</tr>
{{ end }}
</table>
</td>
</tr>
</table>
{{ end }}
{{ end }}
<!-- Action -->
{{ with .Email.Body.Actions }}
{{ if gt (len .) 0 }}
{{ range $action := . }}
<p>{{ $action.Instructions }}</p>
<table class="body-action" align="center" width="100%" cellpadding="0" cellspacing="0">
<tr>
<td align="center">
<div>
<a href="{{ $action.Button.Link }}" class="button" style="background-color: {{ $action.Button.Color }}; color: {{ $action.Button.TextColor }}" target="_blank">
{{ $action.Button.Text }}
</a>
</div>
</td>
</tr>
</table>
{{ end }}
{{ end }}
{{ end }}
{{ end }}
{{ with .Email.Body.Outros }}
{{ if gt (len .) 0 }}
{{ range $line := . }}
<p>{{ $line }}</p>
{{ end }}
{{ end }}
{{ end }}
<p>
{{.Email.Body.Signature}}
</p>
{{ if (eq .Email.Body.FreeMarkdown "") }}
{{ with .Email.Body.Actions }}
<table class="body-sub">
<tbody>
{{ range $action := . }}
<tr>
<td>
<p class="sub">{{$.Hermes.Product.TroubleText | replace "{ACTION}" $action.Button.Text}}</p>
<p class="sub"><a href="{{ $action.Button.Link }}">{{ $action.Button.Link }}</a></p>
</td>
</tr>
{{ end }}
</tbody>
</table>
{{ end }}
{{ end }}
</td>
</tr>
</table>
</td>
</tr>
<tr>
<td>
<table class="email-footer" align="center" width="570" cellpadding="0" cellspacing="0">
<tr>
<td class="content-cell">
<p class="sub center">
{{.Hermes.Product.Copyright}}
</p>
</td>
</tr>
</table>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
`
}
// PlainTextTemplate returns a Golang template that will generate an plain text email.
func (dt *MailThemeFlat) PlainTextTemplate() string {
return `{{ with .Email.Body.Intros }}
{{ range $line := . }}
<p>{{ $line }}</p>
{{ end }}
{{ end }}
{{ if (ne .Email.Body.FreeMarkdown "") }}
{{ .Email.Body.FreeMarkdown.ToHTML }}
{{ else }}
{{ with .Email.Body.Dictionary }}
<ul>
{{ range $entry := . }}
<li>{{ $entry.Key }}: {{ $entry.Value }}</li>
{{ end }}
</ul>
{{ end }}
{{ with .Email.Body.Table }}
{{ $data := .Data }}
{{ $columns := .Columns }}
{{ if gt (len $data) 0 }}
<table class="data-table" width="100%" cellpadding="0" cellspacing="0">
<tr>
{{ $col := index $data 0 }}
{{ range $entry := $col }}
<th>{{ $entry.Key }} </th>
{{ end }}
</tr>
{{ range $row := $data }}
<tr>
{{ range $cell := $row }}
<td>
{{ $cell.Value }}
</td>
{{ end }}
</tr>
{{ end }}
</table>
{{ end }}
{{ end }}
{{ with .Email.Body.Actions }}
{{ range $action := . }}
<p>{{ $action.Instructions }} {{ $action.Button.Link }}</p>
{{ end }}
{{ end }}
{{ end }}
{{ with .Email.Body.Outros }}
{{ range $line := . }}
<p>{{ $line }}<p>
{{ end }}
{{ end }}
<p>{{.Email.Body.Signature}},<br>{{.Hermes.Product.Name}} - {{.Hermes.Product.Link}}</p>
<p>{{.Hermes.Product.Copyright}}</p>
`
}
func (dt *MailThemeFlat) GetStyle() string {
return `
<style>
.content-cell table {
width: 100%;
border-collapse: collapse;
}
.content-cell table,
.content-cell th,
.content-cell td {
border: 1px solid #EDEFF2;
}
.content-cell th,
.content-cell td {
padding: 10px;
font-size: 14px;
line-height: 18px;
}
.content-cell th {
background: #409eff;
color: white;
}
.content-cell td {
color: #606266;
}
.content-cell p {
color: #606266;
}
.content-cell a {
color: #409eff;
}
.email-masthead .email-masthead_name {
display: flex;
justify-content: center;
align-items: center;
text-decoration: none;
color: #409eff;
margin-bottom: 20px;
}
</style>
`
}

View File

@@ -1,8 +0,0 @@
package notification
import "go.mongodb.org/mongo-driver/bson/primitive"
type SendPayload struct {
TaskId primitive.ObjectID `json:"task_id"`
Data string `json:"data"`
}

View File

@@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"sync"
"time"
)
type ServiceV2 struct {
@@ -91,14 +92,108 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Id.Hex())
case "status":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Status)
case "priority":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Priority))
case "mode":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Mode)
case "cmd":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Cmd)
case "param":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Param)
case "error":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Error)
case "pid":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Pid))
case "type":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Type)
case "mode":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.Mode)
case "priority":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Task.Priority))
case "created_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.CreatedAt.Format(time.DateTime))
case "updated_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Task.UpdatedAt.Format(time.DateTime))
}
case "spider":
switch v.Name {
case "id":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Id.Hex())
case "name":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Name)
case "type":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Type)
case "description":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Description)
case "mode":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Mode)
case "cmd":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Cmd)
case "param":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.Param)
case "priority":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Spider.Priority))
case "created_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.CreatedAt.Format(time.DateTime))
case "updated_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Spider.UpdatedAt.Format(time.DateTime))
}
case "node":
switch v.Name {
case "id":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Id.Hex())
case "key":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key)
case "name":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name)
case "ip":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip)
case "port":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Port)
case "hostname":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Hostname)
case "description":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Description)
case "status":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Status)
case "enabled":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.Enabled))
case "active":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.Active))
case "active_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.ActiveAt.Format("2006-01-02 15:04:05"))
case "available_runners":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.AvailableRunners))
case "max_runners":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.MaxRunners))
case "created_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.CreatedAt.Format(time.DateTime))
case "updated_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.UpdatedAt.Format(time.DateTime))
}
case "schedule":
switch v.Name {
case "id":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Id.Hex())
case "name":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Name)
case "description":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Description)
case "cron":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Cron)
case "cmd":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Cmd)
case "param":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Param)
case "mode":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.Mode)
case "priority":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Schedule.Priority))
case "enabled":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Schedule.Enabled))
case "created_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.CreatedAt.Format(time.DateTime))
case "updated_at":
content = strings.ReplaceAll(content, v.GetKey(), vd.Schedule.UpdatedAt.Format(time.DateTime))
}
}
}
@@ -153,6 +248,10 @@ func (svc *ServiceV2) parseTemplateVariables(template string) (variables []entit
return variables
}
func (svc *ServiceV2) getUserById(id primitive.ObjectID) {
}
func newNotificationServiceV2() *ServiceV2 {
return &ServiceV2{}
}

View File

@@ -15,7 +15,6 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"sync"
"time"
)
type ServiceV2 struct {
@@ -51,7 +50,6 @@ func (svc *ServiceV2) scheduleTasks(s *models2.SpiderV2, opts *interfaces.Spider
ScheduleId: opts.ScheduleId,
Priority: opts.Priority,
UserId: opts.UserId,
CreateTs: time.Now(),
}
mainTask.SetId(primitive.NewObjectID())
@@ -88,7 +86,6 @@ func (svc *ServiceV2) scheduleTasks(s *models2.SpiderV2, opts *interfaces.Spider
ScheduleId: opts.ScheduleId,
Priority: opts.Priority,
UserId: opts.UserId,
CreateTs: time.Now(),
}
t.SetId(primitive.NewObjectID())
t2, err := svc.schedulerSvc.Enqueue(t, opts.UserId)

View File

@@ -1,97 +0,0 @@
package task
import (
"fmt"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/container"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/delegate"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)
type BaseService struct {
// dependencies
interfaces.WithConfigPath
modelSvc service.ModelService
// internals
stopped bool
}
func (svc *BaseService) Init() error {
// implement me
return nil
}
func (svc *BaseService) Start() {
// implement me
}
func (svc *BaseService) Wait() {
utils.DefaultWait()
}
func (svc *BaseService) Stop() {
svc.stopped = true
}
// SaveTask deprecated
func (svc *BaseService) SaveTask(t interfaces.Task, status string) (err error) {
// normalize status
if status == "" {
status = constants.TaskStatusPending
}
// set task status
t.SetStatus(status)
// attempt to get task from database
_, err = svc.modelSvc.GetTaskById(t.GetId())
if err != nil {
// if task does not exist, add to database
if err == mongo.ErrNoDocuments {
if err := delegate.NewModelDelegate(t).Add(); err != nil {
return err
}
return nil
} else {
return err
}
} else {
// otherwise, update
if err := delegate.NewModelDelegate(t).Save(); err != nil {
return err
}
return nil
}
}
func (svc *BaseService) IsStopped() (res bool) {
return svc.stopped
}
func (svc *BaseService) GetQueue(nodeId primitive.ObjectID) (queue string) {
if nodeId.IsZero() {
return fmt.Sprintf("%s", constants.TaskListQueuePrefixPublic)
} else {
return fmt.Sprintf("%s:%s", constants.TaskListQueuePrefixNodes, nodeId.Hex())
}
}
func NewBaseService() (svc2 interfaces.TaskBaseService, err error) {
svc := &BaseService{}
// dependency injection
if err := container.GetContainer().Invoke(func(cfgPath interfaces.WithConfigPath, modelSvc service.ModelService) {
svc.WithConfigPath = cfgPath
svc.modelSvc = modelSvc
}); err != nil {
return nil, trace.TraceError(err)
}
return svc, nil
}

View File

@@ -1,264 +0,0 @@
package scheduler
import (
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/container"
"github.com/crawlab-team/crawlab/core/errors"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/delegate"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/task"
"github.com/crawlab-team/crawlab/db/mongo"
grpc "github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"time"
)
type Service struct {
// dependencies
interfaces.TaskBaseService
nodeCfgSvc interfaces.NodeConfigService
modelSvc service.ModelService
svr interfaces.GrpcServer
handlerSvc interfaces.TaskHandlerService
// settings
interval time.Duration
}
func (svc *Service) Start() {
go svc.initTaskStatus()
go svc.cleanupTasks()
svc.Wait()
svc.Stop()
}
func (svc *Service) Enqueue(t interfaces.Task) (t2 interfaces.Task, err error) {
// set task status
t.SetStatus(constants.TaskStatusPending)
// user
var u *models.User
if !t.GetUserId().IsZero() {
u, _ = svc.modelSvc.GetUserById(t.GetUserId())
}
// add task
if err = delegate.NewModelDelegate(t, u).Add(); err != nil {
return nil, err
}
// task queue item
tq := &models.TaskQueueItem{
Id: t.GetId(),
Priority: t.GetPriority(),
NodeId: t.GetNodeId(),
}
// task stat
ts := &models.TaskStat{
Id: t.GetId(),
CreateTs: time.Now(),
}
// enqueue task
_, err = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Insert(tq)
if err != nil {
return nil, trace.TraceError(err)
}
// add task stat
_, err = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).Insert(ts)
if err != nil {
return nil, trace.TraceError(err)
}
// success
return t, nil
}
func (svc *Service) Cancel(id primitive.ObjectID, args ...interface{}) (err error) {
// task
t, err := svc.modelSvc.GetTaskById(id)
if err != nil {
return trace.TraceError(err)
}
// initial status
initialStatus := t.Status
// set task status as "cancelled"
_ = svc.SaveTask(t, constants.TaskStatusCancelled)
// set status of pending tasks as "cancelled" and remove from task item queue
if initialStatus == constants.TaskStatusPending {
// remove from task item queue
if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(t.GetId()); err != nil {
return trace.TraceError(err)
}
return nil
}
// whether task is running on master node
isMasterTask, err := svc.isMasterNode(t)
if err != nil {
// when error, force status being set as "cancelled"
return svc.SaveTask(t, constants.TaskStatusCancelled)
}
// node
n, err := svc.modelSvc.GetNodeById(t.GetNodeId())
if err != nil {
return trace.TraceError(err)
}
if isMasterTask {
// cancel task on master
if err := svc.handlerSvc.Cancel(id); err != nil {
return trace.TraceError(err)
}
// cancel success
return nil
} else {
// send to cancel task on worker nodes
if err := svc.svr.SendStreamMessageWithData("node:"+n.GetKey(), grpc.StreamMessageCode_CANCEL_TASK, t); err != nil {
return trace.TraceError(err)
}
// cancel success
return nil
}
}
func (svc *Service) SetInterval(interval time.Duration) {
svc.interval = interval
}
// initTaskStatus initialize task status of existing tasks
func (svc *Service) initTaskStatus() {
// set status of running tasks as TaskStatusAbnormal
runningTasks, err := svc.modelSvc.GetTaskList(bson.M{
"status": bson.M{
"$in": []string{
constants.TaskStatusPending,
constants.TaskStatusRunning,
},
},
}, nil)
if err != nil {
if err == mongo2.ErrNoDocuments {
return
}
trace.PrintError(err)
}
for _, t := range runningTasks {
go func(t *models.Task) {
if err := svc.SaveTask(t, constants.TaskStatusAbnormal); err != nil {
trace.PrintError(err)
}
}(&t)
}
if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTaskQueue).DeleteList(nil); err != nil {
return
}
}
func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) {
if t.GetNodeId().IsZero() {
return false, trace.TraceError(errors.ErrorTaskNoNodeId)
}
n, err := svc.modelSvc.GetNodeById(t.GetNodeId())
if err != nil {
if err == mongo2.ErrNoDocuments {
return false, trace.TraceError(errors.ErrorTaskNodeNotFound)
}
return false, trace.TraceError(err)
}
return n.IsMaster, nil
}
func (svc *Service) cleanupTasks() {
for {
// task stats over 30 days ago
taskStats, err := svc.modelSvc.GetTaskStatList(bson.M{
"create_ts": bson.M{
"$lt": time.Now().Add(-30 * 24 * time.Hour),
},
}, nil)
if err != nil {
time.Sleep(30 * time.Minute)
continue
}
// task ids
var ids []primitive.ObjectID
for _, ts := range taskStats {
ids = append(ids, ts.Id)
}
if len(ids) > 0 {
// remove tasks
if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTask).DeleteList(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
}
// remove task stats
if err := svc.modelSvc.GetBaseService(interfaces.ModelIdTaskStat).DeleteList(bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
trace.PrintError(err)
}
}
time.Sleep(30 * time.Minute)
}
}
func NewTaskSchedulerService() (svc2 interfaces.TaskSchedulerService, err error) {
// base service
baseSvc, err := task.NewBaseService()
if err != nil {
return nil, trace.TraceError(err)
}
// service
svc := &Service{
TaskBaseService: baseSvc,
interval: 5 * time.Second,
}
// dependency injection
if err := container.GetContainer().Invoke(func(
nodeCfgSvc interfaces.NodeConfigService,
modelSvc service.ModelService,
svr interfaces.GrpcServer,
handlerSvc interfaces.TaskHandlerService,
) {
svc.nodeCfgSvc = nodeCfgSvc
svc.modelSvc = modelSvc
svc.svr = svr
svc.handlerSvc = handlerSvc
}); err != nil {
return nil, err
}
return svc, nil
}
var svc interfaces.TaskSchedulerService
func GetTaskSchedulerService() (svr interfaces.TaskSchedulerService, err error) {
if svc != nil {
return svc, nil
}
svc, err = NewTaskSchedulerService()
if err != nil {
return nil, err
}
return svc, nil
}

View File

@@ -1,155 +0,0 @@
package stats
import (
"github.com/crawlab-team/crawlab/core/container"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/result"
"github.com/crawlab-team/crawlab/core/task"
"github.com/crawlab-team/crawlab/core/task/log"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"sync"
"time"
)
type Service struct {
// dependencies
interfaces.TaskBaseService
nodeCfgSvc interfaces.NodeConfigService
modelSvc service.ModelService
// internals
mu sync.Mutex
resultServices sync.Map
rsTtl time.Duration
logDriver log.Driver
}
func (svc *Service) Init() (err error) {
go svc.cleanup()
return nil
}
func (svc *Service) InsertData(id primitive.ObjectID, records ...interface{}) (err error) {
resultSvc, err := svc.getResultService(id)
if err != nil {
return err
}
if err := resultSvc.Insert(records...); err != nil {
return err
}
go svc.updateTaskStats(id, len(records))
return nil
}
func (svc *Service) InsertLogs(id primitive.ObjectID, logs ...string) (err error) {
return svc.logDriver.WriteLines(id.Hex(), logs)
}
func (svc *Service) getResultService(id primitive.ObjectID) (resultSvc interfaces.ResultService, err error) {
// atomic operation
svc.mu.Lock()
defer svc.mu.Unlock()
// attempt to get from cache
res, _ := svc.resultServices.Load(id.Hex())
if res != nil {
// hit in cache
resultSvc, ok := res.(interfaces.ResultService)
resultSvc.SetTime(time.Now())
if ok {
return resultSvc, nil
}
}
// task
t, err := svc.modelSvc.GetTaskById(id)
if err != nil {
return nil, err
}
// result service
resultSvc, err = result.GetResultService(t.SpiderId)
if err != nil {
return nil, err
}
// store in cache
svc.resultServices.Store(id.Hex(), resultSvc)
return resultSvc, nil
}
func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) {
_ = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).UpdateId(id, bson.M{
"$inc": bson.M{
"result_count": resultCount,
},
})
}
func (svc *Service) cleanup() {
for {
// atomic operation
svc.mu.Lock()
svc.resultServices.Range(func(key, value interface{}) bool {
rs := value.(interfaces.ResultService)
if time.Now().After(rs.GetTime().Add(svc.rsTtl)) {
svc.resultServices.Delete(key)
}
return true
})
svc.mu.Unlock()
time.Sleep(10 * time.Minute)
}
}
func NewTaskStatsService() (svc2 interfaces.TaskStatsService, err error) {
// base service
baseSvc, err := task.NewBaseService()
if err != nil {
return nil, trace.TraceError(err)
}
// service
svc := &Service{
mu: sync.Mutex{},
TaskBaseService: baseSvc,
resultServices: sync.Map{},
}
// dependency injection
if err := container.GetContainer().Invoke(func(nodeCfgSvc interfaces.NodeConfigService, modelSvc service.ModelService) {
svc.nodeCfgSvc = nodeCfgSvc
svc.modelSvc = modelSvc
}); err != nil {
return nil, trace.TraceError(err)
}
// log driver
svc.logDriver, err = log.GetLogDriver(log.DriverTypeFile)
if err != nil {
return nil, err
}
return svc, nil
}
var _service interfaces.TaskStatsService
func GetTaskStatsService() (svr interfaces.TaskStatsService, err error) {
if _service != nil {
return _service, nil
}
_service, err = NewTaskStatsService()
if err != nil {
return nil, err
}
return _service, nil
}