refactor: optimized node communication

This commit is contained in:
Marvin Zhang
2024-10-31 20:15:56 +08:00
parent d9b327de17
commit 68ba84a4e7
18 changed files with 291 additions and 308 deletions

View File

@@ -33,7 +33,7 @@ import (
"time"
)
type RunnerV2 struct {
type Runner struct {
// dependencies
svc *Service // task handler service
fsSvc interfaces.FsService // task fs service
@@ -48,7 +48,7 @@ type RunnerV2 struct {
tid primitive.ObjectID // task id
t *models.TaskV2 // task model.Task
s *models.SpiderV2 // spider model.Spider
ch chan constants.TaskSignal // channel to communicate between Service and RunnerV2
ch chan constants.TaskSignal // channel to communicate between Service and Runner
err error // standard process error
cwd string // working directory
c *client2.GrpcClient // grpc client
@@ -60,7 +60,7 @@ type RunnerV2 struct {
logBatchSize int
}
func (r *RunnerV2) Init() (err error) {
func (r *Runner) Init() (err error) {
// update task
if err := r.updateTask("", nil); err != nil {
return err
@@ -82,7 +82,7 @@ func (r *RunnerV2) Init() (err error) {
return nil
}
func (r *RunnerV2) Run() (err error) {
func (r *Runner) Run() (err error) {
// log task started
log.Infof("task[%s] started", r.tid.Hex())
@@ -165,7 +165,7 @@ func (r *RunnerV2) Run() (err error) {
return err
}
func (r *RunnerV2) Cancel(force bool) (err error) {
func (r *Runner) Cancel(force bool) (err error) {
// kill process
opts := &sys_exec.KillProcessOptions{
Timeout: r.svc.GetCancelTimeout(),
@@ -191,15 +191,15 @@ func (r *RunnerV2) Cancel(force bool) (err error) {
}
}
func (r *RunnerV2) SetSubscribeTimeout(timeout time.Duration) {
func (r *Runner) SetSubscribeTimeout(timeout time.Duration) {
r.subscribeTimeout = timeout
}
func (r *RunnerV2) GetTaskId() (id primitive.ObjectID) {
func (r *Runner) GetTaskId() (id primitive.ObjectID) {
return r.tid
}
func (r *RunnerV2) configureCmd() (err error) {
func (r *Runner) configureCmd() (err error) {
var cmdStr string
// customized spider
@@ -230,7 +230,7 @@ func (r *RunnerV2) configureCmd() (err error) {
return nil
}
func (r *RunnerV2) configureLogging() {
func (r *Runner) configureLogging() {
// set stdout reader
stdout, _ := r.cmd.StdoutPipe()
r.scannerStdout = bufio.NewReaderSize(stdout, r.bufferSize)
@@ -240,7 +240,7 @@ func (r *RunnerV2) configureLogging() {
r.scannerStderr = bufio.NewReaderSize(stderr, r.bufferSize)
}
func (r *RunnerV2) startLogging() {
func (r *Runner) startLogging() {
// start reading stdout
go r.startLoggingReaderStdout()
@@ -248,7 +248,7 @@ func (r *RunnerV2) startLogging() {
go r.startLoggingReaderStderr()
}
func (r *RunnerV2) startLoggingReaderStdout() {
func (r *Runner) startLoggingReaderStdout() {
for {
line, err := r.scannerStdout.ReadString(byte('\n'))
if err != nil {
@@ -259,7 +259,7 @@ func (r *RunnerV2) startLoggingReaderStdout() {
}
}
func (r *RunnerV2) startLoggingReaderStderr() {
func (r *Runner) startLoggingReaderStderr() {
for {
line, err := r.scannerStderr.ReadString(byte('\n'))
if err != nil {
@@ -270,7 +270,7 @@ func (r *RunnerV2) startLoggingReaderStderr() {
}
}
func (r *RunnerV2) startHealthCheck() {
func (r *Runner) startHealthCheck() {
if r.cmd.ProcessState == nil || r.cmd.ProcessState.Exited() {
return
}
@@ -285,7 +285,7 @@ func (r *RunnerV2) startHealthCheck() {
}
}
func (r *RunnerV2) configureEnv() {
func (r *Runner) configureEnv() {
// 默认把Node.js的全局node_modules加入环境变量
envPath := os.Getenv("PATH")
nodePath := "/usr/lib/node_modules"
@@ -316,7 +316,7 @@ func (r *RunnerV2) configureEnv() {
}
}
func (r *RunnerV2) syncFiles() (err error) {
func (r *Runner) syncFiles() (err error) {
var id string
var workingDir string
if r.s.GitId.IsZero() {
@@ -425,7 +425,7 @@ func (r *RunnerV2) syncFiles() (err error) {
return err
}
func (r *RunnerV2) downloadFile(url string, filePath string, fileInfo *entity.FsFileInfo) error {
func (r *Runner) downloadFile(url string, filePath string, fileInfo *entity.FsFileInfo) error {
// get file response
resp, err := http.Get(url)
if err != nil {
@@ -465,8 +465,8 @@ func (r *RunnerV2) downloadFile(url string, filePath string, fileInfo *entity.Fs
}
// wait for process to finish and send task signal (constants.TaskSignal)
// to task runner's channel (RunnerV2.ch) according to exit code
func (r *RunnerV2) wait() {
// to task runner's channel (Runner.ch) according to exit code
func (r *Runner) wait() {
// wait for process to finish
if err := r.cmd.Wait(); err != nil {
var exitError *exec.ExitError
@@ -492,8 +492,8 @@ func (r *RunnerV2) wait() {
r.ch <- constants.TaskSignalFinish
}
// updateTask update and get updated info of task (RunnerV2.t)
func (r *RunnerV2) updateTask(status string, e error) (err error) {
// updateTask update and get updated info of task (Runner.t)
func (r *Runner) updateTask(status string, e error) (err error) {
if r.t != nil && status != "" {
// update task status
r.t.Status = status
@@ -529,7 +529,7 @@ func (r *RunnerV2) updateTask(status string, e error) (err error) {
return nil
}
func (r *RunnerV2) initConnection() (err error) {
func (r *Runner) initConnection() (err error) {
r.conn, err = r.c.TaskClient.Connect(context.Background())
if err != nil {
return trace.TraceError(err)
@@ -537,7 +537,7 @@ func (r *RunnerV2) initConnection() (err error) {
return nil
}
func (r *RunnerV2) writeLogLines(lines []string) {
func (r *Runner) writeLogLines(lines []string) {
linesBytes, err := json.Marshal(lines)
if err != nil {
log.Errorf("Error marshaling log lines: %v", err)
@@ -554,7 +554,7 @@ func (r *RunnerV2) writeLogLines(lines []string) {
}
}
func (r *RunnerV2) _updateTaskStat(status string) {
func (r *Runner) _updateTaskStat(status string) {
ts, err := client.NewModelServiceV2[models.TaskStatV2]().GetById(r.tid)
if err != nil {
trace.PrintError(err)
@@ -590,7 +590,7 @@ func (r *RunnerV2) _updateTaskStat(status string) {
}
}
func (r *RunnerV2) sendNotification() {
func (r *Runner) sendNotification() {
req := &grpc.TaskServiceSendNotificationRequest{
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
TaskId: r.tid.Hex(),
@@ -603,7 +603,7 @@ func (r *RunnerV2) sendNotification() {
}
}
func (r *RunnerV2) _updateSpiderStat(status string) {
func (r *Runner) _updateSpiderStat(status string) {
// task stat
ts, err := client.NewModelServiceV2[models.TaskStatV2]().GetById(r.tid)
if err != nil {
@@ -657,7 +657,7 @@ func (r *RunnerV2) _updateSpiderStat(status string) {
}
}
func (r *RunnerV2) configureCwd() {
func (r *Runner) configureCwd() {
workspacePath := viper.GetString("workspace")
if r.s.GitId.IsZero() {
// not git
@@ -668,14 +668,14 @@ func (r *RunnerV2) configureCwd() {
}
}
func NewTaskRunnerV2(id primitive.ObjectID, svc *Service) (r2 *RunnerV2, err error) {
func NewTaskRunnerV2(id primitive.ObjectID, svc *Service) (r2 *Runner, err error) {
// validate options
if id.IsZero() {
return nil, constants.ErrInvalidOptions
}
// runner
r := &RunnerV2{
r := &Runner{
subscribeTimeout: 30 * time.Second,
bufferSize: 1024 * 1024,
svc: svc,

View File

@@ -51,8 +51,8 @@ func (svc *Service) Start() {
}
}
go svc.ReportStatus()
go svc.FetchAndRunTasks()
go svc.reportStatus()
go svc.fetchAndRunTasks()
}
func (svc *Service) Stop() {
@@ -67,7 +67,7 @@ func (svc *Service) Cancel(taskId primitive.ObjectID, force bool) (err error) {
return svc.cancelTask(taskId, force)
}
func (svc *Service) FetchAndRunTasks() {
func (svc *Service) fetchAndRunTasks() {
ticker := time.NewTicker(svc.fetchInterval)
for {
if svc.stopped {
@@ -119,7 +119,7 @@ func (svc *Service) FetchAndRunTasks() {
}
}
func (svc *Service) ReportStatus() {
func (svc *Service) reportStatus() {
ticker := time.NewTicker(svc.reportInterval)
for {
if svc.stopped {
@@ -128,9 +128,9 @@ func (svc *Service) ReportStatus() {
select {
case <-ticker.C:
// report handler status
if err := svc.reportStatus(); err != nil {
trace.PrintError(err)
// update node status
if err := svc.updateNodeStatus(); err != nil {
log.Errorf("failed to report status: %v", err)
}
}
}
@@ -178,6 +178,19 @@ func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err e
return t, nil
}
func (svc *Service) UpdateTask(t *models2.TaskV2) (err error) {
t.SetUpdated(t.CreatedBy)
if svc.cfgSvc.IsMaster() {
err = service.NewModelServiceV2[models2.TaskV2]().ReplaceById(t.Id, *t)
} else {
err = client.NewModelServiceV2[models2.TaskV2]().ReplaceById(t.Id, *t)
}
if err != nil {
return err
}
return nil
}
func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, err error) {
if svc.cfgSvc.IsMaster() {
s, err = service.NewModelServiceV2[models2.SpiderV2]().GetById(id)
@@ -194,12 +207,14 @@ func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, e
func (svc *Service) getRunnerCount() (count int) {
n, err := svc.GetCurrentNode()
if err != nil {
trace.PrintError(err)
log.Errorf("failed to get current node: %v", err)
return
}
query := bson.M{
"node_id": n.Id,
"status": constants.TaskStatusRunning,
"status": bson.M{
"$in": []string{constants.TaskStatusAssigned, constants.TaskStatusRunning},
},
}
if svc.cfgSvc.IsMaster() {
count, err = service.NewModelServiceV2[models2.TaskV2]().Count(query)
@@ -242,7 +257,7 @@ func (svc *Service) deleteRunner(taskId primitive.ObjectID) {
svc.runners.Delete(taskId)
}
func (svc *Service) reportStatus() (err error) {
func (svc *Service) updateNodeStatus() (err error) {
// current node
n, err := svc.GetCurrentNode()
if err != nil {
@@ -398,8 +413,19 @@ func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId
log.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err)
return
}
log.Infof("task[%s] cancelled", taskId.Hex())
// set task status as "cancelled"
t, err := svc.GetTaskById(taskId)
if err != nil {
log.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err)
return
}
t.Status = constants.TaskStatusCancelled
err = svc.UpdateTask(t)
if err != nil {
log.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err)
}
}
func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) {

View File

@@ -21,7 +21,7 @@ import (
"time"
)
type ServiceV2 struct {
type Service struct {
// dependencies
nodeCfgSvc interfaces.NodeConfigService
svr *server.GrpcServer
@@ -31,13 +31,13 @@ type ServiceV2 struct {
interval time.Duration
}
func (svc *ServiceV2) Start() {
func (svc *Service) Start() {
go svc.initTaskStatus()
go svc.cleanupTasks()
utils.DefaultWait()
}
func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *models2.TaskV2, err error) {
func (svc *Service) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *models2.TaskV2, err error) {
// set task status
t.Status = constants.TaskStatusPending
t.SetCreated(by)
@@ -45,32 +45,17 @@ func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *mod
// add task
taskModelSvc := service.NewModelServiceV2[models2.TaskV2]()
id, err := taskModelSvc.InsertOne(*t)
t.Id, err = taskModelSvc.InsertOne(*t)
if err != nil {
return nil, err
}
// task queue item
tq := models2.TaskQueueItemV2{
Priority: t.Priority,
NodeId: t.NodeId,
}
tq.SetId(id)
tq.SetCreated(by)
tq.SetUpdated(by)
// task stat
ts := models2.TaskStatV2{}
ts.SetId(id)
ts.SetId(t.Id)
ts.SetCreated(by)
ts.SetUpdated(by)
// enqueue task
_, err = service.NewModelServiceV2[models2.TaskQueueItemV2]().InsertOne(tq)
if err != nil {
return nil, trace.TraceError(err)
}
// add task stat
_, err = service.NewModelServiceV2[models2.TaskStatV2]().InsertOne(ts)
if err != nil {
@@ -81,7 +66,7 @@ func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *mod
return t, nil
}
func (svc *ServiceV2) Cancel(id, by primitive.ObjectID, force bool) (err error) {
func (svc *Service) Cancel(id, by primitive.ObjectID, force bool) (err error) {
// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
if err != nil {
@@ -92,14 +77,10 @@ func (svc *ServiceV2) Cancel(id, by primitive.ObjectID, force bool) (err error)
// initial status
initialStatus := t.Status
// set status of pending tasks as "cancelled" and remove from task item queue
// set status of pending tasks as "cancelled"
if initialStatus == constants.TaskStatusPending {
// remove from task item queue
if err := service.NewModelServiceV2[models2.TaskQueueItemV2]().DeleteById(t.Id); err != nil {
log.Errorf("failed to delete task queue item: %s", t.Id.Hex())
return err
}
return nil
t.Status = constants.TaskStatusCancelled
return svc.SaveTask(t, by)
}
// whether task is running on master node
@@ -120,7 +101,7 @@ func (svc *ServiceV2) Cancel(id, by primitive.ObjectID, force bool) (err error)
}
}
func (svc *ServiceV2) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
func (svc *Service) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
if err := svc.handlerSvc.Cancel(t.Id, force); err != nil {
log.Errorf("failed to cancel task on master: %s", t.Id.Hex())
return err
@@ -131,7 +112,7 @@ func (svc *ServiceV2) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, f
return svc.SaveTask(t, by)
}
func (svc *ServiceV2) cancelOnWorker(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
func (svc *Service) cancelOnWorker(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) {
// get subscribe stream
stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id)
if !ok {
@@ -156,11 +137,11 @@ func (svc *ServiceV2) cancelOnWorker(t *models2.TaskV2, by primitive.ObjectID, f
return nil
}
func (svc *ServiceV2) SetInterval(interval time.Duration) {
func (svc *Service) SetInterval(interval time.Duration) {
svc.interval = interval
}
func (svc *ServiceV2) SaveTask(t *models2.TaskV2, by primitive.ObjectID) (err error) {
func (svc *Service) SaveTask(t *models2.TaskV2, by primitive.ObjectID) (err error) {
if t.Id.IsZero() {
t.SetCreated(by)
t.SetUpdated(by)
@@ -173,12 +154,13 @@ func (svc *ServiceV2) SaveTask(t *models2.TaskV2, by primitive.ObjectID) (err er
}
// initTaskStatus initialize task status of existing tasks
func (svc *ServiceV2) initTaskStatus() {
func (svc *Service) initTaskStatus() {
// set status of running tasks as TaskStatusAbnormal
runningTasks, err := service.NewModelServiceV2[models2.TaskV2]().GetMany(bson.M{
"status": bson.M{
"$in": []string{
constants.TaskStatusPending,
constants.TaskStatusAssigned,
constants.TaskStatusRunning,
},
},
@@ -187,7 +169,8 @@ func (svc *ServiceV2) initTaskStatus() {
if errors2.Is(err, mongo2.ErrNoDocuments) {
return
}
trace.PrintError(err)
log.Errorf("failed to get running tasks: %v", err)
return
}
for _, t := range runningTasks {
go func(t *models2.TaskV2) {
@@ -197,12 +180,9 @@ func (svc *ServiceV2) initTaskStatus() {
}
}(&t)
}
if err := service.NewModelServiceV2[models2.TaskQueueItemV2]().DeleteMany(nil); err != nil {
return
}
}
func (svc *ServiceV2) isMasterNode(t *models2.TaskV2) (ok bool, err error) {
func (svc *Service) isMasterNode(t *models2.TaskV2) (ok bool, err error) {
if t.NodeId.IsZero() {
return false, trace.TraceError(errors.ErrorTaskNoNodeId)
}
@@ -216,7 +196,7 @@ func (svc *ServiceV2) isMasterNode(t *models2.TaskV2) (ok bool, err error) {
return n.IsMaster, nil
}
func (svc *ServiceV2) cleanupTasks() {
func (svc *Service) cleanupTasks() {
for {
// task stats over 30 days ago
taskStats, err := service.NewModelServiceV2[models2.TaskStatV2]().GetMany(bson.M{
@@ -255,9 +235,9 @@ func (svc *ServiceV2) cleanupTasks() {
}
}
func NewTaskSchedulerServiceV2() (svc2 *ServiceV2, err error) {
func NewTaskSchedulerService() (svc2 *Service, err error) {
// service
svc := &ServiceV2{
svc := &Service{
interval: 5 * time.Second,
}
svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()
@@ -275,15 +255,15 @@ func NewTaskSchedulerServiceV2() (svc2 *ServiceV2, err error) {
return svc, nil
}
var svcV2 *ServiceV2
var svc *Service
func GetTaskSchedulerServiceV2() (svr *ServiceV2, err error) {
if svcV2 != nil {
return svcV2, nil
func GetTaskSchedulerService() (svr *Service, err error) {
if svc != nil {
return svc, nil
}
svcV2, err = NewTaskSchedulerServiceV2()
svc, err = NewTaskSchedulerService()
if err != nil {
return nil, err
}
return svcV2, nil
return svc, nil
}