mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
fix: optimized node runners calculation
This commit is contained in:
@@ -5,19 +5,19 @@ import (
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
any `collection:"nodes"`
|
||||
BaseModel[Node] `bson:",inline"`
|
||||
Key string `json:"key" bson:"key"`
|
||||
Name string `json:"name" bson:"name"`
|
||||
Ip string `json:"ip" bson:"ip"`
|
||||
Mac string `json:"mac" bson:"mac"`
|
||||
Hostname string `json:"hostname" bson:"hostname"`
|
||||
Description string `json:"description" bson:"description"`
|
||||
IsMaster bool `json:"is_master" bson:"is_master"`
|
||||
Status string `json:"status" bson:"status"`
|
||||
Enabled bool `json:"enabled" bson:"enabled"`
|
||||
Active bool `json:"active" bson:"active"`
|
||||
ActiveAt time.Time `json:"active_at" bson:"active_ts"`
|
||||
AvailableRunners int `json:"available_runners" bson:"available_runners"`
|
||||
MaxRunners int `json:"max_runners" bson:"max_runners"`
|
||||
any `collection:"nodes"`
|
||||
BaseModel[Node] `bson:",inline"`
|
||||
Key string `json:"key" bson:"key"`
|
||||
Name string `json:"name" bson:"name"`
|
||||
Ip string `json:"ip" bson:"ip"`
|
||||
Mac string `json:"mac" bson:"mac"`
|
||||
Hostname string `json:"hostname" bson:"hostname"`
|
||||
Description string `json:"description" bson:"description"`
|
||||
IsMaster bool `json:"is_master" bson:"is_master"`
|
||||
Status string `json:"status" bson:"status"`
|
||||
Enabled bool `json:"enabled" bson:"enabled"`
|
||||
Active bool `json:"active" bson:"active"`
|
||||
ActiveAt time.Time `json:"active_at" bson:"active_ts"`
|
||||
CurrentRunners int `json:"current_runners" bson:"current_runners"`
|
||||
MaxRunners int `json:"max_runners" bson:"max_runners"`
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ func (svc *MasterService) monitor() (err error) {
|
||||
}
|
||||
|
||||
// update node available runners
|
||||
if err := svc.updateNodeAvailableRunners(n); err != nil {
|
||||
if err := svc.updateNodeRunners(n); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
@@ -268,18 +268,20 @@ func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) {
|
||||
return true
|
||||
}
|
||||
|
||||
func (svc *MasterService) updateNodeAvailableRunners(node *models.Node) (err error) {
|
||||
func (svc *MasterService) updateNodeRunners(node *models.Node) (err error) {
|
||||
query := bson.M{
|
||||
"node_id": node.Id,
|
||||
"status": constants.TaskStatusRunning,
|
||||
}
|
||||
runningTasksCount, err := service.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
log.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err)
|
||||
return err
|
||||
}
|
||||
node.AvailableRunners = node.MaxRunners - runningTasksCount
|
||||
node.CurrentRunners = runningTasksCount
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
log.Errorf("failed to update node runners for node[%s]: %v", node.Key, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -270,8 +270,8 @@ func (svc *Service) geContentWithVariables(template string, variables []entity.N
|
||||
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.Active))
|
||||
case "active_at":
|
||||
content = strings.ReplaceAll(content, v.GetKey(), svc.getFormattedTime(vd.Node.ActiveAt))
|
||||
case "available_runners":
|
||||
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.AvailableRunners))
|
||||
case "current_runners":
|
||||
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.CurrentRunners))
|
||||
case "max_runners":
|
||||
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%d", vd.Node.MaxRunners))
|
||||
case "created_ts":
|
||||
|
||||
@@ -215,13 +215,13 @@ func (svc *Service) getRunnerCount() (count int) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
count, err = service.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
log.Errorf("failed to count tasks: %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
count, err = client.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
log.Errorf("failed to count tasks: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -260,11 +260,8 @@ func (svc *Service) updateNodeStatus() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// available runners of handler
|
||||
ar := n.MaxRunners - svc.getRunnerCount()
|
||||
|
||||
// set available runners
|
||||
n.AvailableRunners = ar
|
||||
n.CurrentRunners = svc.getRunnerCount()
|
||||
|
||||
// save node
|
||||
n.SetUpdated(n.CreatedBy)
|
||||
|
||||
Reference in New Issue
Block a user