mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
feat: added modules
This commit is contained in:
40
core/task/handler/options.go
Normal file
40
core/task/handler/options.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Option func(svc interfaces.TaskHandlerService)
|
||||
|
||||
func WithConfigPath(path string) Option {
|
||||
return func(svc interfaces.TaskHandlerService) {
|
||||
svc.SetConfigPath(path)
|
||||
}
|
||||
}
|
||||
|
||||
func WithExitWatchDuration(duration time.Duration) Option {
|
||||
return func(svc interfaces.TaskHandlerService) {
|
||||
svc.SetExitWatchDuration(duration)
|
||||
}
|
||||
}
|
||||
|
||||
func WithReportInterval(interval time.Duration) Option {
|
||||
return func(svc interfaces.TaskHandlerService) {
|
||||
svc.SetReportInterval(interval)
|
||||
}
|
||||
}
|
||||
|
||||
func WithCancelTimeout(timeout time.Duration) Option {
|
||||
return func(svc interfaces.TaskHandlerService) {
|
||||
svc.SetCancelTimeout(timeout)
|
||||
}
|
||||
}
|
||||
|
||||
type RunnerOption func(r interfaces.TaskRunner)
|
||||
|
||||
func WithSubscribeTimeout(timeout time.Duration) RunnerOption {
|
||||
return func(r interfaces.TaskRunner) {
|
||||
r.SetSubscribeTimeout(timeout)
|
||||
}
|
||||
}
|
||||
691
core/task/handler/runner.go
Normal file
691
core/task/handler/runner.go
Normal file
@@ -0,0 +1,691 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/crawlab-team/crawlab-db/mongo"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/container"
|
||||
"github.com/crawlab-team/crawlab/core/entity"
|
||||
"github.com/crawlab-team/crawlab/core/errors"
|
||||
fs2 "github.com/crawlab-team/crawlab/core/fs"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/delegate"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/sys_exec"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
grpc "github.com/crawlab-team/crawlab/grpc"
|
||||
"github.com/crawlab-team/go-trace"
|
||||
"github.com/shirou/gopsutil/process"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Runner struct {
|
||||
// dependencies
|
||||
svc interfaces.TaskHandlerService // task handler service
|
||||
fsSvc interfaces.FsServiceV2 // task fs service
|
||||
hookSvc interfaces.TaskHookService // task hook service
|
||||
|
||||
// settings
|
||||
subscribeTimeout time.Duration
|
||||
bufferSize int
|
||||
|
||||
// internals
|
||||
cmd *exec.Cmd // process command instance
|
||||
pid int // process id
|
||||
tid primitive.ObjectID // task id
|
||||
t interfaces.Task // task model.Task
|
||||
s interfaces.Spider // spider model.Spider
|
||||
ch chan constants.TaskSignal // channel to communicate between Service and Runner
|
||||
err error // standard process error
|
||||
envs []models.Env // environment variables
|
||||
cwd string // working directory
|
||||
c interfaces.GrpcClient // grpc client
|
||||
sub grpc.TaskService_SubscribeClient // grpc task service stream client
|
||||
|
||||
// log internals
|
||||
scannerStdout *bufio.Reader
|
||||
scannerStderr *bufio.Reader
|
||||
logBatchSize int
|
||||
}
|
||||
|
||||
func (r *Runner) Init() (err error) {
|
||||
// update task
|
||||
if err := r.updateTask("", nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start grpc client
|
||||
if !r.c.IsStarted() {
|
||||
r.c.Start()
|
||||
}
|
||||
|
||||
// working directory
|
||||
workspacePath := viper.GetString("workspace")
|
||||
r.cwd = filepath.Join(workspacePath, r.s.GetId().Hex())
|
||||
|
||||
// sync files from master
|
||||
if !utils.IsMaster() {
|
||||
if err := r.syncFiles(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// grpc task service stream client
|
||||
if err := r.initSub(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// pre actions
|
||||
if r.hookSvc != nil {
|
||||
if err := r.hookSvc.PreActions(r.t, r.s, r.fsSvc, r.svc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) Run() (err error) {
|
||||
// log task started
|
||||
log.Infof("task[%s] started", r.tid.Hex())
|
||||
|
||||
// configure cmd
|
||||
r.configureCmd()
|
||||
|
||||
// configure environment variables
|
||||
r.configureEnv()
|
||||
|
||||
// configure logging
|
||||
r.configureLogging()
|
||||
|
||||
// start process
|
||||
if err := r.cmd.Start(); err != nil {
|
||||
return r.updateTask(constants.TaskStatusError, err)
|
||||
}
|
||||
|
||||
// start logging
|
||||
go r.startLogging()
|
||||
|
||||
// process id
|
||||
if r.cmd.Process == nil {
|
||||
return r.updateTask(constants.TaskStatusError, constants.ErrNotExists)
|
||||
}
|
||||
r.pid = r.cmd.Process.Pid
|
||||
r.t.SetPid(r.pid)
|
||||
|
||||
// update task status (processing)
|
||||
if err := r.updateTask(constants.TaskStatusRunning, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for process to finish
|
||||
go r.wait()
|
||||
|
||||
// start health check
|
||||
go r.startHealthCheck()
|
||||
|
||||
// declare task status
|
||||
status := ""
|
||||
|
||||
// wait for signal
|
||||
signal := <-r.ch
|
||||
switch signal {
|
||||
case constants.TaskSignalFinish:
|
||||
err = nil
|
||||
status = constants.TaskStatusFinished
|
||||
case constants.TaskSignalCancel:
|
||||
err = constants.ErrTaskCancelled
|
||||
status = constants.TaskStatusCancelled
|
||||
case constants.TaskSignalError:
|
||||
err = r.err
|
||||
status = constants.TaskStatusError
|
||||
case constants.TaskSignalLost:
|
||||
err = constants.ErrTaskLost
|
||||
status = constants.TaskStatusError
|
||||
default:
|
||||
err = constants.ErrInvalidSignal
|
||||
status = constants.TaskStatusError
|
||||
}
|
||||
|
||||
// update task status
|
||||
if err := r.updateTask(status, err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// post actions
|
||||
if r.hookSvc != nil {
|
||||
if err := r.hookSvc.PostActions(r.t, r.s, r.fsSvc, r.svc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Runner) Cancel() (err error) {
|
||||
// kill process
|
||||
opts := &sys_exec.KillProcessOptions{
|
||||
Timeout: r.svc.GetCancelTimeout(),
|
||||
Force: true,
|
||||
}
|
||||
if err := sys_exec.KillProcess(r.cmd, opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// make sure the process does not exist
|
||||
op := func() error {
|
||||
if exists, _ := process.PidExists(int32(r.pid)); exists {
|
||||
return errors.ErrorTaskProcessStillExists
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetExitWatchDuration())
|
||||
defer cancel()
|
||||
b := backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), ctx)
|
||||
if err := backoff.Retry(op, b); err != nil {
|
||||
return trace.TraceError(errors.ErrorTaskUnableToCancel)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanUp clean up task runner
|
||||
func (r *Runner) CleanUp() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) SetSubscribeTimeout(timeout time.Duration) {
|
||||
r.subscribeTimeout = timeout
|
||||
}
|
||||
|
||||
func (r *Runner) GetTaskId() (id primitive.ObjectID) {
|
||||
return r.tid
|
||||
}
|
||||
|
||||
func (r *Runner) configureCmd() {
|
||||
var cmdStr string
|
||||
|
||||
// customized spider
|
||||
if r.t.GetCmd() == "" {
|
||||
cmdStr = r.s.GetCmd()
|
||||
} else {
|
||||
cmdStr = r.t.GetCmd()
|
||||
}
|
||||
|
||||
// parameters
|
||||
if r.t.GetParam() != "" {
|
||||
cmdStr += " " + r.t.GetParam()
|
||||
} else if r.s.GetParam() != "" {
|
||||
cmdStr += " " + r.s.GetParam()
|
||||
}
|
||||
|
||||
// get cmd instance
|
||||
r.cmd = sys_exec.BuildCmd(cmdStr)
|
||||
|
||||
// set working directory
|
||||
r.cmd.Dir = r.cwd
|
||||
|
||||
// configure pgid to allow killing sub processes
|
||||
//sys_exec.SetPgid(r.cmd)
|
||||
}
|
||||
|
||||
func (r *Runner) configureLogging() {
|
||||
// set stdout reader
|
||||
stdout, _ := r.cmd.StdoutPipe()
|
||||
r.scannerStdout = bufio.NewReaderSize(stdout, r.bufferSize)
|
||||
|
||||
// set stderr reader
|
||||
stderr, _ := r.cmd.StderrPipe()
|
||||
r.scannerStderr = bufio.NewReaderSize(stderr, r.bufferSize)
|
||||
}
|
||||
|
||||
func (r *Runner) startLogging() {
|
||||
// start reading stdout
|
||||
go r.startLoggingReaderStdout()
|
||||
|
||||
// start reading stderr
|
||||
go r.startLoggingReaderStderr()
|
||||
}
|
||||
|
||||
func (r *Runner) startLoggingReaderStdout() {
|
||||
for {
|
||||
line, err := r.scannerStdout.ReadString(byte('\n'))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
line = strings.TrimSuffix(line, "\n")
|
||||
r.writeLogLines([]string{line})
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) startLoggingReaderStderr() {
|
||||
for {
|
||||
line, err := r.scannerStderr.ReadString(byte('\n'))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
line = strings.TrimSuffix(line, "\n")
|
||||
r.writeLogLines([]string{line})
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) startHealthCheck() {
|
||||
if r.cmd.ProcessState == nil || r.cmd.ProcessState.Exited() {
|
||||
return
|
||||
}
|
||||
for {
|
||||
exists, _ := process.PidExists(int32(r.pid))
|
||||
if !exists {
|
||||
// process lost
|
||||
r.ch <- constants.TaskSignalLost
|
||||
return
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) configureEnv() {
|
||||
// 默认把Node.js的全局node_modules加入环境变量
|
||||
envPath := os.Getenv("PATH")
|
||||
nodePath := "/usr/lib/node_modules"
|
||||
if !strings.Contains(envPath, nodePath) {
|
||||
_ = os.Setenv("PATH", nodePath+":"+envPath)
|
||||
}
|
||||
_ = os.Setenv("NODE_PATH", nodePath)
|
||||
|
||||
// default envs
|
||||
r.cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+r.tid.Hex())
|
||||
if viper.GetString("grpc.address") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+viper.GetString("grpc.address"))
|
||||
}
|
||||
if viper.GetString("grpc.authKey") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+viper.GetString("grpc.authKey"))
|
||||
} else {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+constants.DefaultGrpcAuthKey)
|
||||
}
|
||||
|
||||
// global environment variables
|
||||
envs, err := r.svc.GetModelEnvironmentService().GetEnvironmentList(nil, nil)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
for _, env := range envs {
|
||||
r.cmd.Env = append(r.cmd.Env, env.GetKey()+"="+env.GetValue())
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) syncFiles() (err error) {
|
||||
masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), r.s.GetId().Hex())
|
||||
workspacePath := viper.GetString("workspace")
|
||||
workerDir := filepath.Join(workspacePath, r.s.GetId().Hex())
|
||||
|
||||
// get file list from master
|
||||
resp, err := http.Get(masterURL + "/scan")
|
||||
if err != nil {
|
||||
fmt.Println("Error getting file list from master:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
fmt.Println("Error reading response body:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
var masterFiles map[string]entity.FsFileInfo
|
||||
err = json.Unmarshal(body, &masterFiles)
|
||||
if err != nil {
|
||||
fmt.Println("Error unmarshaling JSON:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// create a map for master files
|
||||
masterFilesMap := make(map[string]entity.FsFileInfo)
|
||||
for _, file := range masterFiles {
|
||||
masterFilesMap[file.Path] = file
|
||||
}
|
||||
|
||||
// create worker directory if not exists
|
||||
if _, err := os.Stat(workerDir); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(workerDir, os.ModePerm); err != nil {
|
||||
fmt.Println("Error creating worker directory:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// get file list from worker
|
||||
workerFiles, err := utils.ScanDirectory(workerDir)
|
||||
if err != nil {
|
||||
fmt.Println("Error scanning worker directory:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// set up wait group and error channel
|
||||
var wg sync.WaitGroup
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
// delete files that are deleted on master node
|
||||
for path, workerFile := range workerFiles {
|
||||
if _, exists := masterFilesMap[path]; !exists {
|
||||
fmt.Println("Deleting file:", path)
|
||||
err := os.Remove(workerFile.FullPath)
|
||||
if err != nil {
|
||||
fmt.Println("Error deleting file:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download files that are new or modified on master node
|
||||
for path, masterFile := range masterFilesMap {
|
||||
workerFile, exists := workerFiles[path]
|
||||
if !exists || masterFile.Hash != workerFile.Hash {
|
||||
wg.Add(1)
|
||||
go func(path string, masterFile entity.FsFileInfo) {
|
||||
defer wg.Done()
|
||||
logrus.Infof("File needs to be synchronized: %s", path)
|
||||
err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(workerDir, path))
|
||||
if err != nil {
|
||||
logrus.Errorf("Error downloading file: %v", err)
|
||||
select {
|
||||
case errCh <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(path, masterFile)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
if err := <-errCh; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) downloadFile(url string, filePath string) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
out, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for process to finish and send task signal (constants.TaskSignal)
|
||||
// to task runner's channel (Runner.ch) according to exit code
|
||||
func (r *Runner) wait() {
|
||||
// wait for process to finish
|
||||
if err := r.cmd.Wait(); err != nil {
|
||||
exitError, ok := err.(*exec.ExitError)
|
||||
if !ok {
|
||||
r.ch <- constants.TaskSignalError
|
||||
return
|
||||
}
|
||||
exitCode := exitError.ExitCode()
|
||||
if exitCode == -1 {
|
||||
// cancel error
|
||||
r.ch <- constants.TaskSignalCancel
|
||||
return
|
||||
}
|
||||
|
||||
// standard error
|
||||
r.err = err
|
||||
r.ch <- constants.TaskSignalError
|
||||
return
|
||||
}
|
||||
|
||||
// success
|
||||
r.ch <- constants.TaskSignalFinish
|
||||
}
|
||||
|
||||
// updateTask update and get updated info of task (Runner.t)
|
||||
func (r *Runner) updateTask(status string, e error) (err error) {
|
||||
if r.t != nil && status != "" {
|
||||
// update task status
|
||||
r.t.SetStatus(status)
|
||||
if e != nil {
|
||||
r.t.SetError(e.Error())
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
if err := delegate.NewModelDelegate(r.t).Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := client.NewModelDelegate(r.t, client.WithDelegateConfigPath(r.svc.GetConfigPath())).Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// send notification
|
||||
go r.sendNotification()
|
||||
|
||||
// update stats
|
||||
go func() {
|
||||
r._updateTaskStat(status)
|
||||
r._updateSpiderStat(status)
|
||||
}()
|
||||
}
|
||||
|
||||
// get task
|
||||
r.t, err = r.svc.GetTaskById(r.tid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) initSub() (err error) {
|
||||
r.sub, err = r.c.GetTaskClient().Subscribe(context.Background())
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) writeLogLines(lines []string) {
|
||||
data, err := json.Marshal(&entity.StreamMessageTaskData{
|
||||
TaskId: r.tid,
|
||||
Logs: lines,
|
||||
})
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
msg := &grpc.StreamMessage{
|
||||
Code: grpc.StreamMessageCode_INSERT_LOGS,
|
||||
Data: data,
|
||||
}
|
||||
if err := r.sub.Send(msg); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) _updateTaskStat(status string) {
|
||||
ts, err := r.svc.GetModelTaskStatService().GetTaskStatById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
switch status {
|
||||
case constants.TaskStatusPending:
|
||||
// do nothing
|
||||
case constants.TaskStatusRunning:
|
||||
ts.SetStartTs(time.Now())
|
||||
ts.SetWaitDuration(ts.GetStartTs().Sub(ts.GetCreateTs()).Milliseconds())
|
||||
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
||||
ts.SetEndTs(time.Now())
|
||||
ts.SetRuntimeDuration(ts.GetEndTs().Sub(ts.GetStartTs()).Milliseconds())
|
||||
ts.SetTotalDuration(ts.GetEndTs().Sub(ts.GetCreateTs()).Milliseconds())
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
if err := delegate.NewModelDelegate(ts).Save(); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := client.NewModelDelegate(ts, client.WithDelegateConfigPath(r.svc.GetConfigPath())).Save(); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) sendNotification() {
|
||||
data, err := json.Marshal(r.t)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
req := &grpc.Request{
|
||||
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
|
||||
Data: data,
|
||||
}
|
||||
_, err = r.c.GetTaskClient().SendNotification(context.Background(), req)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) _updateSpiderStat(status string) {
|
||||
// task stat
|
||||
ts, err := r.svc.GetModelTaskStatService().GetTaskStatById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// update
|
||||
var update bson.M
|
||||
switch status {
|
||||
case constants.TaskStatusPending, constants.TaskStatusRunning:
|
||||
update = bson.M{
|
||||
"$set": bson.M{
|
||||
"last_task_id": r.tid, // last task id
|
||||
},
|
||||
"$inc": bson.M{
|
||||
"tasks": 1, // task count
|
||||
"wait_duration": ts.GetWaitDuration(), // wait duration
|
||||
},
|
||||
}
|
||||
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
||||
update = bson.M{
|
||||
"$inc": bson.M{
|
||||
"results": ts.GetResultCount(), // results
|
||||
"runtime_duration": ts.GetRuntimeDuration() / 1000, // runtime duration
|
||||
"total_duration": ts.GetTotalDuration() / 1000, // total duration
|
||||
},
|
||||
}
|
||||
default:
|
||||
trace.PrintError(errors.ErrorTaskInvalidType)
|
||||
return
|
||||
}
|
||||
|
||||
// perform update
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
if err := mongo.GetMongoCol(interfaces.ModelColNameSpiderStat).UpdateId(r.s.GetId(), update); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
modelSvc, err := client.NewBaseServiceDelegate(
|
||||
client.WithBaseServiceModelId(interfaces.ModelIdSpiderStat),
|
||||
client.WithBaseServiceConfigPath(r.svc.GetConfigPath()),
|
||||
)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
if err := modelSvc.UpdateById(r.s.GetId(), update); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func NewTaskRunner(id primitive.ObjectID, svc interfaces.TaskHandlerService, opts ...RunnerOption) (r2 interfaces.TaskRunner, err error) {
|
||||
// validate options
|
||||
if id.IsZero() {
|
||||
return nil, constants.ErrInvalidOptions
|
||||
}
|
||||
|
||||
// runner
|
||||
r := &Runner{
|
||||
subscribeTimeout: 30 * time.Second,
|
||||
bufferSize: 1024 * 1024,
|
||||
svc: svc,
|
||||
tid: id,
|
||||
ch: make(chan constants.TaskSignal),
|
||||
logBatchSize: 20,
|
||||
}
|
||||
|
||||
// apply options
|
||||
for _, opt := range opts {
|
||||
opt(r)
|
||||
}
|
||||
|
||||
// task
|
||||
r.t, err = svc.GetTaskById(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// spider
|
||||
r.s, err = svc.GetSpiderById(r.t.GetSpiderId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// task fs service
|
||||
r.fsSvc = fs2.NewFsServiceV2(filepath.Join(viper.GetString("workspace"), r.s.GetId().Hex()))
|
||||
|
||||
// dependency injection
|
||||
if err := container.GetContainer().Invoke(func(
|
||||
c interfaces.GrpcClient,
|
||||
) {
|
||||
r.c = c
|
||||
}); err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
|
||||
_ = container.GetContainer().Invoke(func(hookSvc interfaces.TaskHookService) {
|
||||
r.hookSvc = hookSvc
|
||||
})
|
||||
|
||||
// initialize task runner
|
||||
if err := r.Init(); err != nil {
|
||||
return r, err
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
103
core/task/handler/runner_test.go
Normal file
103
core/task/handler/runner_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/spf13/viper"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type MockRunner struct {
|
||||
mock.Mock
|
||||
Runner
|
||||
}
|
||||
|
||||
func (m *MockRunner) downloadFile(url string, filePath string) error {
|
||||
args := m.Called(url, filePath)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func newMockRunner() *MockRunner {
|
||||
r := &MockRunner{}
|
||||
r.s = &models.Spider{}
|
||||
return r
|
||||
}
|
||||
|
||||
func TestSyncFiles_SuccessWithDummyFiles(t *testing.T) {
|
||||
// Create a test server that responds with a list of files
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasSuffix(r.URL.Path, "/scan") {
|
||||
w.Write([]byte(`{"file1.txt":{"path": "file1.txt", "hash": "hash1"}, "file2.txt":{"path": "file2.txt", "hash": "hash2"}}`))
|
||||
return
|
||||
}
|
||||
|
||||
if strings.HasSuffix(r.URL.Path, "/download") {
|
||||
w.Write([]byte("file content"))
|
||||
return
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
// Create a mock runner
|
||||
r := newMockRunner()
|
||||
r.On("downloadFile", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
// Set the master URL to the test server URL
|
||||
viper.Set("api.endpoint", ts.URL)
|
||||
|
||||
localPath := filepath.Join(os.TempDir(), uuid.New().String())
|
||||
os.MkdirAll(filepath.Join(localPath, r.s.GetId().Hex()), os.ModePerm)
|
||||
defer os.RemoveAll(localPath)
|
||||
viper.Set("workspace", localPath)
|
||||
|
||||
// Call the method under test
|
||||
err := r.syncFiles()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Assert that the files were downloaded
|
||||
assert.FileExists(t, filepath.Join(localPath, r.s.GetId().Hex(), "file1.txt"))
|
||||
assert.FileExists(t, filepath.Join(localPath, r.s.GetId().Hex(), "file2.txt"))
|
||||
}
|
||||
|
||||
func TestSyncFiles_DeletesFilesNotOnMaster(t *testing.T) {
|
||||
// Create a test server that responds with an empty list of files
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasSuffix(r.URL.Path, "/scan") {
|
||||
w.Write([]byte(`{}`))
|
||||
return
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
// Create a mock runner
|
||||
r := newMockRunner()
|
||||
r.On("downloadFile", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
// Set the master URL to the test server URL
|
||||
viper.Set("api.endpoint", ts.URL)
|
||||
|
||||
localPath := filepath.Join(os.TempDir(), uuid.New().String())
|
||||
os.MkdirAll(filepath.Join(localPath, r.s.GetId().Hex()), os.ModePerm)
|
||||
defer os.RemoveAll(localPath)
|
||||
viper.Set("workspace", localPath)
|
||||
|
||||
// Create a dummy file that should be deleted
|
||||
dummyFilePath := filepath.Join(localPath, r.s.GetId().Hex(), "dummy.txt")
|
||||
dummyFile, _ := os.Create(dummyFilePath)
|
||||
dummyFile.Close()
|
||||
|
||||
// Call the method under test
|
||||
err := r.syncFiles()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Assert that the dummy file was deleted
|
||||
assert.NoFileExists(t, dummyFilePath)
|
||||
}
|
||||
671
core/task/handler/runner_v2.go
Normal file
671
core/task/handler/runner_v2.go
Normal file
@@ -0,0 +1,671 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/container"
|
||||
"github.com/crawlab-team/crawlab/core/entity"
|
||||
"github.com/crawlab-team/crawlab/core/errors"
|
||||
fs2 "github.com/crawlab-team/crawlab/core/fs"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
service2 "github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/sys_exec"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
grpc "github.com/crawlab-team/crawlab/grpc"
|
||||
"github.com/crawlab-team/go-trace"
|
||||
"github.com/shirou/gopsutil/process"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RunnerV2 struct {
|
||||
// dependencies
|
||||
svc *ServiceV2 // task handler service
|
||||
fsSvc interfaces.FsServiceV2 // task fs service
|
||||
|
||||
// settings
|
||||
subscribeTimeout time.Duration
|
||||
bufferSize int
|
||||
|
||||
// internals
|
||||
cmd *exec.Cmd // process command instance
|
||||
pid int // process id
|
||||
tid primitive.ObjectID // task id
|
||||
t *models.TaskV2 // task model.Task
|
||||
s *models.SpiderV2 // spider model.Spider
|
||||
ch chan constants.TaskSignal // channel to communicate between Service and RunnerV2
|
||||
err error // standard process error
|
||||
envs []models.Env // environment variables
|
||||
cwd string // working directory
|
||||
c interfaces.GrpcClient // grpc client
|
||||
sub grpc.TaskService_SubscribeClient // grpc task service stream client
|
||||
|
||||
// log internals
|
||||
scannerStdout *bufio.Reader
|
||||
scannerStderr *bufio.Reader
|
||||
logBatchSize int
|
||||
}
|
||||
|
||||
func (r *RunnerV2) Init() (err error) {
|
||||
// update task
|
||||
if err := r.updateTask("", nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start grpc client
|
||||
if !r.c.IsStarted() {
|
||||
r.c.Start()
|
||||
}
|
||||
|
||||
// working directory
|
||||
workspacePath := viper.GetString("workspace")
|
||||
r.cwd = filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
|
||||
// sync files from master
|
||||
if !utils.IsMaster() {
|
||||
if err := r.syncFiles(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// grpc task service stream client
|
||||
if err := r.initSub(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RunnerV2) Run() (err error) {
|
||||
// log task started
|
||||
log.Infof("task[%s] started", r.tid.Hex())
|
||||
|
||||
// configure cmd
|
||||
r.configureCmd()
|
||||
|
||||
// configure environment variables
|
||||
r.configureEnv()
|
||||
|
||||
// configure logging
|
||||
r.configureLogging()
|
||||
|
||||
// start process
|
||||
if err := r.cmd.Start(); err != nil {
|
||||
return r.updateTask(constants.TaskStatusError, err)
|
||||
}
|
||||
|
||||
// start logging
|
||||
go r.startLogging()
|
||||
|
||||
// process id
|
||||
if r.cmd.Process == nil {
|
||||
return r.updateTask(constants.TaskStatusError, constants.ErrNotExists)
|
||||
}
|
||||
r.pid = r.cmd.Process.Pid
|
||||
r.t.Pid = r.pid
|
||||
|
||||
// update task status (processing)
|
||||
if err := r.updateTask(constants.TaskStatusRunning, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for process to finish
|
||||
go r.wait()
|
||||
|
||||
// start health check
|
||||
go r.startHealthCheck()
|
||||
|
||||
// declare task status
|
||||
status := ""
|
||||
|
||||
// wait for signal
|
||||
signal := <-r.ch
|
||||
switch signal {
|
||||
case constants.TaskSignalFinish:
|
||||
err = nil
|
||||
status = constants.TaskStatusFinished
|
||||
case constants.TaskSignalCancel:
|
||||
err = constants.ErrTaskCancelled
|
||||
status = constants.TaskStatusCancelled
|
||||
case constants.TaskSignalError:
|
||||
err = r.err
|
||||
status = constants.TaskStatusError
|
||||
case constants.TaskSignalLost:
|
||||
err = constants.ErrTaskLost
|
||||
status = constants.TaskStatusError
|
||||
default:
|
||||
err = constants.ErrInvalidSignal
|
||||
status = constants.TaskStatusError
|
||||
}
|
||||
|
||||
// update task status
|
||||
if err := r.updateTask(status, err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *RunnerV2) Cancel() (err error) {
|
||||
// kill process
|
||||
opts := &sys_exec.KillProcessOptions{
|
||||
Timeout: r.svc.GetCancelTimeout(),
|
||||
Force: true,
|
||||
}
|
||||
if err := sys_exec.KillProcess(r.cmd, opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// make sure the process does not exist
|
||||
op := func() error {
|
||||
if exists, _ := process.PidExists(int32(r.pid)); exists {
|
||||
return errors.ErrorTaskProcessStillExists
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetExitWatchDuration())
|
||||
defer cancel()
|
||||
b := backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), ctx)
|
||||
if err := backoff.Retry(op, b); err != nil {
|
||||
return trace.TraceError(errors.ErrorTaskUnableToCancel)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanUp clean up task runner
|
||||
func (r *RunnerV2) CleanUp() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RunnerV2) SetSubscribeTimeout(timeout time.Duration) {
|
||||
r.subscribeTimeout = timeout
|
||||
}
|
||||
|
||||
func (r *RunnerV2) GetTaskId() (id primitive.ObjectID) {
|
||||
return r.tid
|
||||
}
|
||||
|
||||
func (r *RunnerV2) configureCmd() {
|
||||
var cmdStr string
|
||||
|
||||
// customized spider
|
||||
if r.t.Cmd == "" {
|
||||
cmdStr = r.s.Cmd
|
||||
} else {
|
||||
cmdStr = r.t.Cmd
|
||||
}
|
||||
|
||||
// parameters
|
||||
if r.t.Param != "" {
|
||||
cmdStr += " " + r.t.Param
|
||||
} else if r.s.Param != "" {
|
||||
cmdStr += " " + r.s.Param
|
||||
}
|
||||
|
||||
// get cmd instance
|
||||
r.cmd = sys_exec.BuildCmd(cmdStr)
|
||||
|
||||
// set working directory
|
||||
r.cmd.Dir = r.cwd
|
||||
|
||||
// configure pgid to allow killing sub processes
|
||||
//sys_exec.SetPgid(r.cmd)
|
||||
}
|
||||
|
||||
func (r *RunnerV2) configureLogging() {
|
||||
// set stdout reader
|
||||
stdout, _ := r.cmd.StdoutPipe()
|
||||
r.scannerStdout = bufio.NewReaderSize(stdout, r.bufferSize)
|
||||
|
||||
// set stderr reader
|
||||
stderr, _ := r.cmd.StderrPipe()
|
||||
r.scannerStderr = bufio.NewReaderSize(stderr, r.bufferSize)
|
||||
}
|
||||
|
||||
func (r *RunnerV2) startLogging() {
|
||||
// start reading stdout
|
||||
go r.startLoggingReaderStdout()
|
||||
|
||||
// start reading stderr
|
||||
go r.startLoggingReaderStderr()
|
||||
}
|
||||
|
||||
func (r *RunnerV2) startLoggingReaderStdout() {
|
||||
for {
|
||||
line, err := r.scannerStdout.ReadString(byte('\n'))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
line = strings.TrimSuffix(line, "\n")
|
||||
r.writeLogLines([]string{line})
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) startLoggingReaderStderr() {
|
||||
for {
|
||||
line, err := r.scannerStderr.ReadString(byte('\n'))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
line = strings.TrimSuffix(line, "\n")
|
||||
r.writeLogLines([]string{line})
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) startHealthCheck() {
|
||||
if r.cmd.ProcessState == nil || r.cmd.ProcessState.Exited() {
|
||||
return
|
||||
}
|
||||
for {
|
||||
exists, _ := process.PidExists(int32(r.pid))
|
||||
if !exists {
|
||||
// process lost
|
||||
r.ch <- constants.TaskSignalLost
|
||||
return
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) configureEnv() {
|
||||
// 默认把Node.js的全局node_modules加入环境变量
|
||||
envPath := os.Getenv("PATH")
|
||||
nodePath := "/usr/lib/node_modules"
|
||||
if !strings.Contains(envPath, nodePath) {
|
||||
_ = os.Setenv("PATH", nodePath+":"+envPath)
|
||||
}
|
||||
_ = os.Setenv("NODE_PATH", nodePath)
|
||||
|
||||
// default envs
|
||||
r.cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+r.tid.Hex())
|
||||
if viper.GetString("grpc.address") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+viper.GetString("grpc.address"))
|
||||
}
|
||||
if viper.GetString("grpc.authKey") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+viper.GetString("grpc.authKey"))
|
||||
} else {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+constants.DefaultGrpcAuthKey)
|
||||
}
|
||||
|
||||
// global environment variables
|
||||
envs, err := client.NewModelServiceV2[models.EnvironmentV2]().GetMany(nil, nil)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
for _, env := range envs {
|
||||
r.cmd.Env = append(r.cmd.Env, env.Key+"="+env.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) syncFiles() (err error) {
|
||||
masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), r.s.Id.Hex())
|
||||
workspacePath := viper.GetString("workspace")
|
||||
workerDir := filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
|
||||
// get file list from master
|
||||
resp, err := http.Get(masterURL + "/scan")
|
||||
if err != nil {
|
||||
fmt.Println("Error getting file list from master:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
fmt.Println("Error reading response body:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
var masterFiles map[string]entity.FsFileInfo
|
||||
err = json.Unmarshal(body, &masterFiles)
|
||||
if err != nil {
|
||||
fmt.Println("Error unmarshaling JSON:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// create a map for master files
|
||||
masterFilesMap := make(map[string]entity.FsFileInfo)
|
||||
for _, file := range masterFiles {
|
||||
masterFilesMap[file.Path] = file
|
||||
}
|
||||
|
||||
// create worker directory if not exists
|
||||
if _, err := os.Stat(workerDir); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(workerDir, os.ModePerm); err != nil {
|
||||
fmt.Println("Error creating worker directory:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// get file list from worker
|
||||
workerFiles, err := utils.ScanDirectory(workerDir)
|
||||
if err != nil {
|
||||
fmt.Println("Error scanning worker directory:", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// set up wait group and error channel
|
||||
var wg sync.WaitGroup
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
// delete files that are deleted on master node
|
||||
for path, workerFile := range workerFiles {
|
||||
if _, exists := masterFilesMap[path]; !exists {
|
||||
fmt.Println("Deleting file:", path)
|
||||
err := os.Remove(workerFile.FullPath)
|
||||
if err != nil {
|
||||
fmt.Println("Error deleting file:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download files that are new or modified on master node
|
||||
for path, masterFile := range masterFilesMap {
|
||||
workerFile, exists := workerFiles[path]
|
||||
if !exists || masterFile.Hash != workerFile.Hash {
|
||||
wg.Add(1)
|
||||
go func(path string, masterFile entity.FsFileInfo) {
|
||||
defer wg.Done()
|
||||
logrus.Infof("File needs to be synchronized: %s", path)
|
||||
err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(workerDir, path))
|
||||
if err != nil {
|
||||
logrus.Errorf("Error downloading file: %v", err)
|
||||
select {
|
||||
case errCh <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(path, masterFile)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
if err := <-errCh; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RunnerV2) downloadFile(url string, filePath string) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
out, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for process to finish and send task signal (constants.TaskSignal)
|
||||
// to task runner's channel (RunnerV2.ch) according to exit code
|
||||
func (r *RunnerV2) wait() {
|
||||
// wait for process to finish
|
||||
if err := r.cmd.Wait(); err != nil {
|
||||
exitError, ok := err.(*exec.ExitError)
|
||||
if !ok {
|
||||
r.ch <- constants.TaskSignalError
|
||||
return
|
||||
}
|
||||
exitCode := exitError.ExitCode()
|
||||
if exitCode == -1 {
|
||||
// cancel error
|
||||
r.ch <- constants.TaskSignalCancel
|
||||
return
|
||||
}
|
||||
|
||||
// standard error
|
||||
r.err = err
|
||||
r.ch <- constants.TaskSignalError
|
||||
return
|
||||
}
|
||||
|
||||
// success
|
||||
r.ch <- constants.TaskSignalFinish
|
||||
}
|
||||
|
||||
// updateTask update and get updated info of task (RunnerV2.t)
|
||||
func (r *RunnerV2) updateTask(status string, e error) (err error) {
|
||||
if r.t != nil && status != "" {
|
||||
// update task status
|
||||
r.t.Status = status
|
||||
if e != nil {
|
||||
r.t.Error = e.Error()
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service2.NewModelServiceV2[models.TaskV2]().ReplaceById(r.t.Id, *r.t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelServiceV2[models.TaskV2]().ReplaceById(r.t.Id, *r.t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// send notification
|
||||
go r.sendNotification()
|
||||
|
||||
// update stats
|
||||
go func() {
|
||||
r._updateTaskStat(status)
|
||||
r._updateSpiderStat(status)
|
||||
}()
|
||||
}
|
||||
|
||||
// get task
|
||||
r.t, err = r.svc.GetTaskById(r.tid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RunnerV2) initSub() (err error) {
|
||||
r.sub, err = r.c.GetTaskClient().Subscribe(context.Background())
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RunnerV2) writeLogLines(lines []string) {
|
||||
data, err := json.Marshal(&entity.StreamMessageTaskData{
|
||||
TaskId: r.tid,
|
||||
Logs: lines,
|
||||
})
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
msg := &grpc.StreamMessage{
|
||||
Code: grpc.StreamMessageCode_INSERT_LOGS,
|
||||
Data: data,
|
||||
}
|
||||
if err := r.sub.Send(msg); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) _updateTaskStat(status string) {
|
||||
ts, err := client.NewModelServiceV2[models.TaskStatV2]().GetById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
switch status {
|
||||
case constants.TaskStatusPending:
|
||||
// do nothing
|
||||
case constants.TaskStatusRunning:
|
||||
ts.StartTs = time.Now()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.CreateTs).Milliseconds()
|
||||
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
||||
if ts.StartTs.IsZero() {
|
||||
ts.StartTs = time.Now()
|
||||
ts.WaitDuration = ts.StartTs.Sub(ts.CreateTs).Milliseconds()
|
||||
}
|
||||
ts.EndTs = time.Now()
|
||||
ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds()
|
||||
ts.TotalDuration = ts.EndTs.Sub(ts.CreateTs).Milliseconds()
|
||||
}
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service2.NewModelServiceV2[models.TaskStatV2]().ReplaceById(ts.Id, *ts)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelServiceV2[models.TaskStatV2]().ReplaceById(ts.Id, *ts)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) sendNotification() {
|
||||
data, err := json.Marshal(r.t)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
req := &grpc.Request{
|
||||
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
|
||||
Data: data,
|
||||
}
|
||||
_, err = r.c.GetTaskClient().SendNotification(context.Background(), req)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) _updateSpiderStat(status string) {
|
||||
// task stat
|
||||
ts, err := client.NewModelServiceV2[models.TaskStatV2]().GetById(r.tid)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// update
|
||||
var update bson.M
|
||||
switch status {
|
||||
case constants.TaskStatusPending, constants.TaskStatusRunning:
|
||||
update = bson.M{
|
||||
"$set": bson.M{
|
||||
"last_task_id": r.tid, // last task id
|
||||
},
|
||||
"$inc": bson.M{
|
||||
"tasks": 1, // task count
|
||||
"wait_duration": ts.WaitDuration, // wait duration
|
||||
},
|
||||
}
|
||||
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
||||
update = bson.M{
|
||||
"$set": bson.M{
|
||||
"last_task_id": r.tid, // last task id
|
||||
},
|
||||
"$inc": bson.M{
|
||||
"results": ts.ResultCount, // results
|
||||
"runtime_duration": ts.RuntimeDuration / 1000, // runtime duration
|
||||
"total_duration": ts.TotalDuration / 1000, // total duration
|
||||
},
|
||||
}
|
||||
default:
|
||||
trace.PrintError(errors.ErrorTaskInvalidType)
|
||||
return
|
||||
}
|
||||
|
||||
// perform update
|
||||
if r.svc.GetNodeConfigService().IsMaster() {
|
||||
err = service2.NewModelServiceV2[models.SpiderStatV2]().UpdateById(r.s.Id, update)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = client.NewModelServiceV2[models.SpiderStatV2]().UpdateById(r.s.Id, update)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err error) {
|
||||
// validate options
|
||||
if id.IsZero() {
|
||||
return nil, constants.ErrInvalidOptions
|
||||
}
|
||||
|
||||
// runner
|
||||
r := &RunnerV2{
|
||||
subscribeTimeout: 30 * time.Second,
|
||||
bufferSize: 1024 * 1024,
|
||||
svc: svc,
|
||||
tid: id,
|
||||
ch: make(chan constants.TaskSignal),
|
||||
logBatchSize: 20,
|
||||
}
|
||||
|
||||
// task
|
||||
r.t, err = svc.GetTaskById(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// spider
|
||||
r.s, err = svc.GetSpiderById(r.t.SpiderId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// task fs service
|
||||
r.fsSvc = fs2.NewFsServiceV2(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex()))
|
||||
|
||||
// dependency injection
|
||||
if err := container.GetContainer().Invoke(func(
|
||||
c interfaces.GrpcClient,
|
||||
) {
|
||||
r.c = c
|
||||
}); err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
|
||||
// initialize task runner
|
||||
if err := r.Init(); err != nil {
|
||||
return r, err
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
506
core/task/handler/service.go
Normal file
506
core/task/handler/service.go
Normal file
@@ -0,0 +1,506 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/container"
|
||||
errors2 "github.com/crawlab-team/crawlab/core/errors"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/delegate"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/task"
|
||||
"github.com/crawlab-team/go-trace"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
// dependencies
|
||||
interfaces.TaskBaseService
|
||||
cfgSvc interfaces.NodeConfigService
|
||||
modelSvc service.ModelService
|
||||
clientModelSvc interfaces.GrpcClientModelService
|
||||
clientModelNodeSvc interfaces.GrpcClientModelNodeService
|
||||
clientModelSpiderSvc interfaces.GrpcClientModelSpiderService
|
||||
clientModelTaskSvc interfaces.GrpcClientModelTaskService
|
||||
clientModelTaskStatSvc interfaces.GrpcClientModelTaskStatService
|
||||
clientModelEnvironmentSvc interfaces.GrpcClientModelEnvironmentService
|
||||
c interfaces.GrpcClient // grpc client
|
||||
|
||||
// settings
|
||||
//maxRunners int
|
||||
exitWatchDuration time.Duration
|
||||
reportInterval time.Duration
|
||||
fetchInterval time.Duration
|
||||
fetchTimeout time.Duration
|
||||
cancelTimeout time.Duration
|
||||
|
||||
// internals variables
|
||||
stopped bool
|
||||
mu sync.Mutex
|
||||
runners sync.Map // pool of task runners started
|
||||
syncLocks sync.Map // files sync locks map of task runners
|
||||
}
|
||||
|
||||
func (svc *Service) Start() {
|
||||
// Initialize gRPC if not started
|
||||
if !svc.c.IsStarted() {
|
||||
svc.c.Start()
|
||||
}
|
||||
|
||||
go svc.ReportStatus()
|
||||
go svc.Fetch()
|
||||
}
|
||||
|
||||
func (svc *Service) Run(taskId primitive.ObjectID) (err error) {
|
||||
return svc.run(taskId)
|
||||
}
|
||||
|
||||
func (svc *Service) Reset() {
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (svc *Service) Cancel(taskId primitive.ObjectID) (err error) {
|
||||
r, err := svc.getRunner(taskId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Cancel(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) Fetch() {
|
||||
for {
|
||||
// wait
|
||||
time.Sleep(svc.fetchInterval)
|
||||
|
||||
// current node
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// skip if node is not active or enabled
|
||||
if !n.GetActive() || !n.GetEnabled() {
|
||||
continue
|
||||
}
|
||||
|
||||
// validate if there are available runners
|
||||
if svc.getRunnerCount() >= n.GetMaxRunners() {
|
||||
continue
|
||||
}
|
||||
|
||||
// stop
|
||||
if svc.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
// fetch task
|
||||
tid, err := svc.fetch()
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// skip if no task id
|
||||
if tid.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
// run task
|
||||
if err := svc.run(tid); err != nil {
|
||||
trace.PrintError(err)
|
||||
t, err := svc.GetTaskById(tid)
|
||||
if err == nil && t.GetStatus() != constants.TaskStatusCancelled {
|
||||
t.SetError(err.Error())
|
||||
_ = svc.SaveTask(t, constants.TaskStatusError)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *Service) ReportStatus() {
|
||||
for {
|
||||
if svc.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
// report handler status
|
||||
if err := svc.reportStatus(); err != nil {
|
||||
trace.PrintError(err)
|
||||
}
|
||||
|
||||
// wait
|
||||
time.Sleep(svc.reportInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *Service) IsSyncLocked(path string) (ok bool) {
|
||||
_, ok = svc.syncLocks.Load(path)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (svc *Service) LockSync(path string) {
|
||||
svc.syncLocks.Store(path, true)
|
||||
}
|
||||
|
||||
func (svc *Service) UnlockSync(path string) {
|
||||
svc.syncLocks.Delete(path)
|
||||
}
|
||||
|
||||
//func (svc *Service) GetMaxRunners() (maxRunners int) {
|
||||
// return svc.maxRunners
|
||||
//}
|
||||
//
|
||||
//func (svc *Service) SetMaxRunners(maxRunners int) {
|
||||
// svc.maxRunners = maxRunners
|
||||
//}
|
||||
|
||||
func (svc *Service) GetExitWatchDuration() (duration time.Duration) {
|
||||
return svc.exitWatchDuration
|
||||
}
|
||||
|
||||
func (svc *Service) SetExitWatchDuration(duration time.Duration) {
|
||||
svc.exitWatchDuration = duration
|
||||
}
|
||||
|
||||
func (svc *Service) GetFetchInterval() (interval time.Duration) {
|
||||
return svc.fetchInterval
|
||||
}
|
||||
|
||||
func (svc *Service) SetFetchInterval(interval time.Duration) {
|
||||
svc.fetchInterval = interval
|
||||
}
|
||||
|
||||
func (svc *Service) GetReportInterval() (interval time.Duration) {
|
||||
return svc.reportInterval
|
||||
}
|
||||
|
||||
func (svc *Service) SetReportInterval(interval time.Duration) {
|
||||
svc.reportInterval = interval
|
||||
}
|
||||
|
||||
func (svc *Service) GetCancelTimeout() (timeout time.Duration) {
|
||||
return svc.cancelTimeout
|
||||
}
|
||||
|
||||
func (svc *Service) SetCancelTimeout(timeout time.Duration) {
|
||||
svc.cancelTimeout = timeout
|
||||
}
|
||||
|
||||
func (svc *Service) GetModelService() (modelSvc interfaces.GrpcClientModelService) {
|
||||
return svc.clientModelSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetModelSpiderService() (modelSpiderSvc interfaces.GrpcClientModelSpiderService) {
|
||||
return svc.clientModelSpiderSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetModelTaskService() (modelTaskSvc interfaces.GrpcClientModelTaskService) {
|
||||
return svc.clientModelTaskSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetModelTaskStatService() (modelTaskSvc interfaces.GrpcClientModelTaskStatService) {
|
||||
return svc.clientModelTaskStatSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetModelEnvironmentService() (modelTaskSvc interfaces.GrpcClientModelEnvironmentService) {
|
||||
return svc.clientModelEnvironmentSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) {
|
||||
return svc.cfgSvc
|
||||
}
|
||||
|
||||
func (svc *Service) GetCurrentNode() (n interfaces.Node, err error) {
|
||||
// node key
|
||||
nodeKey := svc.cfgSvc.GetNodeKey()
|
||||
|
||||
// current node
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
n, err = svc.modelSvc.GetNodeByKey(nodeKey, nil)
|
||||
} else {
|
||||
n, err = svc.clientModelNodeSvc.GetNodeByKey(nodeKey)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetTaskById(id primitive.ObjectID) (t interfaces.Task, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
t, err = svc.modelSvc.GetTaskById(id)
|
||||
} else {
|
||||
t, err = svc.clientModelTaskSvc.GetTaskById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetSpiderById(id primitive.ObjectID) (s interfaces.Spider, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
s, err = svc.modelSvc.GetSpiderById(id)
|
||||
} else {
|
||||
s, err = svc.clientModelSpiderSvc.GetSpiderById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (svc *Service) getRunners() (runners []*Runner) {
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
svc.runners.Range(func(key, value interface{}) bool {
|
||||
r := value.(Runner)
|
||||
runners = append(runners, &r)
|
||||
return true
|
||||
})
|
||||
return runners
|
||||
}
|
||||
|
||||
func (svc *Service) getRunnerCount() (count int) {
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
query := bson.M{
|
||||
"node_id": n.GetId(),
|
||||
"status": constants.TaskStatusRunning,
|
||||
}
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
count, err = svc.modelSvc.GetBaseService(interfaces.ModelIdTask).Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
count, err = svc.clientModelTaskSvc.Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) {
|
||||
log.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId)
|
||||
v, ok := svc.runners.Load(taskId)
|
||||
if !ok {
|
||||
return nil, trace.TraceError(errors2.ErrorTaskNotExists)
|
||||
}
|
||||
switch v.(type) {
|
||||
case interfaces.TaskRunner:
|
||||
r = v.(interfaces.TaskRunner)
|
||||
default:
|
||||
return nil, trace.TraceError(errors2.ErrorModelInvalidType)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) {
|
||||
log.Debugf("[TaskHandlerService] addRunner: taskId[%v]", taskId)
|
||||
svc.runners.Store(taskId, r)
|
||||
}
|
||||
|
||||
func (svc *Service) deleteRunner(taskId primitive.ObjectID) {
|
||||
log.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId)
|
||||
svc.runners.Delete(taskId)
|
||||
}
|
||||
|
||||
func (svc *Service) saveTask(t interfaces.Task, status string) (err error) {
|
||||
// normalize status
|
||||
if status == "" {
|
||||
status = constants.TaskStatusPending
|
||||
}
|
||||
|
||||
// set task status
|
||||
t.SetStatus(status)
|
||||
|
||||
// attempt to get task from database
|
||||
_, err = svc.clientModelTaskSvc.GetTaskById(t.GetId())
|
||||
if err != nil {
|
||||
// if task does not exist, add to database
|
||||
if err == mongo.ErrNoDocuments {
|
||||
if err := client.NewModelDelegate(t, client.WithDelegateConfigPath(svc.GetConfigPath())).Add(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// otherwise, update
|
||||
if err := client.NewModelDelegate(t, client.WithDelegateConfigPath(svc.GetConfigPath())).Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *Service) reportStatus() (err error) {
|
||||
// current node
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// available runners of handler
|
||||
ar := n.GetMaxRunners() - svc.getRunnerCount()
|
||||
|
||||
// set available runners
|
||||
n.SetAvailableRunners(ar)
|
||||
|
||||
// save node
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
err = delegate.NewModelDelegate(n).Save()
|
||||
} else {
|
||||
err = client.NewModelDelegate(n, client.WithDelegateConfigPath(svc.GetConfigPath())).Save()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) fetch() (tid primitive.ObjectID, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), svc.fetchTimeout)
|
||||
defer cancel()
|
||||
res, err := svc.c.GetTaskClient().Fetch(ctx, svc.c.NewRequest(nil))
|
||||
if err != nil {
|
||||
return tid, trace.TraceError(err)
|
||||
}
|
||||
if err := json.Unmarshal(res.Data, &tid); err != nil {
|
||||
return tid, trace.TraceError(err)
|
||||
}
|
||||
return tid, nil
|
||||
}
|
||||
|
||||
func (svc *Service) run(taskId primitive.ObjectID) (err error) {
|
||||
// attempt to get runner from pool
|
||||
_, ok := svc.runners.Load(taskId)
|
||||
if ok {
|
||||
return trace.TraceError(errors2.ErrorTaskAlreadyExists)
|
||||
}
|
||||
|
||||
// create a new task runner
|
||||
r, err := NewTaskRunner(taskId, svc)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// add runner to pool
|
||||
svc.addRunner(taskId, r)
|
||||
|
||||
// create a goroutine to run task
|
||||
go func() {
|
||||
// delete runner from pool
|
||||
defer svc.deleteRunner(r.GetTaskId())
|
||||
defer func(r interfaces.TaskRunner) {
|
||||
err := r.CleanUp()
|
||||
if err != nil {
|
||||
log.Errorf("task[%s] clean up error: %v", r.GetTaskId().Hex(), err)
|
||||
}
|
||||
}(r)
|
||||
// run task process (blocking)
|
||||
// error or finish after task runner ends
|
||||
if err := r.Run(); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, constants.ErrTaskError):
|
||||
log.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err)
|
||||
case errors.Is(err, constants.ErrTaskCancelled):
|
||||
log.Errorf("task[%s] cancelled", r.GetTaskId().Hex())
|
||||
default:
|
||||
log.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err)
|
||||
}
|
||||
}
|
||||
log.Infof("task[%s] finished", r.GetTaskId().Hex())
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTaskHandlerService() (svc2 interfaces.TaskHandlerService, err error) {
|
||||
// base service
|
||||
baseSvc, err := task.NewBaseService()
|
||||
if err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
|
||||
// service
|
||||
svc := &Service{
|
||||
TaskBaseService: baseSvc,
|
||||
exitWatchDuration: 60 * time.Second,
|
||||
fetchInterval: 1 * time.Second,
|
||||
fetchTimeout: 15 * time.Second,
|
||||
reportInterval: 5 * time.Second,
|
||||
cancelTimeout: 5 * time.Second,
|
||||
mu: sync.Mutex{},
|
||||
runners: sync.Map{},
|
||||
syncLocks: sync.Map{},
|
||||
}
|
||||
|
||||
// dependency injection
|
||||
if err := container.GetContainer().Invoke(func(
|
||||
cfgSvc interfaces.NodeConfigService,
|
||||
modelSvc service.ModelService,
|
||||
clientModelSvc interfaces.GrpcClientModelService,
|
||||
clientModelNodeSvc interfaces.GrpcClientModelNodeService,
|
||||
clientModelSpiderSvc interfaces.GrpcClientModelSpiderService,
|
||||
clientModelTaskSvc interfaces.GrpcClientModelTaskService,
|
||||
clientModelTaskStatSvc interfaces.GrpcClientModelTaskStatService,
|
||||
clientModelEnvironmentSvc interfaces.GrpcClientModelEnvironmentService,
|
||||
c interfaces.GrpcClient,
|
||||
) {
|
||||
svc.cfgSvc = cfgSvc
|
||||
svc.modelSvc = modelSvc
|
||||
svc.clientModelSvc = clientModelSvc
|
||||
svc.clientModelNodeSvc = clientModelNodeSvc
|
||||
svc.clientModelSpiderSvc = clientModelSpiderSvc
|
||||
svc.clientModelTaskSvc = clientModelTaskSvc
|
||||
svc.clientModelTaskStatSvc = clientModelTaskStatSvc
|
||||
svc.clientModelEnvironmentSvc = clientModelEnvironmentSvc
|
||||
svc.c = c
|
||||
}); err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
|
||||
log.Debugf("[NewTaskHandlerService] svc[cfgPath: %s]", svc.cfgSvc.GetConfigPath())
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
var _service interfaces.TaskHandlerService
|
||||
|
||||
func GetTaskHandlerService() (svr interfaces.TaskHandlerService, err error) {
|
||||
if _service != nil {
|
||||
return _service, nil
|
||||
}
|
||||
_service, err = NewTaskHandlerService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return _service, nil
|
||||
}
|
||||
426
core/task/handler/service_v2.go
Normal file
426
core/task/handler/service_v2.go
Normal file
@@ -0,0 +1,426 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
errors2 "github.com/crawlab-team/crawlab/core/errors"
|
||||
grpcclient "github.com/crawlab-team/crawlab/core/grpc/client"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/client"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/go-trace"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ServiceV2 struct {
|
||||
// dependencies
|
||||
cfgSvc interfaces.NodeConfigService
|
||||
c *grpcclient.GrpcClientV2 // grpc client
|
||||
|
||||
// settings
|
||||
//maxRunners int
|
||||
exitWatchDuration time.Duration
|
||||
reportInterval time.Duration
|
||||
fetchInterval time.Duration
|
||||
fetchTimeout time.Duration
|
||||
cancelTimeout time.Duration
|
||||
|
||||
// internals variables
|
||||
stopped bool
|
||||
mu sync.Mutex
|
||||
runners sync.Map // pool of task runners started
|
||||
syncLocks sync.Map // files sync locks map of task runners
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Start() {
|
||||
// Initialize gRPC if not started
|
||||
if !svc.c.IsStarted() {
|
||||
svc.c.Start()
|
||||
}
|
||||
|
||||
go svc.ReportStatus()
|
||||
go svc.Fetch()
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Run(taskId primitive.ObjectID) (err error) {
|
||||
return svc.run(taskId)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Reset() {
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) {
|
||||
r, err := svc.getRunner(taskId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Cancel(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Fetch() {
|
||||
for {
|
||||
// wait
|
||||
time.Sleep(svc.fetchInterval)
|
||||
|
||||
// current node
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// skip if node is not active or enabled
|
||||
if !n.Active || !n.Enabled {
|
||||
continue
|
||||
}
|
||||
|
||||
// validate if there are available runners
|
||||
if svc.getRunnerCount() >= n.MaxRunners {
|
||||
continue
|
||||
}
|
||||
|
||||
// stop
|
||||
if svc.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
// fetch task
|
||||
tid, err := svc.fetch()
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// skip if no task id
|
||||
if tid.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
// run task
|
||||
if err := svc.run(tid); err != nil {
|
||||
trace.PrintError(err)
|
||||
t, err := svc.GetTaskById(tid)
|
||||
if err == nil && t.Status != constants.TaskStatusCancelled {
|
||||
t.Error = err.Error()
|
||||
t.Status = constants.TaskStatusError
|
||||
t.SetUpdated(t.CreatedBy)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) ReportStatus() {
|
||||
for {
|
||||
if svc.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
// report handler status
|
||||
if err := svc.reportStatus(); err != nil {
|
||||
trace.PrintError(err)
|
||||
}
|
||||
|
||||
// wait
|
||||
time.Sleep(svc.reportInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) IsSyncLocked(path string) (ok bool) {
|
||||
_, ok = svc.syncLocks.Load(path)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) LockSync(path string) {
|
||||
svc.syncLocks.Store(path, true)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) UnlockSync(path string) {
|
||||
svc.syncLocks.Delete(path)
|
||||
}
|
||||
|
||||
//func (svc *ServiceV2) GetMaxRunners() (maxRunners int) {
|
||||
// return svc.maxRunners
|
||||
//}
|
||||
//
|
||||
//func (svc *ServiceV2) SetMaxRunners(maxRunners int) {
|
||||
// svc.maxRunners = maxRunners
|
||||
//}
|
||||
|
||||
func (svc *ServiceV2) GetExitWatchDuration() (duration time.Duration) {
|
||||
return svc.exitWatchDuration
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetExitWatchDuration(duration time.Duration) {
|
||||
svc.exitWatchDuration = duration
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetFetchInterval() (interval time.Duration) {
|
||||
return svc.fetchInterval
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetFetchInterval(interval time.Duration) {
|
||||
svc.fetchInterval = interval
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetReportInterval() (interval time.Duration) {
|
||||
return svc.reportInterval
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetReportInterval(interval time.Duration) {
|
||||
svc.reportInterval = interval
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetCancelTimeout() (timeout time.Duration) {
|
||||
return svc.cancelTimeout
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetCancelTimeout(timeout time.Duration) {
|
||||
svc.cancelTimeout = timeout
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) {
|
||||
return svc.cfgSvc
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetCurrentNode() (n *models.NodeV2, err error) {
|
||||
// node key
|
||||
nodeKey := svc.cfgSvc.GetNodeKey()
|
||||
|
||||
// current node
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
n, err = service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
} else {
|
||||
n, err = client.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetTaskById(id primitive.ObjectID) (t *models.TaskV2, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
t, err = service.NewModelServiceV2[models.TaskV2]().GetById(id)
|
||||
} else {
|
||||
t, err = client.NewModelServiceV2[models.TaskV2]().GetById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) GetSpiderById(id primitive.ObjectID) (s *models.SpiderV2, err error) {
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
s, err = service.NewModelServiceV2[models.SpiderV2]().GetById(id)
|
||||
} else {
|
||||
s, err = client.NewModelServiceV2[models.SpiderV2]().GetById(id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) getRunners() (runners []*Runner) {
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
svc.runners.Range(func(key, value interface{}) bool {
|
||||
r := value.(Runner)
|
||||
runners = append(runners, &r)
|
||||
return true
|
||||
})
|
||||
return runners
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) getRunnerCount() (count int) {
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
query := bson.M{
|
||||
"node_id": n.Id,
|
||||
"status": constants.TaskStatusRunning,
|
||||
}
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
count, err = service.NewModelServiceV2[models.TaskV2]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
count, err = client.NewModelServiceV2[models.TaskV2]().Count(query)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) {
|
||||
log.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId)
|
||||
v, ok := svc.runners.Load(taskId)
|
||||
if !ok {
|
||||
return nil, trace.TraceError(errors2.ErrorTaskNotExists)
|
||||
}
|
||||
switch v.(type) {
|
||||
case interfaces.TaskRunner:
|
||||
r = v.(interfaces.TaskRunner)
|
||||
default:
|
||||
return nil, trace.TraceError(errors2.ErrorModelInvalidType)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) {
|
||||
log.Debugf("[TaskHandlerService] addRunner: taskId[%v]", taskId)
|
||||
svc.runners.Store(taskId, r)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) deleteRunner(taskId primitive.ObjectID) {
|
||||
log.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId)
|
||||
svc.runners.Delete(taskId)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) reportStatus() (err error) {
|
||||
// current node
|
||||
n, err := svc.GetCurrentNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// available runners of handler
|
||||
ar := n.MaxRunners - svc.getRunnerCount()
|
||||
|
||||
// set available runners
|
||||
n.AvailableRunners = ar
|
||||
|
||||
// save node
|
||||
n.SetUpdated(n.CreatedBy)
|
||||
if svc.cfgSvc.IsMaster() {
|
||||
err = service.NewModelServiceV2[models.NodeV2]().ReplaceById(n.Id, *n)
|
||||
} else {
|
||||
err = client.NewModelServiceV2[models.NodeV2]().ReplaceById(n.Id, *n)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) fetch() (tid primitive.ObjectID, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), svc.fetchTimeout)
|
||||
defer cancel()
|
||||
res, err := svc.c.TaskClient.Fetch(ctx, svc.c.NewRequest(nil))
|
||||
if err != nil {
|
||||
return tid, trace.TraceError(err)
|
||||
}
|
||||
if err := json.Unmarshal(res.Data, &tid); err != nil {
|
||||
return tid, trace.TraceError(err)
|
||||
}
|
||||
return tid, nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) run(taskId primitive.ObjectID) (err error) {
|
||||
// attempt to get runner from pool
|
||||
_, ok := svc.runners.Load(taskId)
|
||||
if ok {
|
||||
return trace.TraceError(errors2.ErrorTaskAlreadyExists)
|
||||
}
|
||||
|
||||
// create a new task runner
|
||||
r, err := NewTaskRunnerV2(taskId, svc)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// add runner to pool
|
||||
svc.addRunner(taskId, r)
|
||||
|
||||
// create a goroutine to run task
|
||||
go func() {
|
||||
// delete runner from pool
|
||||
defer svc.deleteRunner(r.GetTaskId())
|
||||
defer func(r interfaces.TaskRunner) {
|
||||
err := r.CleanUp()
|
||||
if err != nil {
|
||||
log.Errorf("task[%s] clean up error: %v", r.GetTaskId().Hex(), err)
|
||||
}
|
||||
}(r)
|
||||
// run task process (blocking)
|
||||
// error or finish after task runner ends
|
||||
if err := r.Run(); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, constants.ErrTaskError):
|
||||
log.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err)
|
||||
case errors.Is(err, constants.ErrTaskCancelled):
|
||||
log.Errorf("task[%s] cancelled", r.GetTaskId().Hex())
|
||||
default:
|
||||
log.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err)
|
||||
}
|
||||
}
|
||||
log.Infof("task[%s] finished", r.GetTaskId().Hex())
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
|
||||
// service
|
||||
svc := &ServiceV2{
|
||||
exitWatchDuration: 60 * time.Second,
|
||||
fetchInterval: 1 * time.Second,
|
||||
fetchTimeout: 15 * time.Second,
|
||||
reportInterval: 5 * time.Second,
|
||||
cancelTimeout: 5 * time.Second,
|
||||
mu: sync.Mutex{},
|
||||
runners: sync.Map{},
|
||||
syncLocks: sync.Map{},
|
||||
}
|
||||
|
||||
// dependency injection
|
||||
svc.cfgSvc = nodeconfig.GetNodeConfigService()
|
||||
|
||||
// grpc client
|
||||
svc.c, err = grpcclient.NewGrpcClientV2()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("[NewTaskHandlerService] svc[cfgPath: %s]", svc.cfgSvc.GetConfigPath())
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
var _serviceV2 *ServiceV2
|
||||
|
||||
func GetTaskHandlerServiceV2() (svr *ServiceV2, err error) {
|
||||
if _serviceV2 != nil {
|
||||
return _serviceV2, nil
|
||||
}
|
||||
_serviceV2, err = NewTaskHandlerServiceV2()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return _serviceV2, nil
|
||||
}
|
||||
Reference in New Issue
Block a user