mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-25 17:42:25 +01:00
refactor: renamed files and services
This commit is contained in:
@@ -116,7 +116,7 @@ func newNodeConfigService() (svc2 interfaces.NodeConfigService, err error) {
|
||||
}
|
||||
|
||||
var _service interfaces.NodeConfigService
|
||||
var _serviceOnce = new(sync.Once)
|
||||
var _serviceOnce sync.Once
|
||||
|
||||
func GetNodeConfigService() interfaces.NodeConfigService {
|
||||
_serviceOnce.Do(func() {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"github.com/crawlab-team/crawlab/core/grpc/server"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/common"
|
||||
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/core/notification"
|
||||
@@ -29,12 +29,12 @@ import (
|
||||
|
||||
type MasterService struct {
|
||||
// dependencies
|
||||
cfgSvc interfaces.NodeConfigService
|
||||
server *server.GrpcServer
|
||||
schedulerSvc *scheduler.Service
|
||||
handlerSvc *handler.Service
|
||||
scheduleSvc *schedule.ServiceV2
|
||||
systemSvc *system.ServiceV2
|
||||
cfgSvc interfaces.NodeConfigService
|
||||
server *server.GrpcServer
|
||||
taskSchedulerSvc *scheduler.Service
|
||||
taskHandlerSvc *handler.Service
|
||||
scheduleSvc *schedule.Service
|
||||
systemSvc *system.Service
|
||||
|
||||
// settings
|
||||
cfgPath string
|
||||
@@ -43,11 +43,6 @@ type MasterService struct {
|
||||
stopOnError bool
|
||||
}
|
||||
|
||||
func (svc *MasterService) Init() (err error) {
|
||||
// do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *MasterService) Start() {
|
||||
// create indexes
|
||||
common.InitIndexes()
|
||||
@@ -66,10 +61,10 @@ func (svc *MasterService) Start() {
|
||||
go svc.Monitor()
|
||||
|
||||
// start task handler
|
||||
go svc.handlerSvc.Start()
|
||||
go svc.taskHandlerSvc.Start()
|
||||
|
||||
// start task scheduler
|
||||
go svc.schedulerSvc.Start()
|
||||
go svc.taskSchedulerSvc.Start()
|
||||
|
||||
// start schedule service
|
||||
go svc.scheduleSvc.Start()
|
||||
@@ -87,7 +82,7 @@ func (svc *MasterService) Wait() {
|
||||
|
||||
func (svc *MasterService) Stop() {
|
||||
_ = svc.server.Stop()
|
||||
svc.handlerSvc.Stop()
|
||||
svc.taskHandlerSvc.Stop()
|
||||
log.Infof("master[%s] service has stopped", svc.GetConfigService().GetNodeKey())
|
||||
}
|
||||
|
||||
@@ -133,11 +128,11 @@ func (svc *MasterService) SetMonitorInterval(duration time.Duration) {
|
||||
func (svc *MasterService) Register() (err error) {
|
||||
nodeKey := svc.GetConfigService().GetNodeKey()
|
||||
nodeName := svc.GetConfigService().GetNodeName()
|
||||
node, err := service.NewModelService[models2.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() {
|
||||
// not exists
|
||||
log.Infof("master[%s] does not exist in db", nodeKey)
|
||||
node := models2.NodeV2{
|
||||
node := models.Node{
|
||||
Key: nodeKey,
|
||||
Name: nodeName,
|
||||
MaxRunners: config.DefaultConfigOptions.MaxRunners,
|
||||
@@ -149,7 +144,7 @@ func (svc *MasterService) Register() (err error) {
|
||||
}
|
||||
node.SetCreated(primitive.NilObjectID)
|
||||
node.SetUpdated(primitive.NilObjectID)
|
||||
id, err := service.NewModelService[models2.NodeV2]().InsertOne(node)
|
||||
id, err := service.NewModelService[models.Node]().InsertOne(node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -161,7 +156,7 @@ func (svc *MasterService) Register() (err error) {
|
||||
node.Status = constants.NodeStatusOnline
|
||||
node.Active = true
|
||||
node.ActiveAt = time.Now()
|
||||
err = service.NewModelService[models2.NodeV2]().ReplaceById(node.Id, *node)
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -192,7 +187,7 @@ func (svc *MasterService) monitor() (err error) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(workerNodes))
|
||||
for _, n := range workerNodes {
|
||||
go func(n *models2.NodeV2) {
|
||||
go func(n *models.Node) {
|
||||
defer wg.Done()
|
||||
|
||||
// subscribe
|
||||
@@ -222,12 +217,12 @@ func (svc *MasterService) monitor() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *MasterService) getAllWorkerNodes() (nodes []models2.NodeV2, err error) {
|
||||
func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) {
|
||||
query := bson.M{
|
||||
"key": bson.M{"$ne": svc.cfgSvc.GetNodeKey()}, // not self
|
||||
"active": true, // active
|
||||
}
|
||||
nodes, err = service.NewModelService[models2.NodeV2]().GetMany(query, nil)
|
||||
nodes, err = service.NewModelService[models.Node]().GetMany(query, nil)
|
||||
if err != nil {
|
||||
if errors.Is(err, mongo2.ErrNoDocuments) {
|
||||
return nil, nil
|
||||
@@ -239,7 +234,7 @@ func (svc *MasterService) getAllWorkerNodes() (nodes []models2.NodeV2, err error
|
||||
|
||||
func (svc *MasterService) updateMasterNodeStatus() (err error) {
|
||||
nodeKey := svc.GetConfigService().GetNodeKey()
|
||||
node, err := service.NewModelService[models2.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -250,7 +245,7 @@ func (svc *MasterService) updateMasterNodeStatus() (err error) {
|
||||
node.ActiveAt = time.Now()
|
||||
newStatus := node.Status
|
||||
|
||||
err = service.NewModelService[models2.NodeV2]().ReplaceById(node.Id, *node)
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -264,11 +259,11 @@ func (svc *MasterService) updateMasterNodeStatus() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *MasterService) setWorkerNodeOffline(node *models2.NodeV2) {
|
||||
func (svc *MasterService) setWorkerNodeOffline(node *models.Node) {
|
||||
node.Status = constants.NodeStatusOffline
|
||||
node.Active = false
|
||||
err := backoff.Retry(func() error {
|
||||
return service.NewModelService[models2.NodeV2]().ReplaceById(node.Id, *node)
|
||||
return service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 3))
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
@@ -276,12 +271,12 @@ func (svc *MasterService) setWorkerNodeOffline(node *models2.NodeV2) {
|
||||
svc.sendNotification(node)
|
||||
}
|
||||
|
||||
func (svc *MasterService) subscribeNode(n *models2.NodeV2) (ok bool) {
|
||||
func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) {
|
||||
_, ok = svc.server.NodeSvr.GetSubscribeStream(n.Id)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (svc *MasterService) pingNodeClient(n *models2.NodeV2) (ok bool) {
|
||||
func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) {
|
||||
stream, ok := svc.server.NodeSvr.GetSubscribeStream(n.Id)
|
||||
if !ok {
|
||||
log.Errorf("cannot get worker node client[%s]", n.Key)
|
||||
@@ -297,85 +292,50 @@ func (svc *MasterService) pingNodeClient(n *models2.NodeV2) (ok bool) {
|
||||
return true
|
||||
}
|
||||
|
||||
func (svc *MasterService) updateNodeAvailableRunners(node *models2.NodeV2) (err error) {
|
||||
func (svc *MasterService) updateNodeAvailableRunners(node *models.Node) (err error) {
|
||||
query := bson.M{
|
||||
"node_id": node.Id,
|
||||
"status": constants.TaskStatusRunning,
|
||||
}
|
||||
runningTasksCount, err := service.NewModelService[models2.TaskV2]().Count(query)
|
||||
runningTasksCount, err := service.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
node.AvailableRunners = node.MaxRunners - runningTasksCount
|
||||
err = service.NewModelService[models2.NodeV2]().ReplaceById(node.Id, *node)
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *MasterService) sendNotification(node *models2.NodeV2) {
|
||||
func (svc *MasterService) sendNotification(node *models.Node) {
|
||||
if !utils.IsPro() {
|
||||
return
|
||||
}
|
||||
go notification.GetNotificationServiceV2().SendNodeNotification(node)
|
||||
go notification.GetNotificationService().SendNodeNotification(node)
|
||||
}
|
||||
|
||||
func newMasterServiceV2() (res *MasterService, err error) {
|
||||
// master service
|
||||
svc := &MasterService{
|
||||
cfgPath: config2.GetConfigPath(),
|
||||
monitorInterval: 15 * time.Second,
|
||||
stopOnError: false,
|
||||
func newMasterService() *MasterService {
|
||||
return &MasterService{
|
||||
cfgPath: config2.GetConfigPath(),
|
||||
cfgSvc: config.GetNodeConfigService(),
|
||||
monitorInterval: 15 * time.Second,
|
||||
stopOnError: false,
|
||||
server: server.GetGrpcServer(),
|
||||
taskSchedulerSvc: scheduler.GetTaskSchedulerService(),
|
||||
taskHandlerSvc: handler.GetTaskHandlerService(),
|
||||
scheduleSvc: schedule.GetScheduleService(),
|
||||
systemSvc: system.GetSystemService(),
|
||||
}
|
||||
|
||||
// node config service
|
||||
svc.cfgSvc = config.GetNodeConfigService()
|
||||
|
||||
// grpc server
|
||||
svc.server, err = server.GetGrpcServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// scheduler service
|
||||
svc.schedulerSvc, err = scheduler.GetTaskSchedulerService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// handler service
|
||||
svc.handlerSvc, err = handler.GetTaskHandlerService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// schedule service
|
||||
svc.scheduleSvc, err = schedule.GetScheduleServiceV2()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// system service
|
||||
svc.systemSvc = system.GetSystemServiceV2()
|
||||
|
||||
// init
|
||||
if err := svc.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
var masterService *MasterService
|
||||
var masterServiceOnce = new(sync.Once)
|
||||
var masterServiceOnce sync.Once
|
||||
|
||||
func GetMasterService() (res *MasterService, err error) {
|
||||
func GetMasterService() *MasterService {
|
||||
masterServiceOnce.Do(func() {
|
||||
masterService, err = newMasterServiceV2()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get master service: %v", err)
|
||||
}
|
||||
masterService = newMasterService()
|
||||
})
|
||||
return masterService, err
|
||||
return masterService
|
||||
}
|
||||
|
||||
@@ -5,13 +5,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/crawlab-team/crawlab/core/config"
|
||||
"github.com/crawlab-team/crawlab/core/grpc/client"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
client2 "github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/core/task/handler"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
@@ -33,15 +34,10 @@ type WorkerService struct {
|
||||
|
||||
// internals
|
||||
stopped bool
|
||||
n *models.NodeV2
|
||||
n *models.Node
|
||||
s grpc.NodeService_SubscribeClient
|
||||
}
|
||||
|
||||
func (svc *WorkerService) Init() (err error) {
|
||||
// do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *WorkerService) Start() {
|
||||
// start grpc client
|
||||
if err := svc.client.Start(); err != nil {
|
||||
@@ -90,7 +86,7 @@ func (svc *WorkerService) register() {
|
||||
log.Fatalf("failed to register worker[%s] to master: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
panic(err)
|
||||
}
|
||||
svc.n, err = client2.NewModelService[models.NodeV2]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil)
|
||||
svc.n, err = client2.NewModelService[models.Node]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get node: %v", err)
|
||||
panic(err)
|
||||
@@ -197,42 +193,22 @@ func (svc *WorkerService) sendHeartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
var workerServiceV2 *WorkerService
|
||||
var workerServiceV2Once = new(sync.Once)
|
||||
|
||||
func newWorkerService() (res *WorkerService, err error) {
|
||||
svc := &WorkerService{
|
||||
func newWorkerService() *WorkerService {
|
||||
return &WorkerService{
|
||||
cfgPath: config.GetConfigPath(),
|
||||
heartbeatInterval: 15 * time.Second,
|
||||
cfgSvc: nodeconfig.GetNodeConfigService(),
|
||||
client: client.GetGrpcClient(),
|
||||
handlerSvc: handler.GetTaskHandlerService(),
|
||||
}
|
||||
|
||||
// node config service
|
||||
svc.cfgSvc = nodeconfig.GetNodeConfigService()
|
||||
|
||||
// grpc client
|
||||
svc.client = client.GetGrpcClient()
|
||||
|
||||
// handler service
|
||||
svc.handlerSvc, err = handler.GetTaskHandlerService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// init
|
||||
err = svc.Init()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
func GetWorkerService() (res *WorkerService, err error) {
|
||||
workerServiceV2Once.Do(func() {
|
||||
workerServiceV2, err = newWorkerService()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get worker service: %v", err)
|
||||
}
|
||||
var workerService *WorkerService
|
||||
var workerServiceOnce sync.Once
|
||||
|
||||
func GetWorkerService() *WorkerService {
|
||||
workerServiceOnce.Do(func() {
|
||||
workerService = newWorkerService()
|
||||
})
|
||||
return workerServiceV2, err
|
||||
return workerService
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user