fix: unable to sync directories to work nodes

This commit is contained in:
Marvin Zhang
2024-07-01 15:59:20 +08:00
parent 68307d250e
commit 023ba27566
40 changed files with 768 additions and 1540 deletions

View File

@@ -74,9 +74,10 @@ func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) {
}
func (svc *ServiceV2) Fetch() {
ticker := time.NewTicker(svc.fetchInterval)
for {
// wait
time.Sleep(svc.fetchInterval)
<-ticker.C
// current node
n, err := svc.GetCurrentNode()
@@ -96,6 +97,7 @@ func (svc *ServiceV2) Fetch() {
// stop
if svc.stopped {
ticker.Stop()
return
}
@@ -115,10 +117,11 @@ func (svc *ServiceV2) Fetch() {
if err := svc.run(tid); err != nil {
trace.PrintError(err)
t, err := svc.GetTaskById(tid)
if err == nil && t.Status != constants.TaskStatusCancelled {
if err != nil && t.Status != constants.TaskStatusCancelled {
t.Error = err.Error()
t.Status = constants.TaskStatusError
t.SetUpdated(t.CreatedBy)
_ = client.NewModelServiceV2[models.TaskV2]().ReplaceById(t.Id, *t)
continue
}
continue
@@ -388,7 +391,7 @@ func (svc *ServiceV2) run(taskId primitive.ObjectID) (err error) {
return nil
}
func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
func newTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
// service
svc := &ServiceV2{
exitWatchDuration: 60 * time.Second,
@@ -413,12 +416,18 @@ func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
}
var _serviceV2 *ServiceV2
var _serviceV2Once = new(sync.Once)
func GetTaskHandlerServiceV2() (svr *ServiceV2, err error) {
if _serviceV2 != nil {
return _serviceV2, nil
}
_serviceV2, err = NewTaskHandlerServiceV2()
_serviceV2Once.Do(func() {
_serviceV2, err = newTaskHandlerServiceV2()
if err != nil {
log.Errorf("failed to create task handler service: %v", err)
}
})
if err != nil {
return nil, err
}