Files
crawlab/core/spider/admin/service.go
Marvin Zhang 34509b8d4c refactor: enhance parameter handling and improve code clarity
- Updated GetListParams to set a default sort option for better query handling.
- Enhanced PostSpiderRunParams to include a default mode and improved error handling for missing spider.
- Added parameters field to ChatMessageContent for more flexible message content management.
- Refactored getNodeIds method to simplify mode handling and removed unnecessary error checks.
- Improved ChatMessageAction component to display parameters and response sections more effectively, enhancing user experience.
2025-04-17 18:03:15 +08:00

148 lines
3.4 KiB
Go

package admin
import (
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/task/scheduler"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"sync"
)
// Service turns spider run requests into scheduled tasks.
// newSpiderAdminService refuses to build it on a non-master node.
type Service struct {
	schedulerSvc *scheduler.Service // enqueues the tasks created for a spider run
}
// Schedule loads the spider identified by id and enqueues tasks for it
// according to opts, returning the IDs of the enqueued tasks.
// A lookup error from the spider model service is returned unchanged.
func (svc *Service) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
	spiderModelSvc := service.NewModelService[models.Spider]()
	spider, getErr := spiderModelSvc.GetById(id)
	if getErr != nil {
		return nil, getErr
	}
	return svc.scheduleTasks(spider, opts)
}
// scheduleTasks enqueues one task per target node for spider s and returns
// the enqueued task IDs in node order.
//
// Run settings left empty in opts (mode, node IDs, command, param, priority)
// fall back to the spider's own settings. The fallback is resolved BEFORE the
// target nodes are selected, so the nodes a run is dispatched to always agree
// with the mode/node IDs persisted on each task. A nil opts means "use the
// spider's settings". The caller's opts is never modified.
//
// If no nodes qualify (e.g. no online nodes in all-nodes mode), no task is
// enqueued and (nil, nil) is returned. If enqueueing fails part-way, the
// error is returned; tasks already enqueued are not rolled back here.
func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
	// Work on a copy so spider defaults do not leak into the caller's options.
	var effective interfaces.SpiderRunOptions
	if opts != nil {
		effective = *opts
	}

	// Apply spider defaults once, up front (previously done per task, and
	// only after node selection had already used the raw options).
	if effective.Mode == "" {
		effective.Mode = s.Mode
	}
	if len(effective.NodeIds) == 0 {
		effective.NodeIds = s.NodeIds
	}
	if effective.Cmd == "" {
		effective.Cmd = s.Cmd
	}
	if effective.Param == "" {
		effective.Param = s.Param
	}
	if effective.Priority == 0 {
		effective.Priority = s.Priority
	}

	// Select target nodes from the resolved options.
	nodeIds, err := svc.getNodeIds(&effective)
	if err != nil {
		return nil, err
	}

	for _, nodeId := range nodeIds {
		t := &models.Task{
			SpiderId:   s.Id,
			NodeId:     nodeId,
			NodeIds:    effective.NodeIds,
			Mode:       effective.Mode,
			Cmd:        effective.Cmd,
			Param:      effective.Param,
			ScheduleId: effective.ScheduleId,
			Priority:   effective.Priority,
		}
		t, err = svc.schedulerSvc.Enqueue(t, effective.UserId)
		if err != nil {
			return nil, err
		}
		taskIds = append(taskIds, t.Id)
	}
	return taskIds, nil
}
// getNodeIds maps opts.Mode to the node IDs that tasks should be created for:
//   - selected-nodes: opts.NodeIds exactly as given;
//   - all-nodes: every node that is active, enabled and online (nil if none);
//   - any other mode: a single primitive.NilObjectID entry, i.e. one task
//     without a specific node (presumably the scheduler assigns one — confirm).
//
// An error is returned only when the node lookup fails.
func (svc *Service) getNodeIds(opts *interfaces.SpiderRunOptions) (nodeIds []primitive.ObjectID, err error) {
	if opts.Mode == constants.RunTypeSelectedNodes {
		return opts.NodeIds, nil
	}
	if opts.Mode != constants.RunTypeAllNodes {
		return []primitive.ObjectID{primitive.NilObjectID}, nil
	}

	filter := bson.M{
		"active":  true,
		"enabled": true,
		"status":  constants.NodeStatusOnline,
	}
	nodes, lookupErr := service.NewModelService[models.Node]().GetMany(filter, nil)
	if lookupErr != nil {
		return nil, lookupErr
	}
	for i := range nodes {
		nodeIds = append(nodeIds, nodes[i].Id)
	}
	return nodeIds, nil
}
// isMultiTask reports whether a run with opts would fan out into more than
// one task:
//   - all-nodes: more than one node is active, enabled and online;
//   - selected-nodes: more than one node ID is given.
//
// Every other mode reports false. If the node lookup fails, the error is
// printed via trace.PrintError and false is returned (best effort).
func (svc *Service) isMultiTask(opts *interfaces.SpiderRunOptions) (res bool) {
	switch opts.Mode {
	case constants.RunTypeAllNodes:
		onlineNodes, err := service.NewModelService[models.Node]().GetMany(bson.M{
			"active":  true,
			"enabled": true,
			"status":  constants.NodeStatusOnline,
		}, nil)
		if err != nil {
			trace.PrintError(err)
			return false
		}
		return len(onlineNodes) > 1
	case constants.RunTypeSelectedNodes:
		return len(opts.NodeIds) > 1
	default:
		return false
	}
}
// newSpiderAdminService constructs the spider admin service wired to the
// task scheduler service. It panics unless the current node is the master.
func newSpiderAdminService() *Service {
	if isMaster := config.GetNodeConfigService().IsMaster(); !isMaster {
		panic("only master node can run spider admin service")
	}
	svc := &Service{}
	svc.schedulerSvc = scheduler.GetTaskSchedulerService()
	return svc
}
var (
	// _service holds the lazily created, package-wide spider admin service.
	_service *Service
	// _serviceOnce ensures _service is constructed at most once.
	_serviceOnce sync.Once
)

// GetSpiderAdminService returns the shared spider admin service, creating it
// on the first call. Construction runs at most once, even under concurrent
// callers. If construction panics (newSpiderAdminService does so on
// non-master nodes), sync.Once still treats it as done. Any later call then
// returns nil.
func GetSpiderAdminService() *Service {
	_serviceOnce.Do(func() { _service = newSpiderAdminService() })
	return _service
}