mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-30 18:00:56 +01:00
refactor: renamed files and services
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -22,7 +23,6 @@ import (
|
||||
client2 "github.com/crawlab-team/crawlab/core/grpc/client"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/sys_exec"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
@@ -47,8 +47,8 @@ type Runner struct {
|
||||
cmd *exec.Cmd // process command instance
|
||||
pid int // process id
|
||||
tid primitive.ObjectID // task id
|
||||
t *models.TaskV2 // task model.Task
|
||||
s *models.SpiderV2 // spider model.Spider
|
||||
t *models.Task // task model.Task
|
||||
s *models.Spider // spider model.Spider
|
||||
ch chan constants.TaskSignal // channel to communicate between Service and Runner
|
||||
err error // standard process error
|
||||
cwd string // working directory
|
||||
@@ -307,7 +307,7 @@ func (r *Runner) configureEnv() {
|
||||
}
|
||||
|
||||
// Global environment variables
|
||||
envs, err := client.NewModelService[models.EnvironmentV2]().GetMany(nil, nil)
|
||||
envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -502,12 +502,12 @@ func (r *Runner) updateTask(status string, e error) (err error) {
|
||||
r.t.Error = e.Error()
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service.NewModelService[models.TaskV2]().ReplaceById(r.t.Id, *r.t)
|
||||
err = service.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelService[models.TaskV2]().ReplaceById(r.t.Id, *r.t)
|
||||
err = client.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -556,7 +556,7 @@ func (r *Runner) writeLogLines(lines []string) {
|
||||
}
|
||||
|
||||
func (r *Runner) _updateTaskStat(status string) {
|
||||
ts, err := client.NewModelService[models.TaskStatV2]().GetById(r.tid)
|
||||
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -566,24 +566,24 @@ func (r *Runner) _updateTaskStat(status string) {
|
||||
// do nothing
|
||||
case constants.TaskStatusRunning:
|
||||
ts.StartTs = time.Now()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.BaseModelV2.CreatedAt).Milliseconds()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds()
|
||||
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
||||
if ts.StartTs.IsZero() {
|
||||
ts.StartTs = time.Now()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.BaseModelV2.CreatedAt).Milliseconds()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds()
|
||||
}
|
||||
ts.EndTs = time.Now()
|
||||
ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds()
|
||||
ts.TotalDuration = ts.EndTs.Sub(ts.BaseModelV2.CreatedAt).Milliseconds()
|
||||
ts.TotalDuration = ts.EndTs.Sub(ts.CreatedAt).Milliseconds()
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service.NewModelService[models.TaskStatV2]().ReplaceById(ts.Id, *ts)
|
||||
err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelService[models.TaskStatV2]().ReplaceById(ts.Id, *ts)
|
||||
err = client.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -606,7 +606,7 @@ func (r *Runner) sendNotification() {
|
||||
|
||||
func (r *Runner) _updateSpiderStat(status string) {
|
||||
// task stat
|
||||
ts, err := client.NewModelService[models.TaskStatV2]().GetById(r.tid)
|
||||
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -644,13 +644,13 @@ func (r *Runner) _updateSpiderStat(status string) {
|
||||
|
||||
// perform update
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service.NewModelService[models.SpiderStatV2]().UpdateById(r.s.Id, update)
|
||||
err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelService[models.SpiderStatV2]().UpdateById(r.s.Id, update)
|
||||
err = client.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -669,7 +669,7 @@ func (r *Runner) configureCwd() {
|
||||
}
|
||||
}
|
||||
|
||||
func NewTaskRunnerV2(id primitive.ObjectID, svc *Service) (r2 *Runner, err error) {
|
||||
func newTaskRunner(id primitive.ObjectID, svc *Service) (r2 *Runner, err error) {
|
||||
// validate options
|
||||
if id.IsZero() {
|
||||
return nil, constants.ErrInvalidOptions
|
||||
@@ -698,7 +698,7 @@ func NewTaskRunnerV2(id primitive.ObjectID, svc *Service) (r2 *Runner, err error
|
||||
}
|
||||
|
||||
// task fs service
|
||||
r.fsSvc = fs.NewFsServiceV2(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex()))
|
||||
r.fsSvc = fs.NewFsService(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex()))
|
||||
|
||||
// grpc client
|
||||
r.c = client2.GetGrpcClient()
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
grpcclient "github.com/crawlab-team/crawlab/core/grpc/client"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/grpc"
|
||||
@@ -110,7 +110,7 @@ func (svc *Service) fetchAndRunTasks() {
|
||||
t.Error = err.Error()
|
||||
t.Status = constants.TaskStatusError
|
||||
t.SetUpdated(t.CreatedBy)
|
||||
_ = client.NewModelService[models2.TaskV2]().ReplaceById(t.Id, *t)
|
||||
_ = client.NewModelService[models.Task]().ReplaceById(t.Id, *t)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
@@ -148,15 +148,15 @@ func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService)
|
||||
return svc.cfgSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetCurrentNode() (n *models2.NodeV2, err error) {
|
||||
func (svc *Service) GetCurrentNode() (n *models.Node, err error) {
|
||||
// node key
|
||||
nodeKey := svc.cfgSvc.GetNodeKey()
|
||||
|
||||
// current node
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
n, err = service.NewModelService[models2.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
n, err = service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
} else {
|
||||
n, err = client.NewModelService[models2.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
n, err = client.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -165,11 +165,11 @@ func (svc *Service) GetCurrentNode() (n *models2.NodeV2, err error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err error) {
|
||||
func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models.Task, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
t, err = service.NewModelService[models2.TaskV2]().GetById(id)
|
||||
t, err = service.NewModelService[models.Task]().GetById(id)
|
||||
} else {
|
||||
t, err = client.NewModelService[models2.TaskV2]().GetById(id)
|
||||
t, err = client.NewModelService[models.Task]().GetById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -178,12 +178,12 @@ func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err e
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (svc *Service) UpdateTask(t *models2.TaskV2) (err error) {
|
||||
func (svc *Service) UpdateTask(t *models.Task) (err error) {
|
||||
t.SetUpdated(t.CreatedBy)
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
err = service.NewModelService[models2.TaskV2]().ReplaceById(t.Id, *t)
|
||||
err = service.NewModelService[models.Task]().ReplaceById(t.Id, *t)
|
||||
} else {
|
||||
err = client.NewModelService[models2.TaskV2]().ReplaceById(t.Id, *t)
|
||||
err = client.NewModelService[models.Task]().ReplaceById(t.Id, *t)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -191,11 +191,11 @@ func (svc *Service) UpdateTask(t *models2.TaskV2) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, err error) {
|
||||
func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
s, err = service.NewModelService[models2.SpiderV2]().GetById(id)
|
||||
s, err = service.NewModelService[models.Spider]().GetById(id)
|
||||
} else {
|
||||
s, err = client.NewModelService[models2.SpiderV2]().GetById(id)
|
||||
s, err = client.NewModelService[models.Spider]().GetById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -217,13 +217,13 @@ func (svc *Service) getRunnerCount() (count int) {
|
||||
},
|
||||
}
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
count, err = service.NewModelService[models2.TaskV2]().Count(query)
|
||||
count, err = service.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
count, err = client.NewModelService[models2.TaskV2]().Count(query)
|
||||
count, err = client.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
@@ -273,9 +273,9 @@ func (svc *Service) updateNodeStatus() (err error) {
|
||||
// save node
|
||||
n.SetUpdated(n.CreatedBy)
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
err = service.NewModelService[models2.NodeV2]().ReplaceById(n.Id, *n)
|
||||
err = service.NewModelService[models.Node]().ReplaceById(n.Id, *n)
|
||||
} else {
|
||||
err = client.NewModelService[models2.NodeV2]().ReplaceById(n.Id, *n)
|
||||
err = client.NewModelService[models.Node]().ReplaceById(n.Id, *n)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -311,7 +311,7 @@ func (svc *Service) runTask(taskId primitive.ObjectID) (err error) {
|
||||
}
|
||||
|
||||
// create a new task runner
|
||||
r, err := NewTaskRunnerV2(taskId, svc)
|
||||
r, err := newTaskRunner(taskId, svc)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create task runner: %v", err)
|
||||
log.Errorf("run task error: %v", err)
|
||||
@@ -439,7 +439,7 @@ func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTaskHandlerService() (svc2 *Service, err error) {
|
||||
func newTaskHandlerService() *Service {
|
||||
// service
|
||||
svc := &Service{
|
||||
exitWatchDuration: 60 * time.Second,
|
||||
@@ -459,24 +459,15 @@ func newTaskHandlerService() (svc2 *Service, err error) {
|
||||
|
||||
log.Debugf("[NewTaskHandlerService] svc[cfgPath: %s]", svc.cfgSvc.GetConfigPath())
|
||||
|
||||
return svc, nil
|
||||
return svc
|
||||
}
|
||||
|
||||
var _serviceV2 *Service
|
||||
var _serviceV2Once = new(sync.Once)
|
||||
var _service *Service
|
||||
var _serviceOnce sync.Once
|
||||
|
||||
func GetTaskHandlerService() (svr *Service, err error) {
|
||||
if _serviceV2 != nil {
|
||||
return _serviceV2, nil
|
||||
}
|
||||
_serviceV2Once.Do(func() {
|
||||
_serviceV2, err = newTaskHandlerService()
|
||||
if err != nil {
|
||||
log.Errorf("failed to create task handler service: %v", err)
|
||||
}
|
||||
func GetTaskHandlerService() *Service {
|
||||
_serviceOnce.Do(func() {
|
||||
_service = newTaskHandlerService()
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return _serviceV2, nil
|
||||
return _service
|
||||
}
|
||||
|
||||
@@ -1,18 +1,16 @@
|
||||
package log
|
||||
|
||||
func GetLogDriver(logDriverType string) (driver Driver, err error) {
|
||||
import "fmt"
|
||||
|
||||
func GetLogDriver(logDriverType string) Driver {
|
||||
switch logDriverType {
|
||||
case DriverTypeFile:
|
||||
driver, err = GetFileLogDriver()
|
||||
if err != nil {
|
||||
return driver, err
|
||||
}
|
||||
return GetFileLogDriver()
|
||||
case DriverTypeMongo:
|
||||
return driver, ErrNotImplemented
|
||||
panic("mongo driver not implemented")
|
||||
case DriverTypeEs:
|
||||
return driver, ErrNotImplemented
|
||||
panic("es driver not implemented")
|
||||
default:
|
||||
return driver, ErrInvalidType
|
||||
panic(fmt.Sprintf("invalid log driver type: %s", logDriverType))
|
||||
}
|
||||
return driver, nil
|
||||
}
|
||||
|
||||
@@ -26,10 +26,8 @@ type FileLogDriver struct {
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (d *FileLogDriver) Init() (err error) {
|
||||
func (d *FileLogDriver) Init() {
|
||||
go d.cleanup()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *FileLogDriver) Close() (err error) {
|
||||
@@ -255,30 +253,25 @@ func (d *FileLogDriver) cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
var logDriver Driver
|
||||
|
||||
func newFileLogDriver() (driver Driver, err error) {
|
||||
func newFileLogDriver() Driver {
|
||||
// driver
|
||||
driver = &FileLogDriver{
|
||||
driver := &FileLogDriver{
|
||||
logFileName: "log.txt",
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
|
||||
// init
|
||||
if err := driver.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
driver.Init()
|
||||
|
||||
return driver, nil
|
||||
return driver
|
||||
}
|
||||
|
||||
func GetFileLogDriver() (driver Driver, err error) {
|
||||
if logDriver != nil {
|
||||
return logDriver, nil
|
||||
}
|
||||
logDriver, err = newFileLogDriver()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return logDriver, nil
|
||||
var logDriver Driver
|
||||
var logDriverOnce sync.Once
|
||||
|
||||
func GetFileLogDriver() Driver {
|
||||
logDriverOnce.Do(func() {
|
||||
logDriver = newFileLogDriver()
|
||||
})
|
||||
return logDriver
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package log
|
||||
|
||||
type Driver interface {
|
||||
Init() (err error)
|
||||
Init()
|
||||
Close() (err error)
|
||||
WriteLine(id string, line string) (err error)
|
||||
WriteLines(id string, lines []string) (err error)
|
||||
|
||||
@@ -7,10 +7,8 @@ import (
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/errors"
|
||||
"github.com/crawlab-team/crawlab/core/grpc/server"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/core/task/handler"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
"github.com/crawlab-team/crawlab/grpc"
|
||||
@@ -18,12 +16,12 @@ import (
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
mongo2 "go.mongodb.org/mongo-driver/mongo"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
// dependencies
|
||||
nodeCfgSvc interfaces.NodeConfigService
|
||||
svr *server.GrpcServer
|
||||
handlerSvc *handler.Service
|
||||
|
||||
@@ -37,27 +35,27 @@ func (svc *Service) Start() {
|
||||
utils.DefaultWait()
|
||||
}
|
||||
|
||||
func (svc *Service) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *models2.TaskV2, err error) {
|
||||
func (svc *Service) Enqueue(t *models.Task, by primitive.ObjectID) (t2 *models.Task, err error) {
|
||||
// set task status
|
||||
t.Status = constants.TaskStatusPending
|
||||
t.SetCreated(by)
|
||||
t.SetUpdated(by)
|
||||
|
||||
// add task
|
||||
taskModelSvc := service.NewModelService[models2.TaskV2]()
|
||||
taskModelSvc := service.NewModelService[models.Task]()
|
||||
t.Id, err = taskModelSvc.InsertOne(*t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// task stat
|
||||
ts := models2.TaskStatV2{}
|
||||
ts := models.TaskStat{}
|
||||
ts.SetId(t.Id)
|
||||
ts.SetCreated(by)
|
||||
ts.SetUpdated(by)
|
||||
|
||||
// add task stat
|
||||
_, err = service.NewModelService[models2.TaskStatV2]().InsertOne(ts)
|
||||
_, err = service.NewModelService[models.TaskStat]().InsertOne(ts)
|
||||
if err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
@@ -68,7 +66,7 @@ func (svc *Service) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *model
|
||||
|
||||
func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) {
|
||||
// task
|
||||
t, err := service.NewModelService[models2.TaskV2]().GetById(id)
|
||||
t, err := service.NewModelService[models.Task]().GetById(id)
|
||||
if err != nil {
|
||||
log.Errorf("task not found: %s", id.Hex())
|
||||
return err
|
||||
@@ -101,7 +99,7 @@ func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *Service) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
|
||||
func (svc *Service) cancelOnMaster(t *models.Task, by primitive.ObjectID, force bool) (err error) {
|
||||
if err := svc.handlerSvc.Cancel(t.Id, force); err != nil {
|
||||
log.Errorf("failed to cancel task on master: %s", t.Id.Hex())
|
||||
return err
|
||||
@@ -112,7 +110,7 @@ func (svc *Service) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, for
|
||||
return svc.SaveTask(t, by)
|
||||
}
|
||||
|
||||
func (svc *Service) cancelOnWorker(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
|
||||
func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force bool) (err error) {
|
||||
// get subscribe stream
|
||||
stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id)
|
||||
if !ok {
|
||||
@@ -141,22 +139,22 @@ func (svc *Service) SetInterval(interval time.Duration) {
|
||||
svc.interval = interval
|
||||
}
|
||||
|
||||
func (svc *Service) SaveTask(t *models2.TaskV2, by primitive.ObjectID) (err error) {
|
||||
func (svc *Service) SaveTask(t *models.Task, by primitive.ObjectID) (err error) {
|
||||
if t.Id.IsZero() {
|
||||
t.SetCreated(by)
|
||||
t.SetUpdated(by)
|
||||
_, err = service.NewModelService[models2.TaskV2]().InsertOne(*t)
|
||||
_, err = service.NewModelService[models.Task]().InsertOne(*t)
|
||||
return err
|
||||
} else {
|
||||
t.SetUpdated(by)
|
||||
return service.NewModelService[models2.TaskV2]().ReplaceById(t.Id, *t)
|
||||
return service.NewModelService[models.Task]().ReplaceById(t.Id, *t)
|
||||
}
|
||||
}
|
||||
|
||||
// initTaskStatus initialize task status of existing tasks
|
||||
func (svc *Service) initTaskStatus() {
|
||||
// set status of running tasks as TaskStatusAbnormal
|
||||
runningTasks, err := service.NewModelService[models2.TaskV2]().GetMany(bson.M{
|
||||
runningTasks, err := service.NewModelService[models.Task]().GetMany(bson.M{
|
||||
"status": bson.M{
|
||||
"$in": []string{
|
||||
constants.TaskStatusPending,
|
||||
@@ -173,7 +171,7 @@ func (svc *Service) initTaskStatus() {
|
||||
return
|
||||
}
|
||||
for _, t := range runningTasks {
|
||||
go func(t *models2.TaskV2) {
|
||||
go func(t *models.Task) {
|
||||
t.Status = constants.TaskStatusAbnormal
|
||||
if err := svc.SaveTask(t, primitive.NilObjectID); err != nil {
|
||||
trace.PrintError(err)
|
||||
@@ -182,11 +180,11 @@ func (svc *Service) initTaskStatus() {
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *Service) isMasterNode(t *models2.TaskV2) (ok bool, err error) {
|
||||
func (svc *Service) isMasterNode(t *models.Task) (ok bool, err error) {
|
||||
if t.NodeId.IsZero() {
|
||||
return false, trace.TraceError(errors.ErrorTaskNoNodeId)
|
||||
}
|
||||
n, err := service.NewModelService[models2.NodeV2]().GetById(t.NodeId)
|
||||
n, err := service.NewModelService[models.Node]().GetById(t.NodeId)
|
||||
if err != nil {
|
||||
if errors2.Is(err, mongo2.ErrNoDocuments) {
|
||||
return false, trace.TraceError(errors.ErrorTaskNodeNotFound)
|
||||
@@ -199,7 +197,7 @@ func (svc *Service) isMasterNode(t *models2.TaskV2) (ok bool, err error) {
|
||||
func (svc *Service) cleanupTasks() {
|
||||
for {
|
||||
// task stats over 30 days ago
|
||||
taskStats, err := service.NewModelService[models2.TaskStatV2]().GetMany(bson.M{
|
||||
taskStats, err := service.NewModelService[models.TaskStat]().GetMany(bson.M{
|
||||
"created_ts": bson.M{
|
||||
"$lt": time.Now().Add(-30 * 24 * time.Hour),
|
||||
},
|
||||
@@ -217,14 +215,14 @@ func (svc *Service) cleanupTasks() {
|
||||
|
||||
if len(ids) > 0 {
|
||||
// remove tasks
|
||||
if err := service.NewModelService[models2.TaskV2]().DeleteMany(bson.M{
|
||||
if err := service.NewModelService[models.Task]().DeleteMany(bson.M{
|
||||
"_id": bson.M{"$in": ids},
|
||||
}); err != nil {
|
||||
trace.PrintError(err)
|
||||
}
|
||||
|
||||
// remove task stats
|
||||
if err := service.NewModelService[models2.TaskStatV2]().DeleteMany(bson.M{
|
||||
if err := service.NewModelService[models.TaskStat]().DeleteMany(bson.M{
|
||||
"_id": bson.M{"$in": ids},
|
||||
}); err != nil {
|
||||
trace.PrintError(err)
|
||||
@@ -235,35 +233,20 @@ func (svc *Service) cleanupTasks() {
|
||||
}
|
||||
}
|
||||
|
||||
func NewTaskSchedulerService() (svc2 *Service, err error) {
|
||||
// service
|
||||
svc := &Service{
|
||||
interval: 5 * time.Second,
|
||||
func newTaskSchedulerService() *Service {
|
||||
return &Service{
|
||||
interval: 5 * time.Second,
|
||||
svr: server.GetGrpcServer(),
|
||||
handlerSvc: handler.GetTaskHandlerService(),
|
||||
}
|
||||
svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()
|
||||
svc.svr, err = server.GetGrpcServer()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get grpc server: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
svc.handlerSvc, err = handler.GetTaskHandlerService()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get task handler service: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
var svc *Service
|
||||
var _service *Service
|
||||
var _serviceOnce sync.Once
|
||||
|
||||
func GetTaskSchedulerService() (svr *Service, err error) {
|
||||
if svc != nil {
|
||||
return svc, nil
|
||||
}
|
||||
svc, err = NewTaskSchedulerService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return svc, nil
|
||||
func GetTaskSchedulerService() *Service {
|
||||
_serviceOnce.Do(func() {
|
||||
_service = newTaskSchedulerService()
|
||||
})
|
||||
return _service
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/crawlab-team/crawlab/core/database"
|
||||
interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/core/task/log"
|
||||
@@ -98,13 +98,13 @@ func (svc *Service) getDatabaseServiceItem(taskId primitive.ObjectID) (item *dat
|
||||
}
|
||||
|
||||
// task
|
||||
t, err := service.NewModelService[models2.TaskV2]().GetById(taskId)
|
||||
t, err := service.NewModelService[models.Task]().GetById(taskId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// spider
|
||||
s, err := service.NewModelService[models2.SpiderV2]().GetById(t.SpiderId)
|
||||
s, err := service.NewModelService[models.Spider]().GetById(t.SpiderId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -137,7 +137,7 @@ func (svc *Service) getDatabaseServiceItem(taskId primitive.ObjectID) (item *dat
|
||||
}
|
||||
|
||||
func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) {
|
||||
err := service.NewModelService[models2.TaskStatV2]().UpdateById(id, bson.M{
|
||||
err := service.NewModelService[models.TaskStat]().UpdateById(id, bson.M{
|
||||
"$inc": bson.M{
|
||||
"result_count": resultCount,
|
||||
},
|
||||
@@ -176,7 +176,7 @@ func (svc *Service) normalizeRecord(item *databaseServiceItem, record map[string
|
||||
return res
|
||||
}
|
||||
|
||||
func NewTaskStatsServiceV2() (svc2 *Service, err error) {
|
||||
func NewTaskStatsService() *Service {
|
||||
// service
|
||||
svc := &Service{
|
||||
mu: sync.Mutex{},
|
||||
@@ -187,23 +187,17 @@ func NewTaskStatsServiceV2() (svc2 *Service, err error) {
|
||||
svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()
|
||||
|
||||
// log driver
|
||||
svc.logDriver, err = log.GetLogDriver(log.DriverTypeFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
svc.logDriver = log.GetLogDriver(log.DriverTypeFile)
|
||||
|
||||
return svc, nil
|
||||
return svc
|
||||
}
|
||||
|
||||
var _serviceV2 *Service
|
||||
var _service *Service
|
||||
var _serviceOnce sync.Once
|
||||
|
||||
func GetTaskStatsServiceV2() (svr *Service, err error) {
|
||||
if _serviceV2 != nil {
|
||||
return _serviceV2, nil
|
||||
}
|
||||
_serviceV2, err = NewTaskStatsServiceV2()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return _serviceV2, nil
|
||||
func GetTaskStatsService() *Service {
|
||||
_serviceOnce.Do(func() {
|
||||
_service = NewTaskStatsService()
|
||||
})
|
||||
return _service
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user