diff --git a/Dockerfile b/Dockerfile index 893cf6fe..0809a0ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,8 @@ ADD ./frontend /app WORKDIR /app # install frontend -RUN npm install -g yarn && yarn install +RUN npm config set unsafe-perm true +RUN npm install -g yarn && yarn install --registry=https://registry.npm.taobao.org RUN npm run build:prod @@ -56,4 +57,4 @@ EXPOSE 8080 EXPOSE 8000 # start backend -CMD ["/bin/sh", "/app/docker_init.sh"] \ No newline at end of file +CMD ["/bin/sh", "/app/docker_init.sh"] diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 0eb8639b..7f647cda 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -2,6 +2,7 @@ package database import ( "context" + "crawlab/utils" "fmt" "github.com/apex/log" "github.com/gomodule/redigo/redis" @@ -26,7 +27,7 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s tick := time.NewTicker(time.Second * 3) defer tick.Stop() go func() { - defer func() { _ = psc.Close() }() + defer utils.Close(psc) for { switch msg := psc.Receive().(type) { case error: @@ -87,7 +88,7 @@ func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...s } func (r *Redis) Publish(channel, message string) (n int, err error) { conn := r.pool.Get() - defer func() { _ = conn.Close() }() + defer utils.Close(conn) n, err = redis.Int(conn.Do("PUBLISH", channel, message)) if err != nil { return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message) diff --git a/backend/database/redis.go b/backend/database/redis.go index ede229a2..348a74bb 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -1,6 +1,10 @@ package database import ( + "context" + "crawlab/entity" + "crawlab/utils" + "github.com/apex/log" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" @@ -18,7 +22,7 @@ func NewRedisClient() *Redis { } func (r *Redis) RPush(collection string, value interface{}) error { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) if _, err := c.Do("RPUSH", collection, value); err != nil { debug.PrintStack() @@ -29,7 +33,7 @@ func (r *Redis) RPush(collection string, value interface{}) error { func (r *Redis) LPop(collection string) (string, error) { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) value, err2 := redis.String(c.Do("LPOP", collection)) if err2 != nil { @@ -40,7 +44,7 @@ func (r *Redis) LPop(collection string) (string, error) { func (r *Redis) HSet(collection string, key string, value string) error { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) if _, err := c.Do("HSET", collection, key, value); err != nil { debug.PrintStack() @@ -51,7 +55,7 @@ func (r *Redis) HSet(collection string, key string, value string) error { func (r *Redis) HGet(collection string, key string) (string, error) { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) value, err2 := redis.String(c.Do("HGET", collection, key)) if err2 != nil { @@ -62,7 +66,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) { func (r *Redis) HDel(collection string, key string) error { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) if _, err := c.Do("HDEL", collection, key); err != nil { return err @@ -72,7 +76,7 @@ func (r *Redis) HDel(collection string, key string) error { func (r *Redis) HKeys(collection string) ([]string, error) { c := r.pool.Get() - defer c.Close() + defer utils.Close(c) value, err2 := redis.Strings(c.Do("HKeys", collection)) if err2 != nil { @@ -120,3 +124,22 @@ func InitRedis() error { RedisClient = NewRedisClient() return nil } + +func Pub(channel string, msg entity.NodeMessage) error { + if _, err := RedisClient.Publish(channel, utils.GetJson(msg)); err != nil { + log.Errorf("publish redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +} + +func Sub(channel string, consume ConsumeFunc) error { + ctx := context.Background() + if err := RedisClient.Subscribe(ctx, consume, channel); err != nil { + log.Errorf("subscribe redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +} diff --git a/backend/errors/errors.go b/backend/errors/errors.go index f191cd3e..d896e4d4 100644 --- a/backend/errors/errors.go +++ b/backend/errors/errors.go @@ -24,7 +24,6 @@ func (O OPError) Error() string { switch O.Scope { case ScopeSystem: scope = "system" - break case ScopeBusiness: scope = "business" } diff --git a/backend/lib/cron/cron_test.go b/backend/lib/cron/cron_test.go index 36f06bf7..35266df1 100644 --- a/backend/lib/cron/cron_test.go +++ b/backend/lib/cron/cron_test.go @@ -44,17 +44,14 @@ func TestFuncPanicRecovery(t *testing.T) { WithChain(Recover(newBufLogger(&buf)))) cron.Start() defer cron.Stop() - cron.AddFunc("* * * * * ?", func() { + _, _ = cron.AddFunc("* * * * * ?", func() { panic("YOLO") }) - - select { - case <-time.After(OneSecond): - if !strings.Contains(buf.String(), "YOLO") { - t.Error("expected a panic to be logged, got none") - } - return + <-time.After(OneSecond) + if !strings.Contains(buf.String(), "YOLO") { + t.Error("expected a panic to be logged, got none") } + } type DummyJob struct{} @@ -71,7 +68,7 @@ func TestJobPanicRecovery(t *testing.T) { WithChain(Recover(newBufLogger(&buf)))) cron.Start() defer cron.Stop() - cron.AddJob("* * * * * ?", job) + _, _ = cron.AddJob("* * * * * ?", job) select { case <-time.After(OneSecond): @@ -102,7 +99,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) { cron := newWithSeconds() cron.Start() cron.Stop() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) select { case <-time.After(OneSecond): @@ -118,7 +115,7 @@ func TestAddBeforeRunning(t *testing.T) { wg.Add(1) cron := newWithSeconds() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) cron.Start() defer cron.Stop() @@ -138,7 +135,7 @@ func TestAddWhileRunning(t *testing.T) { cron := newWithSeconds() cron.Start() defer cron.Stop() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) select { case <-time.After(OneSecond): @@ -154,7 +151,7 @@ func TestAddWhileRunningWithDelay(t *testing.T) { defer cron.Stop() time.Sleep(5 * time.Second) var calls int64 - cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) + _, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) <-time.After(OneSecond) if atomic.LoadInt64(&calls) != 1 { @@ -205,7 +202,7 @@ func TestSnapshotEntries(t *testing.T) { wg.Add(1) cron := New() - cron.AddFunc("@every 2s", func() { wg.Done() }) + _, _ = cron.AddFunc("@every 2s", func() { wg.Done() }) cron.Start() defer cron.Stop() @@ -232,12 +229,12 @@ func TestMultipleEntries(t *testing.T) { wg.Add(2) cron := newWithSeconds() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("0 0 0 1 1 ?", func() {}) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("0 0 0 31 12 ?", func() {}) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) cron.Remove(id1) cron.Start() @@ -257,9 +254,9 @@ func TestRunningJobTwice(t *testing.T) { wg.Add(2) cron := newWithSeconds() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("0 0 0 1 1 ?", func() {}) + _, _ = cron.AddFunc("0 0 0 31 12 ?", func() {}) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) cron.Start() defer cron.Stop() @@ -276,9 +273,9 @@ func TestRunningMultipleSchedules(t *testing.T) { wg.Add(2) cron := newWithSeconds() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("0 0 0 1 1 ?", func() {}) + _, _ = cron.AddFunc("0 0 0 31 12 ?", func() {}) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) cron.Schedule(Every(time.Minute), FuncJob(func() {})) cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() })) cron.Schedule(Every(time.Hour), FuncJob(func() {})) @@ -310,7 +307,7 @@ func TestLocalTimezone(t *testing.T) { now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) cron := newWithSeconds() - cron.AddFunc(spec, func() { wg.Done() }) + _, _ = cron.AddFunc(spec, func() { wg.Done() }) cron.Start() defer cron.Stop() @@ -344,7 +341,7 @@ func TestNonLocalTimezone(t *testing.T) { now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) cron := New(WithLocation(loc), WithParser(secondParser)) - cron.AddFunc(spec, func() { wg.Done() }) + _, _ = cron.AddFunc(spec, func() { wg.Done() }) cron.Start() defer cron.Stop() @@ -386,7 +383,7 @@ func TestBlockingRun(t *testing.T) { wg.Add(1) cron := newWithSeconds() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) + _, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() }) var unblockChan = make(chan struct{}) @@ -410,7 +407,7 @@ func TestStartNoop(t *testing.T) { var tickChan = make(chan struct{}, 2) cron := newWithSeconds() - cron.AddFunc("* * * * * ?", func() { + _, _ = cron.AddFunc("* * * * * ?", func() { tickChan <- struct{}{} }) @@ -438,10 +435,10 @@ func TestJob(t *testing.T) { wg.Add(1) cron := newWithSeconds() - cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"}) - cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"}) + _, _ = cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"}) + _, _ = cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"}) job2, _ := cron.AddJob("* * * * * ?", testJob{wg, "job2"}) - cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"}) + _, _ = cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"}) cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"}) job5 := cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"}) @@ -465,7 +462,7 @@ func TestJob(t *testing.T) { // Ensure the entries are in the right order. expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"} - var actuals []string + var actuals = make([]string, 0, len(cron.Entries())) for _, entry := range cron.Entries() { actuals = append(actuals, entry.Job.(testJob).name) } @@ -545,7 +542,7 @@ func (*ZeroSchedule) Next(time.Time) time.Time { func TestJobWithZeroTimeDoesNotRun(t *testing.T) { cron := newWithSeconds() var calls int64 - cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) + _, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) }) cron.Schedule(new(ZeroSchedule), FuncJob(func() { t.Error("expected zero task will not run") })) cron.Start() defer cron.Stop() @@ -582,11 +579,11 @@ func TestStopAndWait(t *testing.T) { t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) { cron := newWithSeconds() - cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) cron.Start() - cron.AddFunc("* * * * * *", func() {}) - cron.AddFunc("* * * * * *", func() {}) - cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) time.Sleep(time.Second) ctx := cron.Stop() select { @@ -598,10 +595,10 @@ func TestStopAndWait(t *testing.T) { t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) { cron := newWithSeconds() - cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) cron.Start() - cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) - cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + _, _ = cron.AddFunc("* * * * * *", func() {}) time.Sleep(time.Second) ctx := cron.Stop() @@ -625,10 +622,10 @@ func TestStopAndWait(t *testing.T) { t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) { cron := newWithSeconds() - cron.AddFunc("* * * * * *", func() {}) - cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + _, _ = cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) cron.Start() - cron.AddFunc("* * * * * *", func() {}) + _, _ = cron.AddFunc("* * * * * *", func() {}) time.Sleep(time.Second) ctx := cron.Stop() ctx2 := cron.Stop() diff --git a/backend/lib/cron/logger.go b/backend/lib/cron/logger.go index b4efcc05..46314da8 100644 --- a/backend/lib/cron/logger.go +++ b/backend/lib/cron/logger.go @@ -9,10 +9,10 @@ import ( ) // DefaultLogger is used by Cron if none is specified. -var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) +var DefaultLogger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) // DiscardLogger can be used by callers to discard all log messages. -var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0)) +var DiscardLogger = PrintfLogger(log.New(ioutil.Discard, "", 0)) // Logger is the interface used in this package for logging, so that any backend // can be plugged in. It is a subset of the github.com/go-logr/logr interface. diff --git a/backend/lib/cron/option_test.go b/backend/lib/cron/option_test.go index 8aef1682..57dbaa4b 100644 --- a/backend/lib/cron/option_test.go +++ b/backend/lib/cron/option_test.go @@ -30,7 +30,7 @@ func TestWithVerboseLogger(t *testing.T) { t.Error("expected provided logger") } - c.AddFunc("@every 1s", func() {}) + _, _ = c.AddFunc("@every 1s", func() {}) c.Start() time.Sleep(OneSecond) c.Stop() diff --git a/backend/lib/cron/parser_test.go b/backend/lib/cron/parser_test.go index 41c8c520..f95a54bb 100644 --- a/backend/lib/cron/parser_test.go +++ b/backend/lib/cron/parser_test.go @@ -304,6 +304,7 @@ func TestNormalizeFields_Errors(t *testing.T) { actual, err := normalizeFields(test.input, test.options) if err == nil { t.Errorf("expected an error, got none. results: %v", actual) + return } if !strings.Contains(err.Error(), test.err) { t.Errorf("expected error %q, got %q", test.err, err.Error()) diff --git a/backend/lib/cron/spec.go b/backend/lib/cron/spec.go index fa1e241e..9821a6a2 100644 --- a/backend/lib/cron/spec.go +++ b/backend/lib/cron/spec.go @@ -178,8 +178,8 @@ WRAP: // restrictions are satisfied by the given time. func dayMatches(s *SpecSchedule, t time.Time) bool { var ( - domMatch bool = 1< 0 - dowMatch bool = 1< 0 + domMatch = 1< 0 + dowMatch = 1< 0 ) if s.Dom&starBit > 0 || s.Dow&starBit > 0 { return domMatch && dowMatch diff --git a/backend/main.go b/backend/main.go index 5d95dbaf..2c92ab37 100644 --- a/backend/main.go +++ b/backend/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "crawlab/config" "crawlab/database" "crawlab/lib/validate_bridge" @@ -12,7 +13,13 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/spf13/viper" + "net" + "net/http" + "os" + "os/signal" "runtime/debug" + "syscall" + "time" ) func main() { @@ -166,8 +173,26 @@ func main() { // 运行服务器 host := viper.GetString("server.host") port := viper.GetString("server.port") - if err := app.Run(host + ":" + port); err != nil { + address := net.JoinHostPort(host, port) + srv := &http.Server{ + Handler: app, + Addr: address, + } + go func() { + if err := srv.ListenAndServe(); err != nil { + if err != http.ErrServerClosed { + log.Error("run server error:" + err.Error()) + } else { + log.Info("server graceful down") + } + } + }() + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + if err := srv.Shutdown(ctx2); err != nil { log.Error("run server error:" + err.Error()) - panic(err) } } diff --git a/backend/mock/node.go b/backend/mock/node.go index 789d0a9a..6c77c32e 100644 --- a/backend/mock/node.go +++ b/backend/mock/node.go @@ -188,7 +188,7 @@ func DeleteNode(c *gin.Context) { id := bson.ObjectId("5d429e6c19f7abede924fee2") for _, node := range NodeList { - if node.Id == bson.ObjectId(id) { + if node.Id == id { log.Infof("Delete a node") } } diff --git a/backend/model/log.go b/backend/model/log.go index ae6973b1..abb77ed9 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -1,6 +1,7 @@ package model import ( + "crawlab/utils" "github.com/apex/log" "os" "runtime/debug" @@ -21,9 +22,9 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { debug.PrintStack() return nil, err } - defer f.Close() + defer utils.Close(f) - const bufLen = 2 * 1024 * 1024 + const bufLen = 1 * 1024 * 1024 logBuf := make([]byte, bufLen) off := int64(0) diff --git a/backend/model/node.go b/backend/model/node.go index 7af93dbe..1c63fc3e 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -4,6 +4,7 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/services/register" + "errors" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -33,7 +34,6 @@ type Node struct { const ( Yes = "Y" - No = "N" ) // 当前节点是否为主节点 @@ -157,15 +157,20 @@ func GetNodeList(filter interface{}) ([]Node, error) { } func GetNode(id bson.ObjectId) (Node, error) { + var node Node + + if id.Hex() == "" { + log.Infof("id is empty") + debug.PrintStack() + return node, errors.New("id is empty") + } + s, c := database.GetCol("nodes") defer s.Close() - var node Node if err := c.FindId(id).One(&node); err != nil { - if err != mgo.ErrNotFound { - log.Errorf(err.Error()) - debug.PrintStack() - } + log.Errorf(err.Error()) + debug.PrintStack() return node, err } return node, nil diff --git a/backend/model/schedule.go b/backend/model/schedule.go index 6415e22b..36799ac3 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -16,6 +16,7 @@ type Schedule struct { Description string `json:"description" bson:"description"` SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"` NodeId bson.ObjectId `json:"node_id" bson:"node_id"` + NodeKey string `json:"node_key" bson:"node_key"` Cron string `json:"cron" bson:"cron"` EntryId cron.EntryID `json:"entry_id" bson:"entry_id"` Param string `json:"param" bson:"param"` @@ -38,6 +39,33 @@ func (sch *Schedule) Save() error { return nil } +func (sch *Schedule) Delete() error { + s, c := database.GetCol("schedules") + defer s.Close() + return c.RemoveId(sch.Id) +} + +func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) { + sch.syncNodeId(node) + sch.syncSpiderId(spider) +} + +func (sch *Schedule) syncNodeId(node Node) { + if node.Id.Hex() == sch.NodeId.Hex() { + return + } + sch.NodeId = node.Id + _ = sch.Save() +} + +func (sch *Schedule) syncSpiderId(spider Spider) { + if spider.Id.Hex() == sch.SpiderId.Hex() { + return + } + sch.SpiderId = spider.Id + _ = sch.Save() +} + func GetScheduleList(filter interface{}) ([]Schedule, error) { s, c := database.GetCol("schedules") defer s.Close() @@ -47,11 +75,12 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { return schedules, err } - for i, schedule := range schedules { + var schs []Schedule + for _, schedule := range schedules { // 获取节点名称 if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) { // 选择所有节点 - schedules[i].NodeName = "All Nodes" + schedule.NodeName = "All Nodes" } else { // 选择单一节点 node, err := GetNode(schedule.NodeId) @@ -59,7 +88,7 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { log.Errorf(err.Error()) continue } - schedules[i].NodeName = node.Name + schedule.NodeName = node.Name } // 获取爬虫名称 @@ -67,11 +96,13 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) { if err != nil { log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error()) debug.PrintStack() + _ = schedule.Delete() continue } - schedules[i].SpiderName = spider.Name + schedule.SpiderName = spider.Name + schs = append(schs, schedule) } - return schedules, nil + return schs, nil } func GetSchedule(id bson.ObjectId) (Schedule, error) { @@ -93,7 +124,12 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error { if err := c.FindId(id).One(&result); err != nil { return err } + node, err := GetNode(item.NodeId) + if err != nil { + return err + } + item.NodeKey = node.Key if err := item.Save(); err != nil { return err } @@ -104,9 +140,15 @@ func AddSchedule(item Schedule) error { s, c := database.GetCol("schedules") defer s.Close() + node, err := GetNode(item.NodeId) + if err != nil { + return err + } + item.Id = bson.NewObjectId() item.CreateTs = time.Now() item.UpdateTs = time.Now() + item.NodeKey = node.Key if err := c.Insert(&item); err != nil { debug.PrintStack() diff --git a/backend/model/spider.go b/backend/model/spider.go index 1f88acff..5c2c92e8 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -98,13 +98,19 @@ func (spider *Spider) GetLastTask() (Task, error) { return tasks[0], nil } +func (spider *Spider) Delete() error { + s, c := database.GetCol("spiders") + defer s.Close() + return c.RemoveId(spider.Id) +} + // 爬虫列表 func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, error) { s, c := database.GetCol("spiders") defer s.Close() // 获取爬虫列表 - spiders := []Spider{} + var spiders []Spider if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil { debug.PrintStack() return spiders, 0, err @@ -225,7 +231,7 @@ func RemoveAllSpider() error { s, c := database.GetCol("spiders") defer s.Close() - spiders := []Spider{} + var spiders []Spider err := c.Find(nil).All(&spiders) if err != nil { log.Error("get all spiders error:" + err.Error()) @@ -256,15 +262,14 @@ func GetSpiderTypes() ([]*entity.SpiderType, error) { s, c := database.GetCol("spiders") defer s.Close() - group := bson.M{ "$group": bson.M{ - "_id": "$type", + "_id": "$type", "count": bson.M{"$sum": 1}, }, } var types []*entity.SpiderType - if err := c.Pipe([]bson.M{ group}).All(&types); err != nil { + if err := c.Pipe([]bson.M{group}).All(&types); err != nil { log.Errorf("get spider types error: %s", err.Error()) debug.PrintStack() return nil, err diff --git a/backend/model/task.go b/backend/model/task.go index f568b7fe..df046ecc 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -4,7 +4,6 @@ import ( "crawlab/constants" "crawlab/database" "github.com/apex/log" - "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "runtime/debug" "time" @@ -118,20 +117,16 @@ func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Tas for i, task := range tasks { // 获取爬虫名称 spider, err := task.GetSpider() - if err == mgo.ErrNotFound { - // do nothing - } else if err != nil { - return tasks, err + if spider.Id.Hex() == "" || err != nil { + _ = spider.Delete() } else { tasks[i].SpiderName = spider.DisplayName } // 获取节点名称 node, err := task.GetNode() - if err == mgo.ErrNotFound { - // do nothing - } else if err != nil { - return tasks, err + if node.Id.Hex() == "" || err != nil { + _ = task.Delete() } else { tasks[i].NodeName = node.Name } diff --git a/backend/routes/node.go b/backend/routes/node.go index f86c152d..7d030773 100644 --- a/backend/routes/node.go +++ b/backend/routes/node.go @@ -15,9 +15,9 @@ func GetNodeList(c *gin.Context) { return } - for i, node := range nodes { - nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex()) - } + //for i, node := range nodes { + // nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex()) + //} c.JSON(http.StatusOK, Response{ Status: "ok", @@ -109,11 +109,11 @@ func GetSystemInfo(c *gin.Context) { }) } -func DeleteNode(c *gin.Context) { +func DeleteNode(c *gin.Context) { id := c.Param("id") node, err := model.GetNode(bson.ObjectIdHex(id)) if err != nil { - HandleError(http.StatusInternalServerError, c ,err) + HandleError(http.StatusInternalServerError, c, err) return } err = node.Delete() diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index b447abb5..73b75323 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -1,7 +1,6 @@ package routes import ( - "crawlab/constants" "crawlab/model" "crawlab/services" "github.com/gin-gonic/gin" @@ -46,13 +45,14 @@ func PostSchedule(c *gin.Context) { HandleError(http.StatusBadRequest, c, err) return } - newItem.Id = bson.ObjectIdHex(id) - // 如果node_id为空,则置为空ObjectId - if newItem.NodeId == "" { - newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + // 验证cron表达式 + if err := services.ParserCron(newItem.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return } + newItem.Id = bson.ObjectIdHex(id) // 更新数据库 if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil { HandleError(http.StatusInternalServerError, c, err) @@ -80,9 +80,10 @@ func PutSchedule(c *gin.Context) { return } - // 如果node_id为空,则置为空ObjectId - if item.NodeId == "" { - item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + // 验证cron表达式 + if err := services.ParserCron(item.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return } // 更新数据库 diff --git a/backend/services/context/context.go b/backend/services/context/context.go index ce8eb72e..e8b37f8e 100644 --- a/backend/services/context/context.go +++ b/backend/services/context/context.go @@ -66,7 +66,7 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) { "message": "error", "error": errStr, }) - break + case validator.ValidationErrors: validatorErrors := causeError.(validator.ValidationErrors) //firstError := validatorErrors[0].(validator.FieldError) @@ -75,7 +75,6 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) { "message": "error", "error": validatorErrors.Error(), }) - break default: fmt.Println("deprecated....") c.AbortWithStatusJSON(httpCode, gin.H{ diff --git a/backend/services/log.go b/backend/services/log.go index 485cb7dd..5b5cd7ae 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "runtime/debug" + "time" ) // 任务日志频道映射 @@ -45,8 +46,14 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { // 生成频道,等待获取log ch := TaskLogChanMap.ChanBlocked(task.Id) - // 此处阻塞,等待结果 - logStr = <-ch + select { + case logStr = <-ch: + log.Infof("get remote log") + break + case <-time.After(30 * time.Second): + logStr = "get remote log timeout" + break + } return logStr, nil } @@ -67,7 +74,7 @@ func DeleteLogPeriodically() { for _, fi := range rd { if fi.IsDir() { log.Info(filepath.Join(logDir, fi.Name())) - os.RemoveAll(filepath.Join(logDir, fi.Name())) + _ = os.RemoveAll(filepath.Join(logDir, fi.Name())) log.Info("Delete Log File Success") } } diff --git a/backend/services/log_test.go b/backend/services/log_test.go index 1e9a21c7..1e3f76d4 100644 --- a/backend/services/log_test.go +++ b/backend/services/log_test.go @@ -13,10 +13,8 @@ import ( func TestDeleteLogPeriodically(t *testing.T) { Convey("Test DeleteLogPeriodically", t, func() { - if err := config.InitConfig("../conf/config.yml"); err != nil { - log.Error("init config error:" + err.Error()) - panic(err) - } + err := config.InitConfig("../conf/config.yml") + So(err, ShouldBeNil) log.Info("初始化配置成功") logDir := viper.GetString("log.path") log.Info(logDir) @@ -28,24 +26,16 @@ func TestGetLocalLog(t *testing.T) { //create a log file for test logPath := "../logs/crawlab/test.log" f, err := os.Create(logPath) - defer f.Close() + defer utils.Close(f) if err != nil { - fmt.Println(err.Error()) + fmt.Println(err) } else { _, err = f.WriteString("This is for test") + fmt.Println(err) } - Convey("Test GetLocalLog", t, func() { - Convey("Test response", func() { - logStr, err := GetLocalLog(logPath) - log.Info(utils.BytesToString(logStr)) - fmt.Println(err) - So(err, ShouldEqual, nil) - - }) - }) //delete the test log file - os.Remove(logPath) + _ = os.Remove(logPath) } diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index 848e0c5d..b8b8e231 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -3,6 +3,7 @@ package msg_handler import ( "crawlab/constants" "crawlab/entity" + "github.com/apex/log" ) type Handler interface { @@ -10,6 +11,7 @@ type Handler interface { } func GetMsgHandler(msg entity.NodeMessage) Handler { + log.Infof("received msg , type is : %s", msg.Type) if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { // 日志相关 return &Log{ diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go index 37080bd6..993fad9a 100644 --- a/backend/services/msg_handler/msg_log.go +++ b/backend/services/msg_handler/msg_log.go @@ -2,6 +2,7 @@ package msg_handler import ( "crawlab/constants" + "crawlab/database" "crawlab/entity" "crawlab/model" "crawlab/utils" @@ -40,8 +41,11 @@ func (g *Log) get() error { } // 发布消息给主节点 if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { + log.Errorf("pub log to master node error: %s", err.Error()) + debug.PrintStack() return err } + log.Infof(msgSd.Log) return nil } diff --git a/backend/services/msg_handler/msg_system_info.go b/backend/services/msg_handler/msg_system_info.go index 6b88e2cf..9de5c74a 100644 --- a/backend/services/msg_handler/msg_system_info.go +++ b/backend/services/msg_handler/msg_system_info.go @@ -2,9 +2,9 @@ package msg_handler import ( "crawlab/constants" + "crawlab/database" "crawlab/entity" "crawlab/model" - "crawlab/utils" ) type SystemInfo struct { @@ -22,7 +22,7 @@ func (s *SystemInfo) Handle() error { NodeId: s.msg.NodeId, SysInfo: sysInfo, } - if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { + if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil { return err } return nil diff --git a/backend/services/node.go b/backend/services/node.go index 53af8d32..dffe5ac9 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -88,6 +88,8 @@ func UpdateNodeStatus() { handleNodeInfo(key, data) } + // 重新获取list + list, _ = database.RedisClient.HKeys("nodes") // 重置不在redis的key为offline model.ResetNodeStatusToOffline(list) } @@ -100,7 +102,7 @@ func handleNodeInfo(key string, data Data) { // 同个key可能因为并发,被注册多次 var nodes []model.Node _ = c.Find(bson.M{"key": key}).All(&nodes) - if nodes != nil && len(nodes) > 1 { + if len(nodes) > 1 { for _, node := range nodes { _ = c.RemoveId(node.Id) } @@ -110,13 +112,15 @@ func handleNodeInfo(key string, data Data) { if err := c.Find(bson.M{"key": key}).One(&node); err != nil { // 数据库不存在该节点 node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, + UpdateTs: time.Now(), + UpdateTsUnix: time.Now().Unix(), } if err := node.Add(); err != nil { log.Errorf(err.Error()) @@ -125,6 +129,8 @@ func handleNodeInfo(key string, data Data) { } else { // 数据库存在该节点 node.Status = constants.StatusOnline + node.UpdateTs = time.Now() + node.UpdateTsUnix = time.Now().Unix() if err := node.Save(); err != nil { log.Errorf(err.Error()) return @@ -149,7 +155,11 @@ func UpdateNodeData() { } // 获取redis的key key, err := register.GetRegister().GetKey() - + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } // 构造节点数据 data := Data{ Key: key, @@ -201,6 +211,8 @@ func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 msg := utils.GetMessage(message) if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil { + log.Errorf("msg handler error: %s", err.Error()) + debug.PrintStack() return err } return nil @@ -230,19 +242,19 @@ func InitNodeService() error { if model.IsMaster() { // 如果为主节点,订阅主节点通信频道 - if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { + if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { return err } } else { // 若为工作节点,订阅单独指定通信频道 channel := constants.ChannelWorkerNode + node.Id.Hex() - if err := utils.Sub(channel, WorkerNodeCallback); err != nil { + if err := database.Sub(channel, WorkerNodeCallback); err != nil { return err } } // 订阅全通道 - if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil { + if err := database.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil { return err } diff --git a/backend/services/schedule.go b/backend/services/schedule.go index 58cdf628..d4c1635b 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -5,7 +5,7 @@ import ( "crawlab/lib/cron" "crawlab/model" "github.com/apex/log" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" "runtime/debug" ) @@ -17,7 +17,22 @@ type Scheduler struct { func AddTask(s model.Schedule) func() { return func() { - nodeId := s.NodeId + node, err := model.GetNodeByKey(s.NodeKey) + if err != nil || node.Id.Hex() == "" { + log.Errorf("get node by key error: %s", err.Error()) + debug.PrintStack() + return + } + + spider := model.GetSpiderByName(s.SpiderName) + if spider == nil || spider.Id.Hex() == "" { + log.Errorf("get spider by name error: %s", err.Error()) + debug.PrintStack() + return + } + + // 同步ID到定时任务 + s.SyncNodeIdAndSpiderId(node, *spider) // 生成任务ID id := uuid.NewV4() @@ -25,8 +40,8 @@ func AddTask(s model.Schedule) func() { // 生成任务模型 t := model.Task{ Id: id.String(), - SpiderId: s.SpiderId, - NodeId: nodeId, + SpiderId: spider.Id, + NodeId: node.Id, Status: constants.StatusPending, Param: s.Param, } @@ -107,6 +122,18 @@ func (s *Scheduler) RemoveAll() { } } +// 验证cron表达式是否正确 +func ParserCron(spec string) error { + parser := cron.NewParser( + cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, + ) + + if _, err := parser.Parse(spec); err != nil { + return err + } + return nil +} + func (s *Scheduler) Update() error { // 删除所有定时任务 s.RemoveAll() diff --git a/backend/services/spider.go b/backend/services/spider.go index 7aea456f..84d218bb 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -82,7 +82,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre log.Infof("can't opened this file") return err } - defer f.Close() + defer utils.Close(f) s := make([]byte, 4096) for { switch nr, err := f.Read(s[:]); true { @@ -173,7 +173,7 @@ func RemoveSpider(id string) error { Type: constants.MsgTypeRemoveSpider, SpiderId: id, } - if err := utils.Pub(constants.ChannelAllNode, msg); err != nil { + if err := database.Pub(constants.ChannelAllNode, msg); err != nil { return err } diff --git a/backend/services/spider_handler/spider.go b/backend/services/spider_handler/spider.go index 53c83b9a..cce025dc 100644 --- a/backend/services/spider_handler/spider.go +++ b/backend/services/spider_handler/spider.go @@ -28,7 +28,7 @@ func (s *SpiderSync) CreateMd5File(md5 string) { fileName := filepath.Join(path, Md5File) file := utils.OpenFile(fileName) - defer file.Close() + defer utils.Close(file) if file != nil { if _, err := file.WriteString(md5 + "\n"); err != nil { log.Errorf("file write string error: %s", err.Error()) @@ -80,7 +80,7 @@ func (s *SpiderSync) Download() { defer session.Close() f, err := gf.OpenId(bson.ObjectIdHex(fileId)) - defer f.Close() + defer utils.Close(f) if err != nil { log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error()) debug.PrintStack() @@ -99,7 +99,7 @@ func (s *SpiderSync) Download() { // 创建临时文件 tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") tmpFile := utils.OpenFile(tmpFilePath) - defer tmpFile.Close() + defer utils.Close(tmpFile) // 将该文件写入临时文件 if _, err := io.Copy(tmpFile, f); err != nil { diff --git a/backend/services/task.go b/backend/services/task.go index ce62a95e..9e584e82 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -100,91 +100,85 @@ func AssignTask(task model.Task) error { return nil } -// 执行shell命令 -func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { - log.Infof("cwd: " + cwd) - log.Infof("cmd: " + cmdStr) +// 设置环境变量 +func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { + // 默认环境变量 + cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId) + cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) + cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") + cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") + cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai") - // 生成执行命令 - var cmd *exec.Cmd - if runtime.GOOS == constants.Windows { - cmd = exec.Command("cmd", "/C", cmdStr) - } else { - cmd = exec.Command("sh", "-c", cmdStr) - } - - // 工作目录 - cmd.Dir = cwd - - // 指定stdout, stderr日志位置 - fLog, err := os.Create(t.LogPath) - if err != nil { - HandleTaskError(t, err) - return err - } - defer fLog.Close() - cmd.Stdout = fLog - cmd.Stderr = fLog - - // 添加默认环境变量 - cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id) - cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col) - - // 添加任务环境变量 - for _, env := range s.Envs { + //任务环境变量 + for _, env := range envs { cmd.Env = append(cmd.Env, env.Name+"="+env.Value) } - // 起一个goroutine来监控进程 - ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go func() { - // 传入信号,此处阻塞 - signal := <-ch - log.Infof("cancel process signal: %s", signal) - if signal == constants.TaskCancel && cmd.Process != nil { - // 取消进程 - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - log.Errorf("process kill error: %s", err.Error()) - debug.PrintStack() - } - t.Status = constants.StatusCancelled - } else { - // 保存任务 - t.Status = constants.StatusFinished - } - t.FinishTs = time.Now() - t.Error = "user kill the process ..." - if err := t.Save(); err != nil { - log.Infof("save task error: %s", err.Error()) + // TODO 全局环境变量 + return cmd +} + +func SetLogConfig(cmd *exec.Cmd, path string) error { + fLog, err := os.Create(path) + if err != nil { + log.Errorf("create task log file error: %s", path) + debug.PrintStack() + return err + } + cmd.Stdout = fLog + cmd.Stderr = fLog + return nil +} + +func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { + // 传入信号,此处阻塞 + signal := <-ch + log.Infof("process received signal: %s", signal) + + if signal == constants.TaskCancel && cmd.Process != nil { + // 取消进程 + if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + log.Errorf("process kill error: %s", err.Error()) debug.PrintStack() - return + + t.Error = "kill process error: " + err.Error() + t.Status = constants.StatusError + } else { + t.Error = "user kill the process ..." + t.Status = constants.StatusCancelled } - }() + } else { + // 保存任务 + t.Status = constants.StatusFinished + } - // 在选择所有节点执行的时候,实际就是随机一个节点执行的, - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + t.FinishTs = time.Now() + _ = t.Save() +} - // 异步启动进程 +func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Start(); err != nil { log.Errorf("start spider error:{}", err.Error()) debug.PrintStack() - return err - } - // 保存pid到task - t.Pid = cmd.Process.Pid - if err := t.Save(); err != nil { - log.Errorf("save task pid error: %s", err.Error()) - debug.PrintStack() + t.Error = "start task error: " + err.Error() + t.Status = constants.StatusError + t.FinishTs = time.Now() + _ = t.Save() return err } - // 同步等待进程完成 + return nil +} + +func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Wait(); err != nil { log.Errorf("wait process finish error: %s", err.Error()) debug.PrintStack() + if exitError, ok := err.(*exec.ExitError); ok { exitCode := exitError.ExitCode() log.Errorf("exit error, exit code: %d", exitCode) + // 非kill 的错误类型 if exitCode != -1 { // 非手动kill保存为错误状态 @@ -194,6 +188,52 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e _ = t.Save() } } + + return err + } + return nil +} + +// 执行shell命令 +func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { + log.Infof("cwd: %s", cwd) + log.Infof("cmd: %s", cmdStr) + + // 生成执行命令 + var cmd *exec.Cmd + if runtime.GOOS == constants.Windows { + cmd = exec.Command("cmd", "/C", cmdStr) + } else { + cmd = exec.Command("") + cmd = exec.Command("sh", "-c", cmdStr) + } + + // 工作目录 + cmd.Dir = cwd + + // 日志配置 + if err := SetLogConfig(cmd, t.LogPath); err != nil { + return err + } + + // 环境变量配置 + cmd = SetEnv(cmd, s.Envs, t.Id, s.Col) + + // 起一个goroutine来监控进程 + ch := utils.TaskExecChanMap.ChanBlocked(t.Id) + + go FinishOrCancelTask(ch, cmd, t) + + // kill的时候,可以kill所有的子进程 + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // 启动进程 + if err := StartTaskProcess(cmd, t); err != nil { + return err + } + + // 同步等待进程完成 + if err := WaitTaskProcess(cmd, t); err != nil { return err } ch <- constants.TaskFinish @@ -208,6 +248,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) { // 如果日志目录不存在,生成该目录 if !utils.Exists(fileDir) { if err := os.MkdirAll(fileDir, 0777); err != nil { + log.Errorf("execute task, make log dir error: %s", err.Error()) debug.PrintStack() return "", err } @@ -272,82 +313,55 @@ func ExecuteTask(id int) { // 获取当前节点 node, err := model.GetCurrentNode() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task get current node error: %s", err.Error()) + debug.PrintStack() return } - // 公共队列 - queuePub := "tasks:public" - // 节点队列 queueCur := "tasks:node:" + node.Id.Hex() - // 节点队列任务 var msg string - msg, err = database.RedisClient.LPop(queueCur) - if err != nil { - if msg == "" { - // 节点队列没有任务,获取公共队列任务 - msg, err = database.RedisClient.LPop(queuePub) - if err != nil { - if msg == "" { - // 公共队列没有任务 - log.Debugf(GetWorkerPrefix(id) + "没有任务...") - return - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return - } - } - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return + if msg, err = database.RedisClient.LPop(queueCur); err != nil { + // 节点队列没有任务,获取公共队列任务 + queuePub := "tasks:public" + if msg, err = database.RedisClient.LPop(queuePub); err != nil { } } + if msg == "" { + return + } + // 反序列化 tMsg := TaskMessage{} if err := json.Unmarshal([]byte(msg), &tMsg); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() + log.Errorf("json string to struct error: %s", err.Error()) return } // 获取任务 t, err := model.GetTask(tMsg.Id) if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get task error: %s", err.Error()) return } // 获取爬虫 spider, err := t.GetSpider() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get spider error: %s", err.Error()) return } // 创建日志目录 - fileDir, err := MakeLogDir(t) - if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + var fileDir string + if fileDir, err = MakeLogDir(t); err != nil { return } - // 获取日志文件路径 t.LogPath = GetLogFilePaths(fileDir) - // 创建日志目录文件夹 - fileStdoutDir := filepath.Dir(t.LogPath) - if !utils.Exists(fileStdoutDir) { - if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - return - } - } - // 工作目录 cwd := filepath.Join( viper.GetString("spider.path"), @@ -432,33 +446,29 @@ func ExecuteTask(id int) { func GetTaskLog(id string) (logStr string, err error) { task, err := model.GetTask(id) + if err != nil { - return "", err + return } - logStr = "" if IsMasterNode(task.NodeId.Hex()) { // 若为主节点,获取本机日志 logBytes, err := model.GetLocalLog(task.LogPath) - logStr = utils.BytesToString(logBytes) if err != nil { log.Errorf(err.Error()) logStr = err.Error() - // return "", err } else { logStr = utils.BytesToString(logBytes) } - - } else { - // 若不为主节点,获取远端日志 - logStr, err = GetRemoteLog(task) - if err != nil { - log.Errorf(err.Error()) - return "", err - } + return logStr, err } + // 若不为主节点,获取远端日志 + logStr, err = GetRemoteLog(task) + if err != nil { + log.Errorf(err.Error()) - return logStr, nil + } + return logStr, err } func CancelTask(id string) (err error) { diff --git a/backend/services/user.go b/backend/services/user.go index 4811f767..61fd952e 100644 --- a/backend/services/user.go +++ b/backend/services/user.go @@ -17,9 +17,7 @@ func InitUserService() error { Password: utils.EncryptPassword("admin"), Role: constants.RoleAdmin, } - if err := adminUser.Add(); err != nil { - // pass - } + _ = adminUser.Add() return nil } func MakeToken(user *model.User) (tokenStr string, err error) { diff --git a/backend/utils/file.go b/backend/utils/file.go index d65e7ab1..babc0d69 100644 --- a/backend/utils/file.go +++ b/backend/utils/file.go @@ -21,7 +21,7 @@ func RemoveFiles(path string) { // 读取文件一行 func ReadFileOneLine(fileName string) string { file := OpenFile(fileName) - defer file.Close() + defer Close(file) buf := bufio.NewReader(file) line, err := buf.ReadString('\n') if err != nil { @@ -57,10 +57,7 @@ func CreateFilePath(filePath string) { func Exists(path string) bool { _, err := os.Stat(path) //os.Stat获取文件信息 if err != nil { - if os.IsExist(err) { - return true - } - return false + return os.IsExist(err) } return true } @@ -88,7 +85,7 @@ func DeCompressByPath(tarFile, dest string) error { if err != nil { return err } - defer srcFile.Close() + defer Close(srcFile) return DeCompress(srcFile, dest) } @@ -112,7 +109,7 @@ func DeCompress(srcFile *os.File, dstPath string) error { debug.PrintStack() return err } - defer zipFile.Close() + defer Close(zipFile) // 遍历zip内所有文件和目录 for _, innerFile := range zipFile.File { @@ -156,7 +153,7 @@ func DeCompress(srcFile *os.File, dstPath string) error { debug.PrintStack() continue } - defer newFile.Close() + defer Close(newFile) // 拷贝该文件到新文件中 if _, err := io.Copy(newFile, srcFile); err != nil { @@ -184,9 +181,9 @@ func DeCompress(srcFile *os.File, dstPath string) error { //dest 压缩文件存放地址 func Compress(files []*os.File, dest string) error { d, _ := os.Create(dest) - defer d.Close() + defer Close(d) w := zip.NewWriter(d) - defer w.Close() + defer Close(w) for _, file := range files { err := _Compress(file, "", w) if err != nil { @@ -234,7 +231,7 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error { return err } _, err = io.Copy(writer, file) - file.Close() + Close(file) if err != nil { debug.PrintStack() return err diff --git a/backend/utils/file_test.go b/backend/utils/file_test.go index 64f2df6d..4af32d0d 100644 --- a/backend/utils/file_test.go +++ b/backend/utils/file_test.go @@ -60,7 +60,7 @@ func TestCompress(t *testing.T) { So(er, ShouldEqual, nil) }) }) - os.RemoveAll("testCompress") + _ = os.RemoveAll("testCompress") } func Zip(zipFile string, fileList []string) error { @@ -69,16 +69,11 @@ func Zip(zipFile string, fileList []string) error { if err != nil { log.Fatal() } - defer fw.Close() + defer Close(fw) // 实例化新的 zip.Writer zw := zip.NewWriter(fw) - defer func() { - // 检测一下是否成功关闭 - if err := zw.Close(); err != nil { - log.Fatalln(err) - } - }() + defer Close(zw) for _, fileName := range fileList { fr, err := os.Open(fileName) @@ -91,6 +86,9 @@ func Zip(zipFile string, fileList []string) error { } // 写入文件的头信息 fh, err := zip.FileInfoHeader(fi) + if err != nil { + return err + } w, err := zw.CreateHeader(fh) if err != nil { return err @@ -106,6 +104,10 @@ func Zip(zipFile string, fileList []string) error { func TestDeCompress(t *testing.T) { err := os.Mkdir("testDeCompress", os.ModePerm) + if err != nil { + t.Error(err) + + } err = Zip("demo.zip", []string{}) if err != nil { t.Error("create zip file failed") @@ -121,7 +123,7 @@ func TestDeCompress(t *testing.T) { err := DeCompress(tmpFile, dstPath) So(err, ShouldEqual, nil) }) - os.RemoveAll("testDeCompress") - os.Remove("demo.zip") + _ = os.RemoveAll("testDeCompress") + _ = os.Remove("demo.zip") } diff --git a/backend/utils/helpers.go b/backend/utils/helpers.go index edc6200e..541d9002 100644 --- a/backend/utils/helpers.go +++ b/backend/utils/helpers.go @@ -1,12 +1,11 @@ package utils import ( - "context" - "crawlab/database" "crawlab/entity" "encoding/json" "github.com/apex/log" "github.com/gomodule/redigo/redis" + "io" "runtime/debug" "unsafe" ) @@ -35,21 +34,9 @@ func GetMessage(message redis.Message) *entity.NodeMessage { return &msg } -func Pub(channel string, msg entity.NodeMessage) error { - if _, err := database.RedisClient.Publish(channel, GetJson(msg)); err != nil { - log.Errorf("publish redis error: %s", err.Error()) - debug.PrintStack() - return err +func Close(c io.Closer) { + err := c.Close() + if err != nil { + log.WithError(err).Error("关闭资源文件失败。") } - return nil -} - -func Sub(channel string, consume database.ConsumeFunc) error { - ctx := context.Background() - if err := database.RedisClient.Subscribe(ctx, consume, channel); err != nil { - log.Errorf("subscribe redis error: %s", err.Error()) - debug.PrintStack() - return err - } - return nil } diff --git a/backend/utils/model.go b/backend/utils/model.go index 867ae620..21a295d6 100644 --- a/backend/utils/model.go +++ b/backend/utils/model.go @@ -12,15 +12,16 @@ func IsObjectIdNull(id bson.ObjectId) bool { } func InterfaceToString(value interface{}) string { - switch value.(type) { + switch realValue := value.(type) { case bson.ObjectId: - return value.(bson.ObjectId).Hex() + return realValue.Hex() case string: - return value.(string) + return realValue case int: - return strconv.Itoa(value.(int)) + return strconv.Itoa(realValue) case time.Time: - return value.(time.Time).String() + return realValue.String() + default: + return "" } - return "" } diff --git a/backend/utils/user.go b/backend/utils/user.go index 9d1bdceb..46933f9e 100644 --- a/backend/utils/user.go +++ b/backend/utils/user.go @@ -8,7 +8,7 @@ import ( func EncryptPassword(str string) string { w := md5.New() - io.WriteString(w, str) + _, _ = io.WriteString(w, str) md5str := fmt.Sprintf("%x", w.Sum(nil)) return md5str } diff --git a/docker_init.sh b/docker_init.sh index 4d571769..09f63e9b 100755 --- a/docker_init.sh +++ b/docker_init.sh @@ -6,7 +6,7 @@ then : else jspath=`ls /app/dist/js/app.*.js` - sed -i "s?localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath} + sed -i "s?http://localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath} fi # replace base url diff --git a/frontend/src/components/Common/CrawlConfirmDialog.vue b/frontend/src/components/Common/CrawlConfirmDialog.vue index 266ef2eb..2286beb2 100644 --- a/frontend/src/components/Common/CrawlConfirmDialog.vue +++ b/frontend/src/components/Common/CrawlConfirmDialog.vue @@ -9,9 +9,8 @@ - diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index c44d46e2..b170c9ed 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -14,9 +14,9 @@ - + - + - - - + + + + + + + + + + + - {{$t('schedules.add_cron')}} + + {{$t('Cancel')}} {{$t('Submit')}} @@ -76,9 +78,9 @@ - - - + + + @@ -131,28 +133,15 @@ diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 743aabbe..eb1e548f 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -424,12 +424,10 @@ export default { this.dialogVisible = true }, isShowRun (row) { - if (this.isCustomized(row)) { - // customized spider - return !!row.cmd + if (row.cmd) { + return true } else { - // configurable spider - return !!row.fields + return false } }, isCustomized (row) { diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index b4ec0652..d61394e8 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -35,6 +35,7 @@ import { import TaskOverview from '../../components/Overview/TaskOverview' import GeneralTableView from '../../components/TableView/GeneralTableView' import LogView from '../../components/ScrollView/LogView' +import request from '../../api/request' export default { name: 'TaskDetail', @@ -46,12 +47,12 @@ export default { data () { return { activeTabName: 'overview', - handle: undefined + handle: undefined, + taskLog: '' } }, computed: { ...mapState('task', [ - 'taskLog', 'taskResultsData', 'taskResultsTotalCount' ]), @@ -97,18 +98,23 @@ export default { downloadCSV () { this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id) this.$st.sendEv('任务详情-结果', '下载CSV') + }, + getTaskLog () { + if (this.$route.params.id) { + request.get(`/tasks/${this.$route.params.id}/log`).then(response => { + this.taskLog = response.data.data + }) + } } }, - async created () { - await this.$store.dispatch('task/getTaskData', this.$route.params.id) - this.$store.dispatch('task/getTaskLog', this.$route.params.id) + created () { + this.$store.dispatch('task/getTaskData', this.$route.params.id) this.$store.dispatch('task/getTaskResults', this.$route.params.id) - if (this.taskForm && ['running'].includes(this.taskForm.status)) { - this.handle = setInterval(() => { - this.$store.dispatch('task/getTaskLog', this.$route.params.id) - }, 5000) - } + this.getTaskLog() + this.handle = setInterval(() => { + this.getTaskLog() + }, 5000) }, destroyed () { clearInterval(this.handle)