refactor: code cleanup

This commit is contained in:
Marvin Zhang
2024-10-29 12:59:45 +08:00
parent 43931a3e48
commit 1c03cb3e5c
31 changed files with 232 additions and 2327 deletions

View File

@@ -53,6 +53,10 @@ func (svc *ServiceV2) Start() {
go svc.Fetch()
}
func (svc *ServiceV2) Stop() {
svc.stopped = true
}
func (svc *ServiceV2) Run(taskId primitive.ObjectID) (err error) {
return svc.run(taskId)
}
@@ -76,128 +80,82 @@ func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) {
func (svc *ServiceV2) Fetch() {
ticker := time.NewTicker(svc.fetchInterval)
for {
// wait
<-ticker.C
// current node
n, err := svc.GetCurrentNode()
if err != nil {
continue
}
// skip if node is not active or enabled
if !n.Active || !n.Enabled {
continue
}
// validate if there are available runners
if svc.getRunnerCount() >= n.MaxRunners {
continue
}
// stop
if svc.stopped {
ticker.Stop()
return
}
// fetch task
tid, err := svc.fetch()
if err != nil {
trace.PrintError(err)
continue
}
// skip if no task id
if tid.IsZero() {
continue
}
// run task
if err := svc.run(tid); err != nil {
trace.PrintError(err)
t, err := svc.GetTaskById(tid)
if err != nil && t.Status != constants.TaskStatusCancelled {
t.Error = err.Error()
t.Status = constants.TaskStatusError
t.SetUpdated(t.CreatedBy)
_ = client.NewModelServiceV2[models2.TaskV2]().ReplaceById(t.Id, *t)
select {
case <-ticker.C:
// current node
n, err := svc.GetCurrentNode()
if err != nil {
continue
}
// skip if node is not active or enabled
if !n.Active || !n.Enabled {
continue
}
// validate if there are available runners
if svc.getRunnerCount() >= n.MaxRunners {
continue
}
// fetch task
tid, err := svc.fetch()
if err != nil {
trace.PrintError(err)
continue
}
// skip if no task id
if tid.IsZero() {
continue
}
// run task
if err := svc.run(tid); err != nil {
trace.PrintError(err)
t, err := svc.GetTaskById(tid)
if err != nil && t.Status != constants.TaskStatusCancelled {
t.Error = err.Error()
t.Status = constants.TaskStatusError
t.SetUpdated(t.CreatedBy)
_ = client.NewModelServiceV2[models2.TaskV2]().ReplaceById(t.Id, *t)
continue
}
continue
}
continue
}
}
}
func (svc *ServiceV2) ReportStatus() {
ticker := time.NewTicker(svc.reportInterval)
for {
if svc.stopped {
return
}
// report handler status
if err := svc.reportStatus(); err != nil {
trace.PrintError(err)
select {
case <-ticker.C:
// report handler status
if err := svc.reportStatus(); err != nil {
trace.PrintError(err)
}
}
// wait
time.Sleep(svc.reportInterval)
}
}
func (svc *ServiceV2) IsSyncLocked(path string) (ok bool) {
_, ok = svc.syncLocks.Load(path)
return ok
}
func (svc *ServiceV2) LockSync(path string) {
svc.syncLocks.Store(path, true)
}
func (svc *ServiceV2) UnlockSync(path string) {
svc.syncLocks.Delete(path)
}
//func (svc *ServiceV2) GetMaxRunners() (maxRunners int) {
// return svc.maxRunners
//}
//
//func (svc *ServiceV2) SetMaxRunners(maxRunners int) {
// svc.maxRunners = maxRunners
//}
func (svc *ServiceV2) GetExitWatchDuration() (duration time.Duration) {
return svc.exitWatchDuration
}
func (svc *ServiceV2) SetExitWatchDuration(duration time.Duration) {
svc.exitWatchDuration = duration
}
func (svc *ServiceV2) GetFetchInterval() (interval time.Duration) {
return svc.fetchInterval
}
func (svc *ServiceV2) SetFetchInterval(interval time.Duration) {
svc.fetchInterval = interval
}
func (svc *ServiceV2) GetReportInterval() (interval time.Duration) {
return svc.reportInterval
}
func (svc *ServiceV2) SetReportInterval(interval time.Duration) {
svc.reportInterval = interval
}
func (svc *ServiceV2) GetCancelTimeout() (timeout time.Duration) {
return svc.cancelTimeout
}
func (svc *ServiceV2) SetCancelTimeout(timeout time.Duration) {
svc.cancelTimeout = timeout
}
func (svc *ServiceV2) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) {
return svc.cfgSvc
}
@@ -245,17 +203,6 @@ func (svc *ServiceV2) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2,
return s, nil
}
func (svc *ServiceV2) getRunners() (runners []*RunnerV2) {
svc.mu.Lock()
defer svc.mu.Unlock()
svc.runners.Range(func(key, value interface{}) bool {
r := value.(RunnerV2)
runners = append(runners, &r)
return true
})
return runners
}
func (svc *ServiceV2) getRunnerCount() (count int) {
n, err := svc.GetCurrentNode()
if err != nil {
@@ -401,7 +348,6 @@ func newTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
cancelTimeout: 5 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
syncLocks: sync.Map{},
}
// dependency injection