mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
- Remove Crawlab-specific environment variables from the task runner's environment - Automatically create workspace directory if it doesn't exist - Enhance environment setup to prevent potential configuration conflicts
1207 lines
31 KiB
Go
1207 lines
31 KiB
Go
package handler
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/crawlab-team/crawlab/core/dependency"
|
|
"github.com/crawlab-team/crawlab/core/fs"
|
|
"github.com/hashicorp/go-multierror"
|
|
|
|
"github.com/crawlab-team/crawlab/core/models/models"
|
|
|
|
"github.com/crawlab-team/crawlab/core/constants"
|
|
"github.com/crawlab-team/crawlab/core/entity"
|
|
client2 "github.com/crawlab-team/crawlab/core/grpc/client"
|
|
"github.com/crawlab-team/crawlab/core/interfaces"
|
|
"github.com/crawlab-team/crawlab/core/models/client"
|
|
"github.com/crawlab-team/crawlab/core/models/service"
|
|
"github.com/crawlab-team/crawlab/core/utils"
|
|
"github.com/crawlab-team/crawlab/grpc"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
)
|
|
|
|
// newTaskRunner creates a new task runner instance with the specified task ID
|
|
// It initializes all necessary components and establishes required connections
|
|
func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
|
|
// validate options
|
|
if id.IsZero() {
|
|
err = fmt.Errorf("invalid task id: %s", id.Hex())
|
|
return nil, err
|
|
}
|
|
|
|
// runner
|
|
r = &Runner{
|
|
subscribeTimeout: 30 * time.Second,
|
|
bufferSize: 1024 * 1024,
|
|
svc: svc,
|
|
tid: id,
|
|
ch: make(chan constants.TaskSignal),
|
|
logBatchSize: 20,
|
|
Logger: utils.NewLogger("TaskRunner"),
|
|
}
|
|
|
|
// multi error
|
|
var errs multierror.Error
|
|
|
|
// task
|
|
r.t, err = svc.GetTaskById(id)
|
|
if err != nil {
|
|
errs.Errors = append(errs.Errors, err)
|
|
} else {
|
|
// spider
|
|
r.s, err = svc.GetSpiderById(r.t.SpiderId)
|
|
if err != nil {
|
|
errs.Errors = append(errs.Errors, err)
|
|
} else {
|
|
// task fs service
|
|
r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
|
|
}
|
|
}
|
|
|
|
// Initialize context and done channel
|
|
r.ctx, r.cancel = context.WithCancel(context.Background())
|
|
r.done = make(chan struct{})
|
|
|
|
// initialize task runner
|
|
if err := r.Init(); err != nil {
|
|
r.Errorf("error initializing task runner: %v", err)
|
|
errs.Errors = append(errs.Errors, err)
|
|
}
|
|
|
|
return r, errs.ErrorOrNil()
|
|
}
|
|
|
|
// Runner represents a task execution handler that manages the lifecycle of a running task
|
|
type Runner struct {
|
|
// dependencies
|
|
svc *Service // task handler service
|
|
fsSvc interfaces.FsService // task fs service
|
|
|
|
// settings
|
|
subscribeTimeout time.Duration // maximum time to wait for task subscription
|
|
bufferSize int // buffer size for reading process output
|
|
|
|
// internals
|
|
cmd *exec.Cmd // process command instance
|
|
pid int // process id
|
|
tid primitive.ObjectID // task id
|
|
t *models.Task // task model instance
|
|
s *models.Spider // spider model instance
|
|
ch chan constants.TaskSignal // channel for task status communication
|
|
err error // captures any process execution errors
|
|
cwd string // current working directory for task
|
|
conn grpc.TaskService_ConnectClient // gRPC stream connection for task service
|
|
interfaces.Logger
|
|
|
|
// log handling
|
|
readerStdout *bufio.Reader // reader for process stdout
|
|
readerStderr *bufio.Reader // reader for process stderr
|
|
logBatchSize int // number of log lines to batch before sending
|
|
|
|
// IPC (Inter-Process Communication)
|
|
stdinPipe io.WriteCloser // pipe for writing to child process
|
|
stdoutPipe io.ReadCloser // pipe for reading from child process
|
|
ipcChan chan entity.IPCMessage // channel for sending IPC messages
|
|
ipcHandler func(entity.IPCMessage) // callback for handling received IPC messages
|
|
|
|
// goroutine management
|
|
ctx context.Context // context for controlling goroutine lifecycle
|
|
cancel context.CancelFunc // function to cancel the context
|
|
done chan struct{} // channel to signal completion
|
|
wg sync.WaitGroup // wait group for goroutine synchronization
|
|
}
|
|
|
|
// Init initializes the task runner by updating the task status and establishing gRPC connections
|
|
func (r *Runner) Init() (err error) {
|
|
// wait for grpc client ready
|
|
client2.GetGrpcClient().WaitForReady()
|
|
|
|
// update task
|
|
if err := r.updateTask("", nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
// grpc task service stream client
|
|
if err := r.initConnection(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run executes the task and manages its lifecycle, including file synchronization, process execution,
|
|
// and status monitoring. Returns an error if the task execution fails.
|
|
func (r *Runner) Run() (err error) {
|
|
// log task started
|
|
r.Infof("task[%s] started", r.tid.Hex())
|
|
|
|
// update task status (processing)
|
|
if err := r.updateTask(constants.TaskStatusRunning, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
// configure working directory
|
|
r.configureCwd()
|
|
|
|
// sync files worker nodes
|
|
if !utils.IsMaster() {
|
|
if err := r.syncFiles(); err != nil {
|
|
return r.updateTask(constants.TaskStatusError, err)
|
|
}
|
|
}
|
|
|
|
// install dependencies
|
|
if err := r.installDependenciesIfAvailable(); err != nil {
|
|
r.Warnf("error installing dependencies: %v", err)
|
|
}
|
|
|
|
// configure cmd
|
|
err = r.configureCmd()
|
|
if err != nil {
|
|
return r.updateTask(constants.TaskStatusError, err)
|
|
}
|
|
|
|
// configure environment variables
|
|
r.configureEnv()
|
|
|
|
// start process
|
|
if err := r.cmd.Start(); err != nil {
|
|
return r.updateTask(constants.TaskStatusError, err)
|
|
}
|
|
|
|
// process id
|
|
if r.cmd.Process == nil {
|
|
return r.updateTask(constants.TaskStatusError, constants.ErrNotExists)
|
|
}
|
|
r.pid = r.cmd.Process.Pid
|
|
r.t.Pid = r.pid
|
|
|
|
// start health check
|
|
go r.startHealthCheck()
|
|
|
|
// Start IPC reader
|
|
go r.startIPCReader()
|
|
|
|
// Start IPC handler
|
|
go r.handleIPC()
|
|
|
|
// Ensure cleanup when Run() exits
|
|
defer func() {
|
|
_ = r.conn.CloseSend() // Close gRPC connection
|
|
r.cancel() // Cancel context to stop all goroutines
|
|
r.wg.Wait() // Wait for all goroutines to finish
|
|
close(r.done) // Signal that everything is done
|
|
close(r.ipcChan) // Close IPC channel
|
|
}()
|
|
|
|
// wait for process to finish
|
|
return r.wait()
|
|
}
|
|
|
|
// Cancel terminates the running task. If force is true, the process will be killed immediately
|
|
// without waiting for graceful shutdown.
|
|
func (r *Runner) Cancel(force bool) (err error) {
|
|
// Signal goroutines to stop
|
|
r.cancel()
|
|
|
|
// Kill process
|
|
err = utils.KillProcess(r.cmd, force)
|
|
if err != nil {
|
|
r.Errorf("kill process error: %v", err)
|
|
return err
|
|
}
|
|
r.Debugf("attempt to kill process[%d]", r.pid)
|
|
|
|
// Create a context with timeout
|
|
ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetCancelTimeout())
|
|
defer cancel()
|
|
|
|
// Wait for process to be killed with context
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("timeout waiting for task to stop")
|
|
case <-ticker.C:
|
|
if !utils.ProcessIdExists(r.pid) {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) SetSubscribeTimeout(timeout time.Duration) {
|
|
r.subscribeTimeout = timeout
|
|
}
|
|
|
|
func (r *Runner) GetTaskId() (id primitive.ObjectID) {
|
|
return r.tid
|
|
}
|
|
|
|
// configureCmd builds and configures the command to be executed, including setting up IPC pipes
|
|
// and processing command parameters
|
|
func (r *Runner) configureCmd() (err error) {
|
|
var cmdStr string
|
|
|
|
// command
|
|
if r.t.Cmd == "" {
|
|
cmdStr = r.s.Cmd
|
|
} else {
|
|
cmdStr = r.t.Cmd
|
|
}
|
|
|
|
// parameters
|
|
if r.t.Param != "" {
|
|
cmdStr += " " + r.t.Param
|
|
} else if r.s.Param != "" {
|
|
cmdStr += " " + r.s.Param
|
|
}
|
|
|
|
// get cmd instance
|
|
r.cmd, err = utils.BuildCmd(cmdStr)
|
|
if err != nil {
|
|
r.Errorf("error building command: %v", err)
|
|
return err
|
|
}
|
|
|
|
// set working directory
|
|
r.cmd.Dir = r.cwd
|
|
|
|
// Configure pipes for IPC and logs
|
|
r.stdinPipe, err = r.cmd.StdinPipe()
|
|
if err != nil {
|
|
r.Errorf("error creating stdin pipe: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Add stdout pipe for IPC and logs
|
|
r.stdoutPipe, err = r.cmd.StdoutPipe()
|
|
if err != nil {
|
|
r.Errorf("error creating stdout pipe: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Add stderr pipe for error logs
|
|
stderrPipe, err := r.cmd.StderrPipe()
|
|
if err != nil {
|
|
r.Errorf("error creating stderr pipe: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Create buffered readers
|
|
r.readerStdout = bufio.NewReader(r.stdoutPipe)
|
|
r.readerStderr = bufio.NewReader(stderrPipe)
|
|
|
|
// Initialize IPC channel
|
|
r.ipcChan = make(chan entity.IPCMessage)
|
|
|
|
return nil
|
|
}
|
|
|
|
// startHealthCheck periodically verifies that the process is still running
|
|
// If the process disappears unexpectedly, it signals a task lost condition
|
|
func (r *Runner) startHealthCheck() {
|
|
r.wg.Add(1)
|
|
defer r.wg.Done()
|
|
|
|
if r.cmd.ProcessState == nil || r.cmd.ProcessState.Exited() {
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-r.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if !utils.ProcessIdExists(r.pid) {
|
|
// process lost
|
|
r.ch <- constants.TaskSignalLost
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// configurePythonPath sets up the Python environment paths, handling both pyenv and default installations
|
|
func (r *Runner) configurePythonPath() {
|
|
// Configure global node_modules path
|
|
pyenvRoot := utils.GetPyenvPath()
|
|
pyenvShimsPath := pyenvRoot + "/shims"
|
|
pyenvBinPath := pyenvRoot + "/bin"
|
|
|
|
// Configure global pyenv path
|
|
_ = os.Setenv("PYENV_ROOT", pyenvRoot)
|
|
_ = os.Setenv("PATH", pyenvShimsPath+":"+os.Getenv("PATH"))
|
|
_ = os.Setenv("PATH", pyenvBinPath+":"+os.Getenv("PATH"))
|
|
}
|
|
|
|
// configureNodePath sets up the Node.js environment paths, handling both nvm and default installations
|
|
func (r *Runner) configureNodePath() {
|
|
// Configure nvm-based Node.js paths
|
|
envPath := os.Getenv("PATH")
|
|
|
|
// Configure global node_modules path
|
|
nodePath := utils.GetNodeModulesPath()
|
|
if !strings.Contains(envPath, nodePath) {
|
|
_ = os.Setenv("PATH", nodePath+":"+envPath)
|
|
}
|
|
_ = os.Setenv("NODE_PATH", nodePath)
|
|
}
|
|
|
|
func (r *Runner) configureGoPath() {
|
|
// Configure global go path
|
|
goPath := utils.GetGoPath()
|
|
if goPath != "" {
|
|
_ = os.Setenv("GOPATH", goPath)
|
|
}
|
|
}
|
|
|
|
// configureEnv sets up the environment variables for the task process, including:
|
|
// - Node.js paths
|
|
// - Crawlab-specific variables
|
|
// - Global environment variables from the system
|
|
func (r *Runner) configureEnv() {
|
|
// Configure Python path
|
|
r.configurePythonPath()
|
|
|
|
// Configure Node.js path
|
|
r.configureNodePath()
|
|
|
|
// Configure Go path
|
|
r.configureGoPath()
|
|
|
|
// Default envs
|
|
r.cmd.Env = os.Environ()
|
|
|
|
// Remove CRAWLAB_ prefixed environment variables
|
|
for i := 0; i < len(r.cmd.Env); i++ {
|
|
if strings.HasPrefix(r.cmd.Env[i], "CRAWLAB_") {
|
|
r.cmd.Env = append(r.cmd.Env[:i], r.cmd.Env[i+1:]...)
|
|
i--
|
|
}
|
|
}
|
|
|
|
// Task-specific environment variables
|
|
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_TASK_ID="+r.tid.Hex())
|
|
|
|
// Global environment variables
|
|
envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil)
|
|
if err != nil {
|
|
r.Errorf("error getting environment variables: %v", err)
|
|
return
|
|
}
|
|
for _, env := range envs {
|
|
r.cmd.Env = append(r.cmd.Env, env.Key+"="+env.Value)
|
|
}
|
|
|
|
// Add environment variable for child processes to identify they're running under Crawlab
|
|
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_PARENT_PID="+fmt.Sprint(os.Getpid()))
|
|
}
|
|
|
|
func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) {
|
|
// Normalize path
|
|
if strings.HasPrefix(path, "/") {
|
|
path = path[1:]
|
|
}
|
|
|
|
// Construct master URL
|
|
var id string
|
|
if r.s.GitId.IsZero() {
|
|
id = r.s.Id.Hex()
|
|
} else {
|
|
id = r.s.GitId.Hex()
|
|
}
|
|
url := fmt.Sprintf("%s/sync/%s/%s", utils.GetApiEndpoint(), id, path)
|
|
|
|
// Create and execute request
|
|
req, err := http.NewRequest(method, url, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating request: %v", err)
|
|
}
|
|
|
|
for k, v := range r.getHttpRequestHeaders() {
|
|
req.Header.Set(k, v)
|
|
}
|
|
|
|
return http.DefaultClient.Do(req)
|
|
}
|
|
|
|
// syncFiles synchronizes files between master and worker nodes:
|
|
// 1. Gets file list from master
|
|
// 2. Compares with local files
|
|
// 3. Downloads new/modified files
|
|
// 4. Deletes files that no longer exist on master
|
|
func (r *Runner) syncFiles() (err error) {
|
|
r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex())
|
|
|
|
workingDir := ""
|
|
if !r.s.GitId.IsZero() {
|
|
workingDir = r.s.GitRootPath
|
|
r.Debugf("using git root path: %s", workingDir)
|
|
}
|
|
|
|
// get file list from master
|
|
r.Infof("fetching file list from master node")
|
|
resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir)
|
|
if err != nil {
|
|
r.Errorf("error getting file list from master: %v", err)
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
r.Errorf("error reading response body: %v", err)
|
|
return err
|
|
}
|
|
var masterFiles map[string]entity.FsFileInfo
|
|
err = json.Unmarshal(body, &masterFiles)
|
|
if err != nil {
|
|
r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String())
|
|
r.Errorf("error details: %v", err)
|
|
return err
|
|
}
|
|
|
|
// create a map for master files
|
|
masterFilesMap := make(map[string]entity.FsFileInfo)
|
|
for _, file := range masterFiles {
|
|
masterFilesMap[file.Path] = file
|
|
}
|
|
|
|
// create working directory if not exists
|
|
if _, err := os.Stat(r.cwd); os.IsNotExist(err) {
|
|
if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil {
|
|
r.Errorf("error creating worker directory: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// get file list from worker
|
|
workerFiles, err := utils.ScanDirectory(r.cwd)
|
|
if err != nil {
|
|
r.Errorf("error scanning worker directory: %v", err)
|
|
return err
|
|
}
|
|
|
|
// delete files that are deleted on master node
|
|
for path, workerFile := range workerFiles {
|
|
if _, exists := masterFilesMap[path]; !exists {
|
|
r.Infof("deleting file: %s", path)
|
|
err := os.Remove(workerFile.FullPath)
|
|
if err != nil {
|
|
r.Errorf("error deleting file: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// set up wait group and error channel
|
|
var wg sync.WaitGroup
|
|
pool := make(chan struct{}, 10)
|
|
|
|
// download files that are new or modified on master node
|
|
for path, masterFile := range masterFilesMap {
|
|
workerFile, exists := workerFiles[path]
|
|
if !exists || masterFile.Hash != workerFile.Hash {
|
|
wg.Add(1)
|
|
|
|
// acquire token
|
|
pool <- struct{}{}
|
|
|
|
// start goroutine to synchronize file or directory
|
|
go func(path string, masterFile *entity.FsFileInfo) {
|
|
defer wg.Done()
|
|
|
|
if masterFile.IsDir {
|
|
r.Infof("directory needs to be synchronized: %s", path)
|
|
_err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode)
|
|
if _err != nil {
|
|
r.Errorf("error creating directory: %v", _err)
|
|
err = errors.Join(err, _err)
|
|
}
|
|
} else {
|
|
r.Infof("file needs to be synchronized: %s", path)
|
|
_err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile)
|
|
if _err != nil {
|
|
r.Errorf("error downloading file: %v", _err)
|
|
err = errors.Join(err, _err)
|
|
}
|
|
}
|
|
|
|
// release token
|
|
<-pool
|
|
|
|
}(path, &masterFile)
|
|
}
|
|
}
|
|
|
|
// wait for all files and directories to be synchronized
|
|
wg.Wait()
|
|
|
|
r.Infof("file synchronization completed successfully")
|
|
return err
|
|
}
|
|
|
|
// downloadFile downloads a file from the master node and saves it to the local file system
|
|
func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error {
|
|
r.Debugf("downloading file: %s -> %s", path, filePath)
|
|
|
|
resp, err := r.createHttpRequest("GET", "/download?path="+path)
|
|
if err != nil {
|
|
r.Errorf("error getting file response: %v", err)
|
|
return err
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
r.Errorf("error downloading file: %s", resp.Status)
|
|
return errors.New(resp.Status)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// create directory if not exists
|
|
dirPath := filepath.Dir(filePath)
|
|
utils.Exists(dirPath)
|
|
err = os.MkdirAll(dirPath, os.ModePerm)
|
|
if err != nil {
|
|
r.Errorf("error creating directory: %v", err)
|
|
return err
|
|
}
|
|
|
|
// create local file
|
|
out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode)
|
|
if err != nil {
|
|
r.Errorf("error creating file: %v", err)
|
|
return err
|
|
}
|
|
defer out.Close()
|
|
|
|
// copy file content to local file
|
|
_, err = io.Copy(out, resp.Body)
|
|
if err != nil {
|
|
r.Errorf("error copying file: %v", err)
|
|
return err
|
|
}
|
|
|
|
r.Debugf("successfully downloaded file: %s (size: %d bytes)", path, fileInfo.FileSize)
|
|
return nil
|
|
}
|
|
|
|
// getHttpRequestHeaders returns the headers for HTTP requests to the master node
|
|
func (r *Runner) getHttpRequestHeaders() (headers map[string]string) {
|
|
headers = make(map[string]string)
|
|
headers["Authorization"] = utils.GetAuthKey()
|
|
return headers
|
|
}
|
|
|
|
// wait monitors the process execution and sends appropriate signals based on the exit status:
|
|
// - TaskSignalFinish for successful completion
|
|
// - TaskSignalCancel for cancellation
|
|
// - TaskSignalError for execution errors
|
|
func (r *Runner) wait() (err error) {
|
|
// start a goroutine to wait for process to finish
|
|
go func() {
|
|
r.Debugf("waiting for process[%d] to finish", r.pid)
|
|
err = r.cmd.Wait()
|
|
if err != nil {
|
|
var exitError *exec.ExitError
|
|
if !errors.As(err, &exitError) {
|
|
r.ch <- constants.TaskSignalError
|
|
r.Debugf("process[%d] exited with error: %v", r.pid, err)
|
|
return
|
|
}
|
|
exitCode := exitError.ExitCode()
|
|
if exitCode == -1 {
|
|
// cancel error
|
|
r.ch <- constants.TaskSignalCancel
|
|
r.Debugf("process[%d] cancelled", r.pid)
|
|
return
|
|
}
|
|
|
|
// standard error
|
|
r.err = err
|
|
r.ch <- constants.TaskSignalError
|
|
r.Debugf("process[%d] exited with error: %v", r.pid, err)
|
|
return
|
|
}
|
|
|
|
// success
|
|
r.ch <- constants.TaskSignalFinish
|
|
r.Debugf("process[%d] exited successfully", r.pid)
|
|
}()
|
|
|
|
// declare task status
|
|
status := ""
|
|
|
|
// wait for signal
|
|
signal := <-r.ch
|
|
switch signal {
|
|
case constants.TaskSignalFinish:
|
|
err = nil
|
|
status = constants.TaskStatusFinished
|
|
case constants.TaskSignalCancel:
|
|
err = constants.ErrTaskCancelled
|
|
status = constants.TaskStatusCancelled
|
|
case constants.TaskSignalError:
|
|
err = r.err
|
|
status = constants.TaskStatusError
|
|
case constants.TaskSignalLost:
|
|
err = constants.ErrTaskLost
|
|
status = constants.TaskStatusError
|
|
default:
|
|
err = constants.ErrInvalidSignal
|
|
status = constants.TaskStatusError
|
|
}
|
|
|
|
// update task status
|
|
if err := r.updateTask(status, err); err != nil {
|
|
r.Errorf("error updating task status: %v", err)
|
|
return err
|
|
}
|
|
|
|
// log according to status
|
|
switch status {
|
|
case constants.TaskStatusFinished:
|
|
r.Infof("task[%s] finished", r.tid.Hex())
|
|
case constants.TaskStatusCancelled:
|
|
r.Infof("task[%s] cancelled", r.tid.Hex())
|
|
case constants.TaskStatusError:
|
|
r.Errorf("task[%s] error: %v", r.tid.Hex(), err)
|
|
default:
|
|
r.Errorf("invalid task status: %s", status)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateTask updates the task status and related statistics in the database
|
|
// If running on a worker node, updates are sent to the master
|
|
func (r *Runner) updateTask(status string, e error) (err error) {
|
|
if status != "" {
|
|
r.Debugf("updating task status to: %s", status)
|
|
}
|
|
|
|
if r.t != nil && status != "" {
|
|
// update task status
|
|
r.t.Status = status
|
|
if e != nil {
|
|
r.t.Error = e.Error()
|
|
}
|
|
if utils.IsMaster() {
|
|
err = service.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
err = client.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// update stats
|
|
r._updateTaskStat(status)
|
|
r._updateSpiderStat(status)
|
|
|
|
// send notification
|
|
go r.sendNotification()
|
|
}
|
|
|
|
// get task
|
|
r.Debugf("fetching updated task from database")
|
|
r.t, err = r.svc.GetTaskById(r.tid)
|
|
if err != nil {
|
|
r.Errorf("failed to get updated task: %v", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// initConnection establishes a gRPC connection to the task service
|
|
func (r *Runner) initConnection() (err error) {
|
|
r.conn, err = client2.GetGrpcClient().TaskClient.Connect(context.Background())
|
|
if err != nil {
|
|
r.Errorf("error connecting to task service: %v", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeLogLines marshals log lines to JSON and sends them to the task service
|
|
func (r *Runner) writeLogLines(lines []string) {
|
|
linesBytes, err := json.Marshal(lines)
|
|
if err != nil {
|
|
r.Errorf("error marshaling log lines: %v", err)
|
|
return
|
|
}
|
|
msg := &grpc.TaskServiceConnectRequest{
|
|
Code: grpc.TaskServiceConnectCode_INSERT_LOGS,
|
|
TaskId: r.tid.Hex(),
|
|
Data: linesBytes,
|
|
}
|
|
if err := r.conn.Send(msg); err != nil {
|
|
r.Errorf("error sending log lines: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// _updateTaskStat updates task statistics based on the current status:
|
|
// - For running tasks: sets start time and wait duration
|
|
// - For completed tasks: sets end time and calculates durations
|
|
func (r *Runner) _updateTaskStat(status string) {
|
|
if status != "" {
|
|
r.Debugf("updating task statistics for status: %s", status)
|
|
}
|
|
|
|
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
|
|
if err != nil {
|
|
r.Errorf("error getting task stat: %v", err)
|
|
return
|
|
}
|
|
|
|
r.Debugf("current task statistics - wait_duration: %dms, runtime_duration: %dms", ts.WaitDuration, ts.RuntimeDuration)
|
|
|
|
switch status {
|
|
case constants.TaskStatusPending:
|
|
// do nothing
|
|
case constants.TaskStatusRunning:
|
|
ts.StartTs = time.Now()
|
|
ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds()
|
|
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
|
if ts.StartTs.IsZero() {
|
|
ts.StartTs = time.Now()
|
|
ts.WaitDuration = ts.StartTs.Sub(ts.CreatedAt).Milliseconds()
|
|
}
|
|
ts.EndTs = time.Now()
|
|
ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds()
|
|
ts.TotalDuration = ts.EndTs.Sub(ts.CreatedAt).Milliseconds()
|
|
}
|
|
if utils.IsMaster() {
|
|
err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
|
|
if err != nil {
|
|
r.Errorf("error updating task stat: %v", err)
|
|
return
|
|
}
|
|
} else {
|
|
err = client.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
|
|
if err != nil {
|
|
r.Errorf("error updating task stat: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendNotification sends a notification to the task service
|
|
func (r *Runner) sendNotification() {
|
|
req := &grpc.TaskServiceSendNotificationRequest{
|
|
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
|
|
TaskId: r.tid.Hex(),
|
|
}
|
|
_, err := client2.GetGrpcClient().TaskClient.SendNotification(context.Background(), req)
|
|
if err != nil {
|
|
r.Errorf("error sending notification: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// _updateSpiderStat updates spider statistics based on task completion:
|
|
// - Updates last task ID
|
|
// - Increments task counts
|
|
// - Updates duration metrics
|
|
func (r *Runner) _updateSpiderStat(status string) {
|
|
// task stat
|
|
ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
|
|
if err != nil {
|
|
r.Errorf("error getting task stat: %v", err)
|
|
return
|
|
}
|
|
|
|
// update
|
|
var update bson.M
|
|
switch status {
|
|
case constants.TaskStatusPending, constants.TaskStatusRunning:
|
|
update = bson.M{
|
|
"$set": bson.M{
|
|
"last_task_id": r.tid, // last task id
|
|
},
|
|
"$inc": bson.M{
|
|
"tasks": 1, // task count
|
|
"wait_duration": ts.WaitDuration, // wait duration
|
|
},
|
|
}
|
|
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
|
update = bson.M{
|
|
"$set": bson.M{
|
|
"last_task_id": r.tid, // last task id
|
|
},
|
|
"$inc": bson.M{
|
|
"results": ts.ResultCount, // results
|
|
"runtime_duration": ts.RuntimeDuration / 1000, // runtime duration
|
|
"total_duration": ts.TotalDuration / 1000, // total duration
|
|
},
|
|
}
|
|
default:
|
|
r.Errorf("Invalid task status: %s", status)
|
|
return
|
|
}
|
|
|
|
// perform update
|
|
if utils.IsMaster() {
|
|
err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
|
|
if err != nil {
|
|
r.Errorf("error updating spider stat: %v", err)
|
|
return
|
|
}
|
|
} else {
|
|
err = client.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
|
|
if err != nil {
|
|
r.Errorf("error updating spider stat: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// configureCwd sets the working directory for the task based on the spider's configuration
|
|
func (r *Runner) configureCwd() {
|
|
workspacePath := utils.GetWorkspace()
|
|
if r.s.GitId.IsZero() {
|
|
// not git
|
|
r.cwd = filepath.Join(workspacePath, r.s.Id.Hex())
|
|
} else {
|
|
// git
|
|
r.cwd = filepath.Join(workspacePath, r.s.GitId.Hex(), r.s.GitRootPath)
|
|
}
|
|
}
|
|
|
|
// handleIPC processes incoming IPC messages from the child process
|
|
// Messages are converted to JSON and written to the child process's stdin
|
|
func (r *Runner) handleIPC() {
|
|
for msg := range r.ipcChan {
|
|
// Convert message to JSON
|
|
jsonData, err := json.Marshal(msg)
|
|
if err != nil {
|
|
r.Errorf("error marshaling IPC message: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Write to child process's stdin
|
|
_, err = fmt.Fprintln(r.stdinPipe, string(jsonData))
|
|
if err != nil {
|
|
r.Errorf("error writing to child process: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetIPCHandler sets the handler for incoming IPC messages
|
|
func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) {
|
|
r.ipcHandler = handler
|
|
}
|
|
|
|
// startIPCReader continuously reads IPC messages from the child process's stdout
|
|
// Messages are parsed and either handled by the IPC handler or written to logs
|
|
func (r *Runner) startIPCReader() {
|
|
r.wg.Add(2) // Add 2 to wait group for both stdout and stderr readers
|
|
|
|
// Start stdout reader
|
|
go func() {
|
|
defer r.wg.Done()
|
|
r.readOutput(r.readerStdout, true) // true for stdout
|
|
}()
|
|
|
|
// Start stderr reader
|
|
go func() {
|
|
defer r.wg.Done()
|
|
r.readOutput(r.readerStderr, false) // false for stderr
|
|
}()
|
|
}
|
|
|
|
func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) {
|
|
scanner := bufio.NewScanner(reader)
|
|
for {
|
|
select {
|
|
case <-r.ctx.Done():
|
|
// Context cancelled, stop reading
|
|
return
|
|
default:
|
|
// Scan the next line
|
|
if !scanner.Scan() {
|
|
return
|
|
}
|
|
|
|
// Get the line
|
|
line := scanner.Text()
|
|
|
|
// Trim the line
|
|
line = strings.TrimRight(line, "\n\r")
|
|
|
|
// For stdout, try to parse as IPC message first
|
|
if isStdout {
|
|
var ipcMsg entity.IPCMessage
|
|
if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC {
|
|
if r.ipcHandler != nil {
|
|
r.ipcHandler(ipcMsg)
|
|
} else {
|
|
// Default handler (insert data)
|
|
if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData {
|
|
r.handleIPCInsertDataMessage(ipcMsg)
|
|
} else {
|
|
r.Warnf("no IPC handler set for message: %v", ipcMsg)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
|
|
// If not an IPC message or from stderr, treat as log
|
|
r.writeLogLines([]string{line})
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node
|
|
func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
|
|
if ipcMsg.Payload == nil {
|
|
r.Errorf("empty payload in IPC message")
|
|
return
|
|
}
|
|
|
|
// Convert payload to data to be inserted
|
|
var records []map[string]interface{}
|
|
|
|
switch payload := ipcMsg.Payload.(type) {
|
|
case []interface{}: // Handle array of objects
|
|
records = make([]map[string]interface{}, 0, len(payload))
|
|
for i, item := range payload {
|
|
if itemMap, ok := item.(map[string]interface{}); ok {
|
|
records = append(records, itemMap)
|
|
} else {
|
|
r.Errorf("invalid record at index %d: %v", i, item)
|
|
continue
|
|
}
|
|
}
|
|
case []map[string]interface{}: // Handle direct array of maps
|
|
records = payload
|
|
case map[string]interface{}: // Handle single object
|
|
records = []map[string]interface{}{payload}
|
|
case interface{}: // Handle generic interface
|
|
if itemMap, ok := payload.(map[string]interface{}); ok {
|
|
records = []map[string]interface{}{itemMap}
|
|
} else {
|
|
r.Errorf("invalid payload type: %T", payload)
|
|
return
|
|
}
|
|
default:
|
|
r.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload)
|
|
return
|
|
}
|
|
|
|
// Validate records
|
|
if len(records) == 0 {
|
|
r.Warnf("no valid records to insert")
|
|
return
|
|
}
|
|
|
|
// Marshal data with error handling
|
|
data, err := json.Marshal(records)
|
|
if err != nil {
|
|
r.Errorf("error marshaling records: %v", err)
|
|
return
|
|
}
|
|
|
|
// Validate connection
|
|
if r.conn == nil {
|
|
r.Errorf("gRPC connection not initialized")
|
|
return
|
|
}
|
|
|
|
// Send IPC message to master with context and timeout
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// Create gRPC message
|
|
grpcMsg := &grpc.TaskServiceConnectRequest{
|
|
Code: grpc.TaskServiceConnectCode_INSERT_DATA,
|
|
TaskId: r.tid.Hex(),
|
|
Data: data,
|
|
}
|
|
|
|
// Use context for sending
|
|
select {
|
|
case <-ctx.Done():
|
|
r.Errorf("timeout sending IPC message")
|
|
return
|
|
default:
|
|
if err := r.conn.Send(grpcMsg); err != nil {
|
|
r.Errorf("error sending IPC message: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) installDependenciesIfAvailable() (err error) {
|
|
if !utils.IsPro() {
|
|
return nil
|
|
}
|
|
|
|
// Get dependency installer service
|
|
depSvc := dependency.GetDependencyInstallerRegistryService()
|
|
if depSvc == nil {
|
|
r.Warnf("dependency installer service not available")
|
|
return nil
|
|
}
|
|
|
|
// Check if auto install is enabled
|
|
if !depSvc.IsAutoInstallEnabled() {
|
|
r.Debug("auto dependency installation is disabled")
|
|
return nil
|
|
}
|
|
|
|
// Get install command
|
|
cmd, err := depSvc.GetInstallDependencyRequirementsCmdBySpiderId(r.s.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cmd == nil {
|
|
return nil
|
|
}
|
|
|
|
// Set up pipes for stdout and stderr
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
r.Errorf("error creating stdout pipe for dependency installation: %v", err)
|
|
return err
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
r.Errorf("error creating stderr pipe for dependency installation: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Start the command
|
|
r.Infof("installing dependencies for spider: %s", r.s.Id.Hex())
|
|
r.Infof("command for dependencies installation: %s", cmd.String())
|
|
if err := cmd.Start(); err != nil {
|
|
r.Errorf("error starting dependency installation command: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Create wait group for log readers
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
// Read stdout
|
|
go func() {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(stdout)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
r.Info(line)
|
|
}
|
|
}()
|
|
|
|
// Read stderr
|
|
go func() {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(stderr)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
r.Error(line)
|
|
}
|
|
}()
|
|
|
|
// Wait for command to complete
|
|
if err := cmd.Wait(); err != nil {
|
|
r.Errorf("dependency installation failed: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Wait for log readers to finish
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// logInternally sends internal runner logs to the same logging system as the task
|
|
func (r *Runner) logInternally(level string, message string) {
|
|
// Format the internal log with a prefix
|
|
timestamp := time.Now().Local().Format("2006-01-02 15:04:05")
|
|
|
|
// Pad level
|
|
level = fmt.Sprintf("%-5s", level)
|
|
|
|
// Format the log message
|
|
internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message)
|
|
|
|
// Send to the same log system as task logs
|
|
if r.conn != nil {
|
|
r.writeLogLines([]string{internalLog})
|
|
}
|
|
|
|
// Also log through the standard logger
|
|
switch level {
|
|
case "ERROR":
|
|
r.Logger.Error(message)
|
|
case "WARN":
|
|
r.Logger.Warn(message)
|
|
case "INFO":
|
|
r.Logger.Info(message)
|
|
case "DEBUG":
|
|
r.Logger.Debug(message)
|
|
}
|
|
}
|
|
|
|
func (r *Runner) Error(message string) {
|
|
msg := fmt.Sprintf(message)
|
|
r.logInternally("ERROR", msg)
|
|
}
|
|
|
|
func (r *Runner) Warn(message string) {
|
|
msg := fmt.Sprintf(message)
|
|
r.logInternally("WARN", msg)
|
|
}
|
|
|
|
func (r *Runner) Info(message string) {
|
|
msg := fmt.Sprintf(message)
|
|
r.logInternally("INFO", msg)
|
|
}
|
|
|
|
func (r *Runner) Debug(message string) {
|
|
msg := fmt.Sprintf(message)
|
|
r.logInternally("DEBUG", msg)
|
|
}
|
|
|
|
func (r *Runner) Errorf(format string, args ...interface{}) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
r.logInternally("ERROR", msg)
|
|
}
|
|
|
|
func (r *Runner) Warnf(format string, args ...interface{}) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
r.logInternally("WARN", msg)
|
|
}
|
|
|
|
func (r *Runner) Infof(format string, args ...interface{}) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
r.logInternally("INFO", msg)
|
|
}
|
|
|
|
func (r *Runner) Debugf(format string, args ...interface{}) {
|
|
msg := fmt.Sprintf(format, args...)
|
|
r.logInternally("DEBUG", msg)
|
|
}
|