From acccdb65bd459c88ccfaefc9b9ebe2975d775f84 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 3 Sep 2019 15:24:26 +0800 Subject: [PATCH 1/2] Revert "V0.4.0 imporve error response" --- backend/conf/config.yml | 2 - backend/config/config_test.go | 2 +- backend/constants/context.go | 5 - backend/constants/errors.go | 13 -- backend/database/mongo.go | 3 +- backend/database/pubsub.go | 145 +++++++------- backend/database/redis.go | 73 +++---- backend/errors/errors.go | 55 ------ backend/go.mod | 4 - backend/go.sum | 8 - backend/lib/validate_bridge/validator.go | 54 ----- backend/main.go | 111 +++++------ backend/middlewares/auth.go | 13 +- backend/mock/node_test.go | 6 - backend/mock/schedule.go | 2 +- backend/mock/spider.go | 179 +---------------- backend/mock/spider_test.go | 137 ------------- backend/model/node.go | 47 ----- backend/model/node_test.go | 50 ----- backend/model/spider.go | 37 +--- backend/model/task.go | 17 +- backend/routes/spider.go | 6 - backend/routes/user.go | 27 +-- backend/routes/utils.go | 6 +- backend/services/context/context.go | 100 ---------- backend/services/log.go | 72 +------ backend/services/log_test.go | 50 ----- backend/services/node.go | 185 ++++++++++-------- backend/services/spider.go | 77 +++----- backend/services/system.go | 2 +- backend/services/task.go | 10 +- backend/services/user.go | 40 ++-- backend/utils/file.go | 2 +- backend/utils/file_test.go | 67 +------ docker/Dockerfile.master.alpine | 2 +- docker/Dockerfile.worker.alpine | 2 +- frontend/package.json | 1 - frontend/src/api/request.js | 49 ++--- .../src/components/InfoView/NodeInfoView.vue | 2 +- .../components/InfoView/SpiderInfoView.vue | 3 - .../src/components/InfoView/TaskInfoView.vue | 6 +- frontend/src/i18n/zh.js | 21 +- frontend/src/router/index.js | 6 + frontend/src/store/modules/node.js | 18 +- frontend/src/store/modules/task.js | 16 -- .../src/views/layout/components/Navbar.vue | 1 - frontend/src/views/node/NodeList.vue | 2 +- frontend/src/views/schedule/ScheduleList.vue | 12 +- frontend/src/views/spider/SpiderList.vue | 25 +-- frontend/src/views/task/TaskDetail.vue | 8 +- frontend/src/views/task/TaskList.vue | 4 +- 51 files changed, 414 insertions(+), 1371 deletions(-) delete mode 100644 backend/constants/context.go delete mode 100644 backend/constants/errors.go delete mode 100644 backend/errors/errors.go delete mode 100644 backend/lib/validate_bridge/validator.go delete mode 100644 backend/mock/spider_test.go delete mode 100644 backend/model/node_test.go delete mode 100644 backend/services/context/context.go delete mode 100644 backend/services/log_test.go diff --git a/backend/conf/config.yml b/backend/conf/config.yml index 3805762a..f1042ca6 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -15,8 +15,6 @@ redis: log: level: info path: "/var/logs/crawlab" - isDeletePeriodically: "Y" - deleteFrequency: "@hourly" server: host: 0.0.0.0 port: 8000 diff --git a/backend/config/config_test.go b/backend/config/config_test.go index 0068e6ad..ee966877 100644 --- a/backend/config/config_test.go +++ b/backend/config/config_test.go @@ -7,7 +7,7 @@ import ( func TestInitConfig(t *testing.T) { Convey("Test InitConfig func", t, func() { - x := InitConfig("../conf/config.yml") + x := InitConfig("") Convey("The value should be nil", func() { So(x, ShouldEqual, nil) diff --git a/backend/constants/context.go b/backend/constants/context.go deleted file mode 100644 index 0759b54b..00000000 --- a/backend/constants/context.go +++ /dev/null @@ -1,5 +0,0 @@ -package constants - -const ( - ContextUser = "currentUser" -) diff --git a/backend/constants/errors.go b/backend/constants/errors.go deleted file mode 100644 index a273cb75..00000000 --- a/backend/constants/errors.go +++ /dev/null @@ -1,13 +0,0 @@ -package constants - -import ( - "crawlab/errors" - "net/http" -) - -var ( - ErrorMongoError = errors.NewSystemOPError(1001, "system error:[mongo]%s", http.StatusInternalServerError) - //users - ErrorUserNotFound = errors.NewBusinessError(10001, "user not found.", http.StatusUnauthorized) - ErrorUsernameOrPasswordInvalid = errors.NewBusinessError(11001, "username or password invalid", http.StatusUnauthorized) -) diff --git a/backend/database/mongo.go b/backend/database/mongo.go index 1c2d6433..6b155791 100644 --- a/backend/database/mongo.go +++ b/backend/database/mongo.go @@ -3,7 +3,6 @@ package database import ( "github.com/globalsign/mgo" "github.com/spf13/viper" - "time" ) var Session *mgo.Session @@ -45,7 +44,7 @@ func InitMongo() error { } else { uri = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoHost + ":" + mongoPort + "/" + mongoDb + "?authSource=" + mongoAuth } - sess, err := mgo.DialWithTimeout(uri, time.Second*5) + sess, err := mgo.Dial(uri) if err != nil { return err } diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index c5fdbda9..b100535f 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -1,97 +1,90 @@ package database import ( - "context" + "errors" "fmt" "github.com/apex/log" "github.com/gomodule/redigo/redis" - errors2 "github.com/pkg/errors" "time" + "unsafe" ) -type ConsumeFunc func(message redis.Message) error +type SubscribeCallback func(channel, message string) -func (r *Redis) Close() { - err := r.pool.Close() +type Subscriber struct { + client redis.PubSubConn + cbMap map[string]SubscribeCallback +} + +func (c *Subscriber) Connect() { + conn, err := GetRedisConn() + if err != nil { + log.Fatalf("redis dial failed.") + } + + c.client = redis.PubSubConn{Conn: conn} + c.cbMap = make(map[string]SubscribeCallback) + + //retry connect redis 5 times, or panic + index := 0 + go func(i int) { + for { + log.Debug("wait...") + switch res := c.client.Receive().(type) { + case redis.Message: + i = 0 + channel := (*string)(unsafe.Pointer(&res.Channel)) + message := (*string)(unsafe.Pointer(&res.Data)) + c.cbMap[*channel](*channel, *message) + case redis.Subscription: + fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) + case error: + log.Error("error handle redis connection...") + + time.Sleep(2 * time.Second) + if i > 5 { + panic(errors.New("redis connection failed too many times, panic")) + } + con, err := GetRedisConn() + if err != nil { + log.Error("redis dial failed") + continue + } + c.client = redis.PubSubConn{Conn: con} + i += 1 + + continue + } + } + }(index) + +} + +func (c *Subscriber) Close() { + err := c.client.Close() if err != nil { log.Errorf("redis close error.") } } -func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { - psc := redis.PubSubConn{Conn: r.pool.Get()} - if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil { + +func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) { + err := c.client.Subscribe(channel) + if err != nil { + log.Fatalf("redis Subscribe error.") + } + + c.cbMap[channel.(string)] = cb +} + +func Publish(channel string, msg string) error { + c, err := GetRedisConn() + if err != nil { return err } - done := make(chan error, 1) - tick := time.NewTicker(time.Second * 3) - defer tick.Stop() - go func() { - defer func() { _ = psc.Close() }() - for { - switch msg := psc.Receive().(type) { - case error: - done <- fmt.Errorf("redis pubsub receive err: %v", msg) - return - case redis.Message: - fmt.Println(msg) - if err := consume(msg); err != nil { - fmt.Printf("redis pubsub consume message err: %v", err) - continue - } - case redis.Subscription: - fmt.Println(msg) - if msg.Count == 0 { - // all channels are unsubscribed - return - } - } - - } - }() - // start a new goroutine to receive message - for { - select { - case <-ctx.Done(): - if err := psc.Unsubscribe(); err != nil { - fmt.Printf("redis pubsub unsubscribe err: %v \n", err) - } - done <- nil - case <-tick.C: - //fmt.Printf("ping message \n") - if err := psc.Ping(""); err != nil { - done <- err - } - case err := <-done: - close(done) - return err - } + if _, err := c.Do("PUBLISH", channel, msg); err != nil { + return err } -} -func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error { - index := 0 - go func() { - for { - err := r.subscribe(ctx, consume, channel...) - fmt.Println(err) - - if err == nil { - break - } - time.Sleep(5 * time.Second) - index += 1 - fmt.Printf("try reconnect %d times \n", index) - } - }() return nil } -func (r *Redis) Publish(channel, message string) (n int, err error) { - conn := r.pool.Get() - defer func() { _ = conn.Close() }() - n, err = redis.Int(conn.Do("PUBLISH", channel, message)) - if err != nil { - return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message) - } - return -} diff --git a/backend/database/redis.go b/backend/database/redis.go index ede229a2..ffebf776 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -4,20 +4,21 @@ import ( "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" - "time" ) -var RedisClient *Redis +var RedisClient = Redis{} + +type ConsumeFunc func(channel string, message []byte) error type Redis struct { - pool *redis.Pool } -func NewRedisClient() *Redis { - return &Redis{pool: NewRedisPool()} -} func (r *Redis) RPush(collection string, value interface{}) error { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return err + } defer c.Close() if _, err := c.Do("RPUSH", collection, value); err != nil { @@ -28,7 +29,11 @@ func (r *Redis) RPush(collection string, value interface{}) error { } func (r *Redis) LPop(collection string) (string, error) { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return "", err + } defer c.Close() value, err2 := redis.String(c.Do("LPOP", collection)) @@ -39,7 +44,11 @@ func (r *Redis) LPop(collection string) (string, error) { } func (r *Redis) HSet(collection string, key string, value string) error { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return err + } defer c.Close() if _, err := c.Do("HSET", collection, key, value); err != nil { @@ -50,7 +59,11 @@ func (r *Redis) HSet(collection string, key string, value string) error { } func (r *Redis) HGet(collection string, key string) (string, error) { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return "", err + } defer c.Close() value, err2 := redis.String(c.Do("HGET", collection, key)) @@ -61,7 +74,11 @@ func (r *Redis) HGet(collection string, key string) (string, error) { } func (r *Redis) HDel(collection string, key string) error { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return err + } defer c.Close() if _, err := c.Do("HDEL", collection, key); err != nil { @@ -71,7 +88,11 @@ func (r *Redis) HDel(collection string, key string) error { } func (r *Redis) HKeys(collection string) ([]string, error) { - c := r.pool.Get() + c, err := GetRedisConn() + if err != nil { + debug.PrintStack() + return []string{}, err + } defer c.Close() value, err2 := redis.Strings(c.Do("HKeys", collection)) @@ -81,7 +102,7 @@ func (r *Redis) HKeys(collection string) ([]string, error) { return value, nil } -func NewRedisPool() *redis.Pool { +func GetRedisConn() (redis.Conn, error) { var address = viper.GetString("redis.address") var port = viper.GetString("redis.port") var database = viper.GetString("redis.database") @@ -93,30 +114,14 @@ func NewRedisPool() *redis.Pool { } else { url = "redis://x:" + password + "@" + address + ":" + port + "/" + database } - return &redis.Pool{ - Dial: func() (conn redis.Conn, e error) { - return redis.DialURL(url, - redis.DialConnectTimeout(time.Second*10), - redis.DialReadTimeout(time.Second*10), - redis.DialWriteTimeout(time.Second*10), - ) - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return err - }, - MaxIdle: 10, - MaxActive: 0, - IdleTimeout: 300 * time.Second, - Wait: false, - MaxConnLifetime: 0, + c, err := redis.DialURL(url) + if err != nil { + debug.PrintStack() + return c, err } + return c, nil } func InitRedis() error { - RedisClient = NewRedisClient() return nil } diff --git a/backend/errors/errors.go b/backend/errors/errors.go deleted file mode 100644 index f191cd3e..00000000 --- a/backend/errors/errors.go +++ /dev/null @@ -1,55 +0,0 @@ -package errors - -import ( - "fmt" - "net/http" -) - -type Scope int - -const ( - ScopeSystem Scope = 1 - ScopeBusiness Scope = 2 -) - -type OPError struct { - HttpCode int - Message string - Code int - Scope Scope -} - -func (O OPError) Error() string { - var scope string - switch O.Scope { - case ScopeSystem: - scope = "system" - break - case ScopeBusiness: - scope = "business" - } - return fmt.Sprintf("%s error: [%d]%s.", scope, O.Code, O.Message) -} - -func NewSystemOPError(code int, message string, httpCodes ...int) *OPError { - httpCode := http.StatusOK - if len(httpCodes) > 0 { - httpCode = httpCodes[0] - } - return NewOpError(code, message, ScopeSystem, httpCode) -} -func NewOpError(code int, message string, scope Scope, httpCode int) *OPError { - return &OPError{ - Message: message, - Code: code, - Scope: scope, - HttpCode: httpCode, - } -} -func NewBusinessError(code int, message string, httpCodes ...int) *OPError { - httpCode := http.StatusOK - if len(httpCodes) > 0 { - httpCode = httpCodes[0] - } - return NewOpError(code, message, ScopeBusiness, httpCode) -} diff --git a/backend/go.mod b/backend/go.mod index 428c2fd3..5a575910 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,13 +8,9 @@ require ( github.com/fsnotify/fsnotify v1.4.7 github.com/gin-gonic/gin v1.4.0 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 - github.com/go-playground/locales v0.12.1 // indirect - github.com/go-playground/universal-translator v0.16.0 // indirect github.com/gomodule/redigo v2.0.0+incompatible - github.com/leodido/go-urn v1.1.0 // indirect github.com/pkg/errors v0.8.1 github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 github.com/spf13/viper v1.4.0 - gopkg.in/go-playground/validator.v9 v9.29.1 ) diff --git a/backend/go.sum b/backend/go.sum index cc056d70..910e18be 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -39,10 +39,6 @@ github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= -github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= -github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= -github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -81,8 +77,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= -github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= @@ -208,8 +202,6 @@ gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXa gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ= gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= -gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= -gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/backend/lib/validate_bridge/validator.go b/backend/lib/validate_bridge/validator.go deleted file mode 100644 index 509dc475..00000000 --- a/backend/lib/validate_bridge/validator.go +++ /dev/null @@ -1,54 +0,0 @@ -package validate_bridge - -import ( - "reflect" - "sync" - - "github.com/gin-gonic/gin/binding" - "gopkg.in/go-playground/validator.v9" -) - -type DefaultValidator struct { - once sync.Once - validate *validator.Validate -} - -var _ binding.StructValidator = &DefaultValidator{validate: validator.New()} - -func (v *DefaultValidator) ValidateStruct(obj interface{}) error { - if kindOfData(obj) == reflect.Struct { - - v.lazyinit() - - if err := v.validate.Struct(obj); err != nil { - return err - } - } - - return nil -} - -func (v *DefaultValidator) Engine() interface{} { - v.lazyinit() - return v.validate -} - -func (v *DefaultValidator) lazyinit() { - v.once.Do(func() { - v.validate = validator.New() - v.validate.SetTagName("binding") - - // add any custom validations etc. here - }) -} - -func kindOfData(data interface{}) reflect.Kind { - - value := reflect.ValueOf(data) - valueType := value.Kind() - - if valueType == reflect.Ptr { - valueType = value.Elem().Kind() - } - return valueType -} diff --git a/backend/main.go b/backend/main.go index bf98674e..489a17ce 100644 --- a/backend/main.go +++ b/backend/main.go @@ -3,19 +3,16 @@ package main import ( "crawlab/config" "crawlab/database" - "crawlab/lib/validate_bridge" "crawlab/middlewares" "crawlab/routes" "crawlab/services" "github.com/apex/log" "github.com/gin-gonic/gin" - "github.com/gin-gonic/gin/binding" "github.com/spf13/viper" "runtime/debug" ) func main() { - binding.Validator = new(validate_bridge.DefaultValidator) app := gin.Default() // 初始化配置 @@ -32,15 +29,6 @@ func main() { } log.Info("初始化日志设置成功") - if viper.GetString("log.isDeletePeriodically") == "Y" { - err := services.InitDeleteLogPeriodically() - if err != nil { - log.Error("Init DeletePeriodically Failed") - panic(err) - } - log.Info("初始化定期清理日志配置成功") - } - // 初始化Mongodb数据库 if err := database.InitMongo(); err != nil { log.Error("init mongodb error:" + err.Error()) @@ -102,60 +90,53 @@ func main() { if services.IsMaster() { // 中间件 app.Use(middlewares.CORSMiddleware()) - //app.Use(middlewares.AuthorizationMiddleware()) - anonymousGroup := app.Group("/") - { - anonymousGroup.POST("/login", routes.Login) // 用户登录 - anonymousGroup.PUT("/users", routes.PutUser) // 添加用户 - - } - authGroup := app.Group("/", middlewares.AuthorizationMiddleware()) - { - // 路由 - // 节点 - authGroup.GET("/nodes", routes.GetNodeList) // 节点列表 - authGroup.GET("/nodes/:id", routes.GetNode) // 节点详情 - authGroup.POST("/nodes/:id", routes.PostNode) // 修改节点 - authGroup.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表 - authGroup.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表 - authGroup.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点 - // 爬虫 - authGroup.GET("/spiders", routes.GetSpiderList) // 爬虫列表 - authGroup.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 - authGroup.POST("/spiders", routes.PutSpider) // 上传爬虫 - authGroup.POST("/spiders/:id", routes.PostSpider) // 修改爬虫 - authGroup.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫 - authGroup.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫 - authGroup.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表 - authGroup.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取 - authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入 - authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录 - authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据 - // 任务 - authGroup.GET("/tasks", routes.GetTaskList) // 任务列表 - authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情 - authGroup.PUT("/tasks", routes.PutTask) // 派发任务 - authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 - authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 - authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 - authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 - authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 - // 定时任务 - authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表 - authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 - authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务 - authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 - authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 - // 统计数据 - authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据 - // 用户 - authGroup.GET("/users", routes.GetUserList) // 用户列表 - authGroup.GET("/users/:id", routes.GetUser) // 用户详情 - authGroup.POST("/users/:id", routes.PostUser) // 更改用户 - authGroup.DELETE("/users/:id", routes.DeleteUser) // 删除用户 - authGroup.GET("/me", routes.GetMe) // 获取自己账户 - } + app.Use(middlewares.AuthorizationMiddleware()) + // 路由 + // 节点 + app.GET("/nodes", routes.GetNodeList) // 节点列表 + app.GET("/nodes/:id", routes.GetNode) // 节点详情 + app.POST("/nodes/:id", routes.PostNode) // 修改节点 + app.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表 + app.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表 + app.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点 + // 爬虫 + app.GET("/spiders", routes.GetSpiderList) // 爬虫列表 + app.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 + app.POST("/spiders", routes.PutSpider) // 上传爬虫 + app.POST("/spiders/:id", routes.PostSpider) // 修改爬虫 + app.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫 + app.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫 + app.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表 + app.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取 + app.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入 + app.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录 + app.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据 + // 任务 + app.GET("/tasks", routes.GetTaskList) // 任务列表 + app.GET("/tasks/:id", routes.GetTask) // 任务详情 + app.PUT("/tasks", routes.PutTask) // 派发任务 + app.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 + app.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 + app.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 + app.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 + app.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 + // 定时任务 + app.GET("/schedules", routes.GetScheduleList) // 定时任务列表 + app.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情 + app.PUT("/schedules", routes.PutSchedule) // 创建定时任务 + app.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务 + app.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务 + // 统计数据 + app.GET("/stats/home", routes.GetHomeStats) // 首页统计数据 + // 用户 + app.GET("/users", routes.GetUserList) // 用户列表 + app.GET("/users/:id", routes.GetUser) // 用户详情 + app.PUT("/users", routes.PutUser) // 添加用户 + app.POST("/users/:id", routes.PostUser) // 更改用户 + app.DELETE("/users/:id", routes.DeleteUser) // 删除用户 + app.POST("/login", routes.Login) // 用户登录 + app.GET("/me", routes.GetMe) // 获取自己账户 } // 路由ping diff --git a/backend/middlewares/auth.go b/backend/middlewares/auth.go index 07249e82..977fea78 100644 --- a/backend/middlewares/auth.go +++ b/backend/middlewares/auth.go @@ -12,12 +12,12 @@ import ( func AuthorizationMiddleware() gin.HandlerFunc { return func(c *gin.Context) { // 如果为登录或注册,不用校验 - //if c.Request.URL.Path == "/login" || - // (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") || - // strings.HasSuffix(c.Request.URL.Path, "download") { - // c.Next() - // return - //} + if c.Request.URL.Path == "/login" || + (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") || + strings.HasSuffix(c.Request.URL.Path, "download") { + c.Next() + return + } // 获取token string tokenStr := c.GetHeader("Authorization") @@ -46,7 +46,6 @@ func AuthorizationMiddleware() gin.HandlerFunc { return } } - c.Set(constants.ContextUser, &user) // 校验成功 c.Next() diff --git a/backend/mock/node_test.go b/backend/mock/node_test.go index eca321ad..cc2f94e5 100644 --- a/backend/mock/node_test.go +++ b/backend/mock/node_test.go @@ -41,12 +41,6 @@ func init() { app.DELETE("/tasks/:id", DeleteTask) // 删除任务 app.GET("/tasks/:id/results",GetTaskResults) // 任务结果 app.GET("/tasks/:id/results/download", DownloadTaskResultsCsv) // 下载任务结果 - app.GET("/spiders", GetSpiderList) // 爬虫列表 - app.GET("/spiders/:id", GetSpider) // 爬虫详情 - app.POST("/spiders/:id", PostSpider) // 修改爬虫 - app.DELETE("/spiders/:id",DeleteSpider) // 删除爬虫 - app.GET("/spiders/:id/tasks",GetSpiderTasks) // 爬虫任务列表 - app.GET("/spiders/:id/dir",GetSpiderDir) // 爬虫目录 } //mock test, test data in ./mock diff --git a/backend/mock/schedule.go b/backend/mock/schedule.go index 702e8754..ae982ca6 100644 --- a/backend/mock/schedule.go +++ b/backend/mock/schedule.go @@ -113,7 +113,7 @@ func PutSchedule(c *gin.Context) { func DeleteSchedule(c *gin.Context) { id := bson.ObjectIdHex("5d429e6c19f7abede924fee2") for _, sch := range scheduleList { - if sch.Id == id { + if sch.Id == bson.ObjectId(id) { fmt.Println("delete a schedule") } } diff --git a/backend/mock/spider.go b/backend/mock/spider.go index ef3e6104..c4807247 100644 --- a/backend/mock/spider.go +++ b/backend/mock/spider.go @@ -1,178 +1 @@ -package mock - -import ( - "crawlab/model" - "github.com/apex/log" - "github.com/gin-gonic/gin" - "github.com/globalsign/mgo/bson" - "io/ioutil" - "net/http" - "os" - "path/filepath" - "time" -) - -var SpiderList = []model.Spider{ - { - Id: bson.ObjectId("5d429e6c19f7abede924fee2"), - Name: "For test", - DisplayName: "test", - Type: "test", - Col: "test", - Site: "www.baidu.com", - Envs: nil, - Src: "../app/spiders", - Cmd: "scrapy crawl test", - LastRunTs: time.Now(), - CreateTs: time.Now(), - UpdateTs: time.Now(), - }, -} - -func GetSpiderList(c *gin.Context) { - - // mock get spider list from database - results := SpiderList - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: results, - }) -} - -func GetSpider(c *gin.Context) { - id := c.Param("id") - var result model.Spider - - if !bson.IsObjectIdHex(id) { - HandleErrorF(http.StatusBadRequest, c, "invalid id") - } - - for _, spider := range SpiderList { - if spider.Id == bson.ObjectId(id) { - result = spider - } - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: result, - }) -} - -func PostSpider(c *gin.Context) { - id := c.Param("id") - - if !bson.IsObjectIdHex(id) { - HandleErrorF(http.StatusBadRequest, c, "invalid id") - } - - var item model.Spider - if err := c.ShouldBindJSON(&item); err != nil { - HandleError(http.StatusBadRequest, c, err) - return - } - - log.Info("modify the item") - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) -} -func GetSpiderDir(c *gin.Context) { - // 爬虫ID - id := c.Param("id") - - // 目录相对路径 - path := c.Query("path") - var spi model.Spider - - // 获取爬虫 - for _, spider := range SpiderList { - if spider.Id == bson.ObjectId(id) { - spi = spider - } - } - - // 获取目录下文件列表 - f, err := ioutil.ReadDir(filepath.Join(spi.Src, path)) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 遍历文件列表 - var fileList []model.File - for _, file := range f { - fileList = append(fileList, model.File{ - Name: file.Name(), - IsDir: file.IsDir(), - Size: file.Size(), - Path: filepath.Join(path, file.Name()), - }) - } - - // 返回结果 - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: fileList, - }) -} - -func GetSpiderTasks(c *gin.Context) { - id := c.Param("id") - - var spider model.Spider - for _, spi := range SpiderList { - if spi.Id == bson.ObjectId(id) { - spider = spi - } - } - - var tasks model.Task - for _, task := range TaskList { - if task.SpiderId == spider.Id { - tasks = task - } - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - Data: tasks, - }) -} - -func DeleteSpider(c *gin.Context) { - id := c.Param("id") - - if !bson.IsObjectIdHex(id) { - HandleErrorF(http.StatusBadRequest, c, "invalid id") - return - } - - // 获取该爬虫,get this spider - var spider model.Spider - for _, spi := range SpiderList { - if spi.Id == bson.ObjectId(id) { - spider = spi - } - } - - // 删除爬虫文件目录,delete the spider dir - if err := os.RemoveAll(spider.Src); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 从数据库中删除该爬虫,delete this spider from database - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) -} +package mock \ No newline at end of file diff --git a/backend/mock/spider_test.go b/backend/mock/spider_test.go deleted file mode 100644 index 87634ff7..00000000 --- a/backend/mock/spider_test.go +++ /dev/null @@ -1,137 +0,0 @@ -package mock - -import ( - "crawlab/model" - "encoding/json" - "github.com/globalsign/mgo/bson" - . "github.com/smartystreets/goconvey/convey" - "net/http" - "net/http/httptest" - "strings" - "testing" - "time" -) - -func TestGetSpiderList(t *testing.T) { - var resp Response - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", "/spiders", nil) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp faild") - } - Convey("Test API GetSpiderList", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) -} - -func TestGetSpider(t *testing.T) { - var resp Response - var spiderId = "5d429e6c19f7abede924fee2" - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", "/spiders/"+spiderId, nil) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp failed") - } - Convey("Test API GetSpider", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) -} - -func TestPostSpider(t *testing.T) { - var spider = model.Spider{ - Id: bson.ObjectIdHex("5d429e6c19f7abede924fee2"), - Name: "For test", - DisplayName: "test", - Type: "test", - Col: "test", - Site: "www.baidu.com", - Envs: nil, - Src: "/app/spider", - Cmd: "scrapy crawl test", - LastRunTs: time.Now(), - CreateTs: time.Now(), - UpdateTs: time.Now(), - } - var resp Response - var spiderId = "5d429e6c19f7abede924fee2" - w := httptest.NewRecorder() - body, _ := json.Marshal(spider) - req, _ := http.NewRequest("POST", "/spiders/"+spiderId, strings.NewReader(string(body))) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp failed") - } - Convey("Test API PostSpider", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) - -} - -func TestGetSpiderDir(t *testing.T) { - var spiderId = "5d429e6c19f7abede924fee2" - var resp Response - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", "/spiders/"+spiderId+"/dir", nil) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp failed") - } - Convey("Test API GetSpiderDir", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) - -} - -func TestGetSpiderTasks(t *testing.T) { - var spiderId = "5d429e6c19f7abede924fee2" - var resp Response - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", "/spiders/"+spiderId+"/tasks", nil) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp failed") - } - Convey("Test API GetSpiderTasks", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) -} - -func TestDeleteSpider(t *testing.T) { - var spiderId = "5d429e6c19f7abede924fee2" - var resp Response - w := httptest.NewRecorder() - req, _ := http.NewRequest("DELETE", "/spiders/"+spiderId, nil) - app.ServeHTTP(w, req) - err := json.Unmarshal([]byte(w.Body.String()), &resp) - if err != nil { - t.Fatal("unmarshal resp failed") - } - Convey("Test API DeleteSpider", t, func() { - Convey("Test response status", func() { - So(resp.Status, ShouldEqual, "ok") - So(resp.Message, ShouldEqual, "success") - }) - }) -} \ No newline at end of file diff --git a/backend/model/node.go b/backend/model/node.go index 6211115c..61c20473 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -1,9 +1,7 @@ package model import ( - "crawlab/constants" "crawlab/database" - "crawlab/services/register" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -81,7 +79,6 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { - log.Error("get node list error: " + err.Error()) debug.PrintStack() return results, err } @@ -156,47 +153,3 @@ func GetNodeCount(query interface{}) (int, error) { return count, nil } - -// 节点基本信息 -func GetNodeBaseInfo() (ip string, mac string, key string, error error) { - ip, err := register.GetRegister().GetIp() - if err != nil { - debug.PrintStack() - return "", "", "", err - } - - mac, err = register.GetRegister().GetMac() - if err != nil { - debug.PrintStack() - return "", "", "", err - } - - key, err = register.GetRegister().GetKey() - if err != nil { - debug.PrintStack() - return "", "", "", err - } - return ip, mac, key, nil -} - -// 根据redis的key值,重置node节点为offline -func ResetNodeStatusToOffline(list []string) { - nodes, _ := GetNodeList(nil) - for _, node := range nodes { - hasNode := false - for _, key := range list { - if key == node.Key { - hasNode = true - break - } - } - if !hasNode || node.Status == "" { - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - continue - } - } -} diff --git a/backend/model/node_test.go b/backend/model/node_test.go deleted file mode 100644 index ba3f4aaa..00000000 --- a/backend/model/node_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package model - -import ( - "crawlab/config" - "crawlab/constants" - "crawlab/database" - "github.com/apex/log" - . "github.com/smartystreets/goconvey/convey" - "runtime/debug" - "testing" -) - -func TestAddNode(t *testing.T) { - Convey("Test AddNode", t, func() { - if err := config.InitConfig("../conf/config.yml"); err != nil { - log.Error("init config error:" + err.Error()) - panic(err) - } - log.Info("初始化配置成功") - - // 初始化Mongodb数据库 - if err := database.InitMongo(); err != nil { - log.Error("init mongodb error:" + err.Error()) - debug.PrintStack() - panic(err) - } - log.Info("初始化Mongodb数据库成功") - - // 初始化Redis数据库 - if err := database.InitRedis(); err != nil { - log.Error("init redis error:" + err.Error()) - debug.PrintStack() - panic(err) - } - - var node = Node{ - Key: "c4:b3:01:bd:b5:e7", - Name: "10.27.238.101", - Ip: "10.27.238.101", - Port: "8000", - Mac: "c4:b3:01:bd:b5:e7", - Status: constants.StatusOnline, - IsMaster: true, - } - if err := node.Add(); err != nil { - log.Error("add node error:" + err.Error()) - panic(err) - } - }) -} diff --git a/backend/model/spider.go b/backend/model/spider.go index e0e5f836..c4c94edf 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -23,14 +23,13 @@ type Spider struct { Col string `json:"col"` // 结果储存位置 Site string `json:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 - Remark string `json:"remark"` // 备注 + // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 // 前端展示 - LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 - LastStatus string `json:"last_status"` // 最后执行状态 + LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间 // TODO: 可配置爬虫 //Fields []interface{} `json:"fields"` @@ -93,13 +92,15 @@ func (spider *Spider) GetLastTask() (Task, error) { return tasks[0], nil } + + func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { s, c := database.GetCol("spiders") defer s.Close() // 获取爬虫列表 spiders := []Spider{} - if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil { + if err := c.Find(filter).Skip(skip).Limit(limit).All(&spiders); err != nil { debug.PrintStack() return spiders, err } @@ -116,7 +117,6 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { // 赋值 spiders[i].LastRunTs = task.CreateTs - spiders[i].LastStatus = task.Status } return spiders, nil @@ -165,33 +165,6 @@ func RemoveSpider(id bson.ObjectId) error { return err } - // gf上的文件 - s, gf := database.GetGridFs("files") - defer s.Close() - - if err := gf.RemoveId(result.FileId); err != nil { - log.Error("remove file error, id:" + result.FileId.Hex()) - return err - } - - return nil -} - -func RemoveAllSpider() error { - s, c := database.GetCol("spiders") - defer s.Close() - - spiders := []Spider{} - err := c.Find(nil).All(&spiders) - if err != nil { - log.Error("get all spiders error:" + err.Error()) - return err - } - for _, spider := range spiders { - if err := RemoveSpider(spider.Id); err != nil { - log.Error("remove spider error:" + err.Error()) - } - } return nil } diff --git a/backend/model/task.go b/backend/model/task.go index 968055a6..8ae782b5 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -190,21 +190,6 @@ func RemoveTask(id string) error { return nil } -func RemoveTaskBySpiderId(id string) error { - tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") - if err != nil { - log.Error("get tasks error:" + err.Error()) - } - - for _, task := range tasks { - if err := RemoveTask(task.Id); err != nil { - log.Error("remove task error:" + err.Error()) - continue - } - } - return nil -} - func GetTaskCount(query interface{}) (int, error) { s, c := database.GetCol("tasks") defer s.Close() @@ -222,7 +207,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) { defer s.Close() // 起始日期 - startDate := time.Now().Add(-30 * 24 * time.Hour) + startDate := time.Now().Add(- 30 * 24 * time.Hour) endDate := time.Now() // query diff --git a/backend/routes/spider.go b/backend/routes/spider.go index f1a3c9e5..dceb2651 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -229,12 +229,6 @@ func DeleteSpider(c *gin.Context) { return } - // 删除爬虫对应的task任务 - if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - c.JSON(http.StatusOK, Response{ Status: "ok", Message: "success", diff --git a/backend/routes/user.go b/backend/routes/user.go index a6d44cae..a3d5a431 100644 --- a/backend/routes/user.go +++ b/backend/routes/user.go @@ -4,7 +4,6 @@ import ( "crawlab/constants" "crawlab/model" "crawlab/services" - "crawlab/services/context" "crawlab/utils" "github.com/gin-gonic/gin" "github.com/globalsign/mgo/bson" @@ -172,7 +171,7 @@ func Login(c *gin.Context) { } // 获取token - tokenStr, err := services.MakeToken(&user) + tokenStr, err := services.GetToken(user.Username) if err != nil { HandleError(http.StatusUnauthorized, c, errors.New("not authorized")) return @@ -186,16 +185,20 @@ func Login(c *gin.Context) { } func GetMe(c *gin.Context) { - ctx := context.WithGinContext(c) - user := ctx.User() - if user == nil { - ctx.FailedWithError(constants.ErrorUserNotFound, http.StatusUnauthorized) + // 获取token string + tokenStr := c.GetHeader("Authorization") + + // 校验token + user, err := services.CheckToken(tokenStr) + if err != nil { + HandleError(http.StatusUnauthorized, c, errors.New("not authorized")) return } - ctx.Success(struct { - *model.User - Password string `json:"password,omitempty"` - }{ - User: user, - }, nil) + user.Password = "" + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: user, + }) } diff --git a/backend/routes/utils.go b/backend/routes/utils.go index 38ca35bb..14c5853e 100644 --- a/backend/routes/utils.go +++ b/backend/routes/utils.go @@ -1,15 +1,13 @@ package routes import ( - "github.com/apex/log" "github.com/gin-gonic/gin" "runtime/debug" ) func HandleError(statusCode int, c *gin.Context, err error) { - log.Errorf("handle error:" + err.Error()) debug.PrintStack() - c.AbortWithStatusJSON(statusCode, Response{ + c.JSON(statusCode, Response{ Status: "ok", Message: "error", Error: err.Error(), @@ -18,7 +16,7 @@ func HandleError(statusCode int, c *gin.Context, err error) { func HandleErrorF(statusCode int, c *gin.Context, err string) { debug.PrintStack() - c.AbortWithStatusJSON(statusCode, Response{ + c.JSON(statusCode, Response{ Status: "ok", Message: "error", Error: err, diff --git a/backend/services/context/context.go b/backend/services/context/context.go deleted file mode 100644 index ce8eb72e..00000000 --- a/backend/services/context/context.go +++ /dev/null @@ -1,100 +0,0 @@ -package context - -import ( - "crawlab/constants" - "crawlab/errors" - "crawlab/model" - "fmt" - "github.com/apex/log" - "github.com/gin-gonic/gin" - errors2 "github.com/pkg/errors" - "gopkg.in/go-playground/validator.v9" - "net/http" - "runtime/debug" -) - -type Context struct { - *gin.Context -} - -func (c *Context) User() *model.User { - userIfe, exists := c.Get(constants.ContextUser) - if !exists { - return nil - } - user, ok := userIfe.(*model.User) - if !ok { - return nil - } - return user -} -func (c *Context) Success(data interface{}, metas ...interface{}) { - var meta interface{} - if len(metas) == 0 { - meta = gin.H{} - } else { - meta = metas[0] - } - if data == nil { - data = gin.H{} - } - c.JSON(http.StatusOK, gin.H{ - "status": "ok", - "message": "success", - "data": data, - "meta": meta, - "error": "", - }) -} -func (c *Context) Failed(err error, variables ...interface{}) { - c.failed(err, http.StatusOK, variables...) -} -func (c *Context) failed(err error, httpCode int, variables ...interface{}) { - errStr := err.Error() - if len(variables) > 0 { - errStr = fmt.Sprintf(errStr, variables...) - } - log.Errorf("handle error:" + errStr) - debug.PrintStack() - causeError := errors2.Cause(err) - switch causeError.(type) { - case errors.OPError: - opError := causeError.(errors.OPError) - - c.AbortWithStatusJSON(opError.HttpCode, gin.H{ - "status": "ok", - "message": "error", - "error": errStr, - }) - break - case validator.ValidationErrors: - validatorErrors := causeError.(validator.ValidationErrors) - //firstError := validatorErrors[0].(validator.FieldError) - c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ - "status": "ok", - "message": "error", - "error": validatorErrors.Error(), - }) - break - default: - fmt.Println("deprecated....") - c.AbortWithStatusJSON(httpCode, gin.H{ - "status": "ok", - "message": "error", - "error": errStr, - }) - } -} -func (c *Context) FailedWithError(err error, httpCode ...int) { - - var code = 200 - if len(httpCode) > 0 { - code = httpCode[0] - } - c.failed(err, code) - -} - -func WithGinContext(context *gin.Context) *Context { - return &Context{Context: context} -} diff --git a/backend/services/log.go b/backend/services/log.go index 088a825a..d59e463e 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -3,15 +3,11 @@ package services import ( "crawlab/constants" "crawlab/database" - "crawlab/lib/cron" "crawlab/model" "crawlab/utils" "encoding/json" "github.com/apex/log" - "github.com/spf13/viper" "io/ioutil" - "os" - "path/filepath" "runtime/debug" ) @@ -20,38 +16,13 @@ var TaskLogChanMap = utils.NewChanMap() // 获取本地日志 func GetLocalLog(logPath string) (fileBytes []byte, err error) { - - f, err := os.Open(logPath) + fileBytes, err = ioutil.ReadFile(logPath) if err != nil { - log.Error(err.Error()) + log.Errorf(err.Error()) debug.PrintStack() - return nil, err + return fileBytes, err } - fi, err := f.Stat() - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - defer f.Close() - - const bufLen = 2 * 1024 * 1024 - logBuf := make([]byte, bufLen) - - off := int64(0) - if fi.Size() > int64(len(logBuf)) { - off = fi.Size() - int64(len(logBuf)) - } - n, err := f.ReadAt(logBuf, off) - - //到文件结尾会有EOF标识 - if err != nil && err.Error() != "EOF" { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - logBuf = logBuf[:n] - return logBuf, nil + return fileBytes, nil } // 获取远端日志 @@ -71,7 +42,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { // 发布获取日志消息 channel := "nodes:" + task.NodeId.Hex() - if _, err := database.RedisClient.Publish(channel, string(msgBytes)); err != nil { + if err := database.Publish(channel, string(msgBytes)); err != nil { log.Errorf(err.Error()) return "", err } @@ -84,36 +55,3 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { return logStr, nil } - -func DeleteLogPeriodically() { - logDir := viper.GetString("log.path") - if !utils.Exists(logDir) { - log.Error("Can Not Set Delete Logs Periodically,No Log Dir") - return - } - rd, err := ioutil.ReadDir(logDir) - if err != nil { - log.Error("Read Log Dir Failed") - return - } - - for _, fi := range rd { - if fi.IsDir() { - log.Info(filepath.Join(logDir, fi.Name())) - os.RemoveAll(filepath.Join(logDir, fi.Name())) - log.Info("Delete Log File Success") - } - } - -} - -func InitDeleteLogPeriodically() error { - c := cron.New(cron.WithSeconds()) - if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil { - return err - } - - c.Start() - return nil - -} diff --git a/backend/services/log_test.go b/backend/services/log_test.go deleted file mode 100644 index a0b049c5..00000000 --- a/backend/services/log_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package services - -import ( - "crawlab/config" - "fmt" - "github.com/apex/log" - . "github.com/smartystreets/goconvey/convey" - "github.com/spf13/viper" - "os" - "testing" -) - -func TestDeleteLogPeriodically(t *testing.T) { - Convey("Test DeleteLogPeriodically", t, func() { - if err := config.InitConfig("../conf/config.yml"); err != nil { - log.Error("init config error:" + err.Error()) - panic(err) - } - log.Info("初始化配置成功") - logDir := viper.GetString("log.path") - log.Info(logDir) - DeleteLogPeriodically() - }) -} - -func TestGetLocalLog(t *testing.T) { - //create a log file for test - logPath := "../logs/crawlab/test.log" - f, err := os.Create(logPath) - defer f.Close() - if err != nil { - fmt.Println(err.Error()) - - } else { - _, err = f.Write([]byte("This is for test")) - } - - Convey("Test GetLocalLog", t, func() { - Convey("Test response", func() { - logStr, err := GetLocalLog(logPath) - log.Info(string(logStr)) - fmt.Println(err) - So(err, ShouldEqual, nil) - - }) - }) - //delete the test log file - os.Remove(logPath) - -} diff --git a/backend/services/node.go b/backend/services/node.go index eb24f759..1fa2370c 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -1,7 +1,6 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" "crawlab/lib/cron" @@ -11,7 +10,6 @@ import ( "fmt" "github.com/apex/log" "github.com/globalsign/mgo/bson" - "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "runtime/debug" "time" @@ -75,11 +73,23 @@ func GetCurrentNode() (model.Node, error) { if err != nil { // 如果为主节点,表示为第一次注册,插入节点信息 if IsMaster() { - // 获取本机信息 - ip, mac, key, err := model.GetNodeBaseInfo() + // 获取本机IP地址 + ip, err := register.GetRegister().GetIp() if err != nil { debug.PrintStack() - return node, err + return model.Node{}, err + } + + mac, err := register.GetRegister().GetMac() + if err != nil { + debug.PrintStack() + return model.Node{}, err + } + + key, err := register.GetRegister().GetKey() + if err != nil { + debug.PrintStack() + return model.Node{}, err } // 生成节点 @@ -87,7 +97,7 @@ func GetCurrentNode() (model.Node, error) { Key: key, Id: bson.NewObjectId(), Ip: ip, - Name: ip, + Name: key, Mac: mac, IsMaster: true, } @@ -114,7 +124,6 @@ func IsMaster() bool { return viper.GetString("server.master") == Yes } -// 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点 // 该ID的节点是否为主节点 func IsMasterNode(id string) bool { curNode, _ := GetCurrentNode() @@ -167,54 +176,72 @@ func UpdateNodeStatus() { // 在Redis中删除该节点 if err := database.RedisClient.HDel("nodes", data.Key); err != nil { log.Errorf(err.Error()) + return + } + + // 在MongoDB中该节点设置状态为离线 + s, c := database.GetCol("nodes") + defer s.Close() + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return } continue } - // 处理node信息 - handleNodeInfo(key, data) - } - - // 重置不在redis的key为offline - model.ResetNodeStatusToOffline(list) -} - -func handleNodeInfo(key string, data Data) { - // 更新节点信息到数据库 - s, c := database.GetCol("nodes") - defer s.Close() - - // 同个key可能因为并发,被注册多次 - var nodes []model.Node - _ = c.Find(bson.M{"key": key}).All(&nodes) - if nodes != nil && len(nodes) > 1 { - for _, node := range nodes { - _ = c.RemoveId(node.Id) + // 更新节点信息到数据库 + s, c := database.GetCol("nodes") + defer s.Close() + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + // 数据库不存在该节点 + node = model.Node{ + Key: key, + Name: key, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, + } + if err := node.Add(); err != nil { + log.Errorf(err.Error()) + return + } + } else { + // 数据库存在该节点 + node.Status = constants.StatusOnline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } } } - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, + // 遍历数据库中的节点列表 + nodes, err := model.GetNodeList(nil) + for _, node := range nodes { + hasNode := false + for _, key := range list { + if key == node.Key { + hasNode = true + break + } } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else { - // 数据库存在该节点 - node.Status = constants.StatusOnline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return + if !hasNode { + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } + continue } } } @@ -260,12 +287,13 @@ func UpdateNodeData() { } } -func MasterNodeCallback(message redis.Message) (err error) { +func MasterNodeCallback(channel string, msgStr string) { // 反序列化 var msg NodeMessage - if err := json.Unmarshal(message.Data, &msg); err != nil { - - return err + if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return } if msg.Type == constants.MsgTypeGetLog { @@ -282,15 +310,16 @@ func MasterNodeCallback(message redis.Message) (err error) { sysInfoBytes, _ := json.Marshal(&msg.SysInfo) ch <- string(sysInfoBytes) } - return nil } -func WorkerNodeCallback(message redis.Message) (err error) { +func WorkerNodeCallback(channel string, msgStr string) { // 反序列化 msg := NodeMessage{} - if err := json.Unmarshal(message.Data, &msg); err != nil { - - return err + fmt.Println(msgStr) + if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return } if msg.Type == constants.MsgTypeGetLog { @@ -304,27 +333,26 @@ func WorkerNodeCallback(message redis.Message) (err error) { // 获取本地日志 logStr, err := GetLocalLog(msg.LogPath) - log.Info(string(logStr)) if err != nil { log.Errorf(err.Error()) debug.PrintStack() msgSd.Error = err.Error() - msgSd.Log = err.Error() - } else { - msgSd.Log = string(logStr) } + msgSd.Log = string(logStr) // 序列化 msgSdBytes, err := json.Marshal(&msgSd) if err != nil { - return err + log.Errorf(err.Error()) + debug.PrintStack() + return } // 发布消息给主节点 - log.Info("publish get log msg to master") - if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil { - - return err + fmt.Println(msgSd) + if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { + log.Errorf(err.Error()) + return } } else if msg.Type == constants.MsgTypeCancelTask { // 取消任务 @@ -334,7 +362,8 @@ func WorkerNodeCallback(message redis.Message) (err error) { // 获取环境信息 sysInfo, err := GetLocalSystemInfo() if err != nil { - return err + log.Errorf(err.Error()) + return } msgSd := NodeMessage{ Type: constants.MsgTypeGetSystemInfo, @@ -345,14 +374,14 @@ func WorkerNodeCallback(message redis.Message) (err error) { if err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } - if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil { + fmt.Println(msgSd) + if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { log.Errorf(err.Error()) - return err + return } } - return } // 初始化节点服务 @@ -370,27 +399,25 @@ func InitNodeService() error { // 首次更新节点数据(注册到Redis) UpdateNodeData() + // 消息订阅 + var sub database.Subscriber + sub.Connect() + // 获取当前节点 node, err := GetCurrentNode() if err != nil { log.Errorf(err.Error()) return err } - ctx := context.Background() + if IsMaster() { // 如果为主节点,订阅主节点通信频道 channel := "nodes:master" - err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel) - if err != nil { - return err - } + sub.Subscribe(channel, MasterNodeCallback) } else { // 若为工作节点,订阅单独指定通信频道 channel := "nodes:" + node.Id.Hex() - err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel) - if err != nil { - return err - } + sub.Subscribe(channel, WorkerNodeCallback) } // 如果为主节点,每30秒刷新所有节点信息 diff --git a/backend/services/spider.go b/backend/services/spider.go index 87b4a1d5..f4f856e6 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -1,7 +1,6 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" "crawlab/lib/cron" @@ -12,7 +11,6 @@ import ( "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" - "github.com/gomodule/redigo/redis" "github.com/pkg/errors" "github.com/satori/go.uuid" "github.com/spf13/viper" @@ -22,7 +20,6 @@ import ( "path/filepath" "runtime/debug" "strings" - "syscall" ) type SpiderFileData struct { @@ -33,7 +30,6 @@ type SpiderFileData struct { type SpiderUploadMessage struct { FileId string FileName string - SpiderId string } // 从项目目录中获取爬虫列表 @@ -43,9 +39,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 - defer syscall.Umask(mask) // 改为原来的 umask - if err := os.MkdirAll(srcPath, 0766); err != nil { + if err := os.MkdirAll(srcPath, 0666); err != nil { debug.PrintStack() return []model.Spider{}, err } @@ -91,25 +85,16 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 将爬虫保存到数据库 func SaveSpiders(spiders []model.Spider) error { - s, c := database.GetCol("spiders") - defer s.Close() - - if len(spiders) == 0 { - err := model.RemoveAllSpider() - if err != nil { - log.Error("remove all spider error:" + err.Error()) - return err - } - log.Info("get spider from dir is empty,removed all spider") - return nil - } - // 如果该爬虫不存在于数据库,则保存爬虫到数据库 + // 遍历爬虫列表 for _, spider := range spiders { // 忽略非自定义爬虫 if spider.Type != constants.Customized { continue } + // 如果该爬虫不存在于数据库,则保存爬虫到数据库 + s, c := database.GetCol("spiders") + defer s.Close() var spider_ *model.Spider if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil { // 不存在 @@ -117,8 +102,11 @@ func SaveSpiders(spiders []model.Spider) error { debug.PrintStack() return err } + } else { + // 存在 } } + return nil } @@ -143,14 +131,15 @@ func ZipSpider(spider model.Spider) (filePath string, err error) { // 如果源文件夹不存在,抛错 if !utils.Exists(spider.Src) { debug.PrintStack() - // 删除该爬虫,否则会一直报错 - _ = model.RemoveSpider(spider.Id) return "", errors.New("source path does not exist") } // 临时文件路径 randomId := uuid.NewV4() - + if err != nil { + debug.PrintStack() + return "", err + } filePath = filepath.Join( viper.GetString("other.tmppath"), randomId.String()+".zip", @@ -182,7 +171,6 @@ func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid // 如果存在FileId删除GridFS上的老文件 if !utils.IsObjectIdNull(spider.FileId) { if err = gf.RemoveId(spider.FileId); err != nil { - log.Error("remove gf file:" + err.Error()) debug.PrintStack() } } @@ -235,7 +223,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre for { switch nr, err := f.Read(s[:]); true { case nr < 0: - _, _ = fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error()) + fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error()) debug.PrintStack() case nr == 0: // EOF return nil @@ -243,6 +231,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre handle(s[0:nr], fileCreate) } } + return nil } // 发布所有爬虫 @@ -258,8 +247,8 @@ func PublishAllSpiders() error { for _, spider := range spiders { // 发布爬虫 if err := PublishSpider(spider); err != nil { - log.Errorf("publish spider error:" + err.Error()) - // return err + log.Errorf(err.Error()) + return err } } @@ -300,14 +289,13 @@ func PublishSpider(spider model.Spider) (err error) { msg := SpiderUploadMessage{ FileId: fid.Hex(), FileName: fileName, - SpiderId: spider.Id.Hex(), } msgStr, err := json.Marshal(msg) if err != nil { return } channel := "files:upload" - if _, err = database.RedisClient.Publish(channel, string(msgStr)); err != nil { + if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() return @@ -317,24 +305,24 @@ func PublishSpider(spider model.Spider) (err error) { } // 上传爬虫回调 -func OnFileUpload(message redis.Message) (err error) { +func OnFileUpload(channel string, msgStr string) { s, gf := database.GetGridFs("files") defer s.Close() // 反序列化消息 var msg SpiderUploadMessage - if err := json.Unmarshal(message.Data, &msg); err != nil { + if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 从GridFS获取该文件 f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) if err != nil { - log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) + log.Errorf(err.Error()) debug.PrintStack() - return err + return } defer f.Close() @@ -347,7 +335,7 @@ func OnFileUpload(message redis.Message) (err error) { if err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } defer tmpFile.Close() @@ -355,34 +343,33 @@ func OnFileUpload(message redis.Message) (err error) { if _, err := io.Copy(tmpFile, f); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 解压缩临时文件到目标文件夹 dstPath := filepath.Join( viper.GetString("spider.path"), - // strings.Replace(msg.FileName, ".zip", "", -1), + //strings.Replace(msg.FileName, ".zip", "", -1), ) if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 关闭临时文件 if err := tmpFile.Close(); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 删除临时文件 if err := os.Remove(tmpFilePath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } - return nil } // 启动爬虫服务 @@ -407,11 +394,9 @@ func InitSpiderService() error { // 订阅文件上传 channel := "files:upload" - - //sub.Connect() - ctx := context.Background() - return database.RedisClient.Subscribe(ctx, OnFileUpload, channel) - + var sub database.Subscriber + sub.Connect() + sub.Subscribe(channel, OnFileUpload) } // 启动定时任务 diff --git a/backend/services/system.go b/backend/services/system.go index ff177aa0..5f50dec9 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -112,7 +112,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 序列化 msgBytes, _ := json.Marshal(&msg) - if _, err := database.RedisClient.Publish("nodes:"+id, string(msgBytes)); err != nil { + if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil { return model.SystemInfo{}, err } diff --git a/backend/services/task.go b/backend/services/task.go index 6ba6b257..8c0ff8a1 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -408,12 +408,9 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = string(logBytes) if err != nil { log.Errorf(err.Error()) - logStr = string(err.Error()) - // return "", err - } else { - logStr = string(logBytes) + return "", err } - + logStr = string(logBytes) } else { // 若不为主节点,获取远端日志 logStr, err = GetRemoteLog(task) @@ -466,7 +463,7 @@ func CancelTask(id string) (err error) { } // 发布消息 - if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil { + if err := database.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil { return err } } @@ -475,7 +472,6 @@ func CancelTask(id string) (err error) { } func HandleTaskError(t model.Task, err error) { - log.Error("handle task error:" + err.Error()) t.Status = constants.StatusError t.Error = err.Error() t.FinishTs = time.Now() diff --git a/backend/services/user.go b/backend/services/user.go index 4811f767..fb688fd1 100644 --- a/backend/services/user.go +++ b/backend/services/user.go @@ -5,9 +5,11 @@ import ( "crawlab/model" "crawlab/utils" "errors" + "github.com/apex/log" "github.com/dgrijalva/jwt-go" "github.com/globalsign/mgo/bson" "github.com/spf13/viper" + "runtime/debug" "time" ) @@ -22,38 +24,28 @@ func InitUserService() error { } return nil } -func MakeToken(user *model.User) (tokenStr string, err error) { + +func GetToken(username string) (tokenStr string, err error) { + user, err := model.GetUserByUsername(username) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ "id": user.Id, "username": user.Username, "nbf": time.Now().Unix(), }) - return token.SignedString([]byte(viper.GetString("server.secret"))) - + tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret"))) + if err != nil { + return + } + return } -//func GetToken(username string) (tokenStr string, err error) { -// user, err := model.GetUserByUsername(username) -// if err != nil { -// log.Errorf(err.Error()) -// debug.PrintStack() -// return -// } -// -// token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ -// "id": user.Id, -// "username": user.Username, -// "nbf": time.Now().Unix(), -// }) -// -// tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret"))) -// if err != nil { -// return -// } -// return -//} - func SecretFunc() jwt.Keyfunc { return func(token *jwt.Token) (interface{}, error) { return []byte(viper.GetString("server.secret")), nil diff --git a/backend/utils/file.go b/backend/utils/file.go index dda73c13..9a4300a1 100644 --- a/backend/utils/file.go +++ b/backend/utils/file.go @@ -179,11 +179,11 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error { } } else { header, err := zip.FileInfoHeader(info) + header.Name = prefix + "/" + header.Name if err != nil { debug.PrintStack() return err } - header.Name = prefix + "/" + header.Name writer, err := zw.CreateHeader(header) if err != nil { debug.PrintStack() diff --git a/backend/utils/file_test.go b/backend/utils/file_test.go index 64f2df6d..484366f5 100644 --- a/backend/utils/file_test.go +++ b/backend/utils/file_test.go @@ -1,12 +1,8 @@ package utils import ( - "archive/zip" . "github.com/smartystreets/goconvey/convey" - "io" - "log" "os" - "runtime/debug" "testing" ) @@ -42,13 +38,9 @@ func TestIsDir(t *testing.T) { } func TestCompress(t *testing.T) { - err := os.Mkdir("testCompress", os.ModePerm) - if err != nil { - t.Error("create testCompress failed") - } - var pathString = "testCompress" + var pathString = "../utils" var files []*os.File - var disPath = "testCompress" + var disPath = "../utils/test" file, err := os.Open(pathString) if err != nil { t.Error("open source path failed") @@ -60,60 +52,15 @@ func TestCompress(t *testing.T) { So(er, ShouldEqual, nil) }) }) - os.RemoveAll("testCompress") } -func Zip(zipFile string, fileList []string) error { - // 创建 zip 包文件 - fw, err := os.Create(zipFile) - if err != nil { - log.Fatal() - } - defer fw.Close() - - // 实例化新的 zip.Writer - zw := zip.NewWriter(fw) - defer func() { - // 检测一下是否成功关闭 - if err := zw.Close(); err != nil { - log.Fatalln(err) - } - }() - - for _, fileName := range fileList { - fr, err := os.Open(fileName) - if err != nil { - return err - } - fi, err := fr.Stat() - if err != nil { - return err - } - // 写入文件的头信息 - fh, err := zip.FileInfoHeader(fi) - w, err := zw.CreateHeader(fh) - if err != nil { - return err - } - // 写入文件内容 - _, err = io.Copy(w, fr) - if err != nil { - return err - } - } - return nil -} +// 测试之前需存在有效的test(.zip)文件 func TestDeCompress(t *testing.T) { - err := os.Mkdir("testDeCompress", os.ModePerm) - err = Zip("demo.zip", []string{}) + var tmpFilePath = "./test" + tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) if err != nil { - t.Error("create zip file failed") - } - tmpFile, err := os.OpenFile("demo.zip", os.O_RDONLY, 0777) - if err != nil { - debug.PrintStack() - t.Error("open demo.zip failed") + t.Fatal("open zip file failed") } var dstPath = "./testDeCompress" Convey("Test DeCopmress func", t, func() { @@ -121,7 +68,5 @@ func TestDeCompress(t *testing.T) { err := DeCompress(tmpFile, dstPath) So(err, ShouldEqual, nil) }) - os.RemoveAll("testDeCompress") - os.Remove("demo.zip") } diff --git a/docker/Dockerfile.master.alpine b/docker/Dockerfile.master.alpine index b9dbb742..6979861b 100644 --- a/docker/Dockerfile.master.alpine +++ b/docker/Dockerfile.master.alpine @@ -75,7 +75,7 @@ RUN sed -i 's/#rc_sys=""/rc_sys="lxc"/g' /etc/rc.conf && \ # working directory WORKDIR /app/backend -ENV PYTHONIOENCODING utf-8 + # frontend port EXPOSE 8080 diff --git a/docker/Dockerfile.worker.alpine b/docker/Dockerfile.worker.alpine index 388125a2..e7a66776 100644 --- a/docker/Dockerfile.worker.alpine +++ b/docker/Dockerfile.worker.alpine @@ -35,7 +35,7 @@ RUN apk del .build-deps # working directory WORKDIR /app/backend -ENV PYTHONIOENCODING utf-8 + # backend port EXPOSE 8000 diff --git a/frontend/package.json b/frontend/package.json index e3bc84f8..139297d3 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -6,7 +6,6 @@ "serve": "vue-cli-service serve --ip=0.0.0.0", "serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0", "config": "vue ui", - "build:dev": "vue-cli-service build --mode development", "build:prod": "vue-cli-service build --mode production", "lint": "vue-cli-service lint", "test:unit": "vue-cli-service test:unit" diff --git a/frontend/src/api/request.js b/frontend/src/api/request.js index 22707159..38734c46 100644 --- a/frontend/src/api/request.js +++ b/frontend/src/api/request.js @@ -3,51 +3,28 @@ import router from '../router' let baseUrl = process.env.VUE_APP_BASE_URL ? process.env.VUE_APP_BASE_URL : 'http://localhost:8000' -const request = async (method, path, params, data, others = {}) => { - try { +const request = (method, path, params, data) => { + return new Promise((resolve, reject) => { const url = baseUrl + path const headers = { 'Authorization': window.localStorage.getItem('token') } - const response = await axios({ + axios({ method, url, params, data, - headers, - ...others + headers }) - // console.log(response) - return response - } catch (e) { - if (e.response.status === 401 && router.currentRoute.path !== '/login') { - router.push('/login') - } - await Promise.reject(e) - } - - // return new Promise((resolve, reject) => { - // const url = baseUrl + path - // const headers = { - // 'Authorization': window.localStorage.getItem('token') - // } - // axios({ - // method, - // url, - // params, - // data, - // headers, - // ...others - // }) - // .then(resolve) - // .catch(error => { - // console.log(error) - // if (error.response.status === 401) { - // router.push('/login') - // } - // reject(error) - // }) - // }) + .then(resolve) + .catch(error => { + console.log(error) + if (error.response.status === 401) { + router.push('/login') + } + reject(error) + }) + }) } const get = (path, params) => { diff --git a/frontend/src/components/InfoView/NodeInfoView.vue b/frontend/src/components/InfoView/NodeInfoView.vue index e6ffb58a..8e350448 100644 --- a/frontend/src/components/InfoView/NodeInfoView.vue +++ b/frontend/src/components/InfoView/NodeInfoView.vue @@ -22,7 +22,7 @@ - {{$t('Save')}} + {{$t('Save')}} diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index 39702a5d..661e4757 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -44,9 +44,6 @@ - - - diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue index e902e959..bfe6419a 100644 --- a/frontend/src/components/InfoView/TaskInfoView.vue +++ b/frontend/src/components/InfoView/TaskInfoView.vue @@ -86,15 +86,15 @@ export default { return dayjs(str).format('YYYY-MM-DD HH:mm:ss') }, getWaitDuration (row) { - if (!row.start_ts || row.start_ts.match('^0001')) return 'NA' + if (row.start_ts.match('^0001')) return 'NA' return dayjs(row.start_ts).diff(row.create_ts, 'second') }, getRuntimeDuration (row) { - if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' + if (row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.start_ts, 'second') }, getTotalDuration (row) { - if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' + if (row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.create_ts, 'second') } } diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index d3c8243f..b1de3b47 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -154,8 +154,6 @@ export default { 'Last Run': '上次运行', 'Action': '操作', 'No command line': '没有执行命令', - 'Last Status': '上次运行状态', - 'Remark': '备注', // 任务 'Task Info': '任务信息', @@ -247,7 +245,7 @@ export default { 'username already exists': '用户名已存在', 'Deleted successfully': '成功删除', 'Saved successfully': '成功保存', - 'English': 'English', + // 登录 'Sign in': '登录', 'Sign-in': '登录', @@ -266,20 +264,5 @@ export default { 'admin': '管理用户', 'Role': '角色', 'Edit User': '更改用户', - 'Users': '用户', - tagsView: { - closeOthers: '关闭其他', - close: '关闭', - refresh: '刷新', - closeAll: '关闭所有' - }, - nodeList: { - type: '节点类型' - }, - schedules: { - cron: 'Cron', - add_cron: '生成Cron', - // Cron Format: [second] [minute] [hour] [day of month] [month] [day of week] - cron_format: 'Cron 格式: [秒] [分] [小时] [日] [月] [周]' - } + 'Users': '用户' } diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 84c96cd3..9a238d08 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -46,6 +46,7 @@ export const constantRouterMap = [ ] }, { + name: 'Node', path: '/nodes', component: Layout, meta: { @@ -75,6 +76,7 @@ export const constantRouterMap = [ ] }, { + name: 'Spider', path: '/spiders', component: Layout, meta: { @@ -104,6 +106,7 @@ export const constantRouterMap = [ ] }, { + name: 'Task', path: '/tasks', component: Layout, meta: { @@ -133,6 +136,7 @@ export const constantRouterMap = [ ] }, { + name: 'Schedule', path: '/schedules', component: Layout, meta: { @@ -153,6 +157,7 @@ export const constantRouterMap = [ ] }, { + name: 'Site', path: '/sites', component: Layout, hidden: true, @@ -173,6 +178,7 @@ export const constantRouterMap = [ ] }, { + name: 'User', path: '/users', component: Layout, meta: { diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js index 5e21a222..266beb3e 100644 --- a/frontend/src/store/modules/node.js +++ b/frontend/src/store/modules/node.js @@ -25,7 +25,15 @@ const mutations = { const { id, systemInfo } = payload for (let i = 0; i < state.nodeList.length; i++) { if (state.nodeList[i]._id === id) { + // Vue.set(state.nodeList[i], 'systemInfo', {}) state.nodeList[i].systemInfo = systemInfo + // for (const key in systemInfo) { + // if (systemInfo.hasOwnProperty(key)) { + // console.log(key) + // state.nodeList[i].systemInfo[key] = systemInfo[key] + // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key]) + // } + // } break } } @@ -68,12 +76,10 @@ const actions = { getTaskList ({ state, commit }, id) { return request.get(`/nodes/${id}/tasks`) .then(response => { - if (response.data.data) { - commit('task/SET_TASK_LIST', - response.data.data.map(d => d) - .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), - { root: true }) - } + commit('task/SET_TASK_LIST', + response.data.data.map(d => d) + .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), + { root: true }) }) }, getNodeSystemInfo ({ state, commit }, id) { diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 1d7e6c09..545a169b 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -120,22 +120,6 @@ const actions = { commit('SET_TASK_RESULTS_TOTAL_COUNT', response.data.total) }) }, - async getTaskResultExcel ({ state, commit }, id) { - const { data } = await request.request('GET', '/tasks/' + id + '/results/download', {}, { - responseType: 'blob' // important - }) - const downloadUrl = window.URL.createObjectURL(new Blob([data])) - - const link = document.createElement('a') - - link.href = downloadUrl - - link.setAttribute('download', 'data.csv') // any other extension - - document.body.appendChild(link) - link.click() - link.remove() - }, cancelTask ({ state, dispatch }, id) { return request.post(`/tasks/${id}/cancel`) .then(() => { diff --git a/frontend/src/views/layout/components/Navbar.vue b/frontend/src/views/layout/components/Navbar.vue index 3b30c049..f60c0051 100644 --- a/frontend/src/views/layout/components/Navbar.vue +++ b/frontend/src/views/layout/components/Navbar.vue @@ -32,7 +32,6 @@ {{$t('Documentation')}} - diff --git a/frontend/src/views/node/NodeList.vue b/frontend/src/views/node/NodeList.vue index 9ea51502..641009f3 100644 --- a/frontend/src/views/node/NodeList.vue +++ b/frontend/src/views/node/NodeList.vue @@ -163,7 +163,7 @@ export default { columns: [ { name: 'name', label: 'Name', width: '220' }, { name: 'ip', label: 'IP', width: '160' }, - { name: 'type', label: 'nodeList.type', width: '120' }, + { name: 'type', label: 'Type', width: '120' }, // { name: 'port', label: 'Port', width: '80' }, { name: 'status', label: 'Status', width: '120' }, { name: 'description', label: 'Description', width: 'auto' } diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 477302b8..28ca4961 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -38,21 +38,21 @@ - + + :placeholder="$t('Cron')"> - {{$t('schedules.add_cron')}} + {{$t('生成Cron')}} - - - { this.$store.dispatch('task/getTaskLog', this.$route.params.id) }, 5000) diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index ec9537c3..9cbceb20 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -45,7 +45,7 @@ :label="$t(col.label)" :sortable="col.sortable" :align="col.align" - > + :width="col.width"> @@ -119,7 +119,7 @@ :width="col.width"> - + diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index 661e4757..39702a5d 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -44,6 +44,9 @@ + + + diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue index bfe6419a..e902e959 100644 --- a/frontend/src/components/InfoView/TaskInfoView.vue +++ b/frontend/src/components/InfoView/TaskInfoView.vue @@ -86,15 +86,15 @@ export default { return dayjs(str).format('YYYY-MM-DD HH:mm:ss') }, getWaitDuration (row) { - if (row.start_ts.match('^0001')) return 'NA' + if (!row.start_ts || row.start_ts.match('^0001')) return 'NA' return dayjs(row.start_ts).diff(row.create_ts, 'second') }, getRuntimeDuration (row) { - if (row.finish_ts.match('^0001')) return 'NA' + if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.start_ts, 'second') }, getTotalDuration (row) { - if (row.finish_ts.match('^0001')) return 'NA' + if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA' return dayjs(row.finish_ts).diff(row.create_ts, 'second') } } diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index b1de3b47..d3c8243f 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -154,6 +154,8 @@ export default { 'Last Run': '上次运行', 'Action': '操作', 'No command line': '没有执行命令', + 'Last Status': '上次运行状态', + 'Remark': '备注', // 任务 'Task Info': '任务信息', @@ -245,7 +247,7 @@ export default { 'username already exists': '用户名已存在', 'Deleted successfully': '成功删除', 'Saved successfully': '成功保存', - + 'English': 'English', // 登录 'Sign in': '登录', 'Sign-in': '登录', @@ -264,5 +266,20 @@ export default { 'admin': '管理用户', 'Role': '角色', 'Edit User': '更改用户', - 'Users': '用户' + 'Users': '用户', + tagsView: { + closeOthers: '关闭其他', + close: '关闭', + refresh: '刷新', + closeAll: '关闭所有' + }, + nodeList: { + type: '节点类型' + }, + schedules: { + cron: 'Cron', + add_cron: '生成Cron', + // Cron Format: [second] [minute] [hour] [day of month] [month] [day of week] + cron_format: 'Cron 格式: [秒] [分] [小时] [日] [月] [周]' + } } diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 9a238d08..84c96cd3 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -46,7 +46,6 @@ export const constantRouterMap = [ ] }, { - name: 'Node', path: '/nodes', component: Layout, meta: { @@ -76,7 +75,6 @@ export const constantRouterMap = [ ] }, { - name: 'Spider', path: '/spiders', component: Layout, meta: { @@ -106,7 +104,6 @@ export const constantRouterMap = [ ] }, { - name: 'Task', path: '/tasks', component: Layout, meta: { @@ -136,7 +133,6 @@ export const constantRouterMap = [ ] }, { - name: 'Schedule', path: '/schedules', component: Layout, meta: { @@ -157,7 +153,6 @@ export const constantRouterMap = [ ] }, { - name: 'Site', path: '/sites', component: Layout, hidden: true, @@ -178,7 +173,6 @@ export const constantRouterMap = [ ] }, { - name: 'User', path: '/users', component: Layout, meta: { diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js index 266beb3e..5e21a222 100644 --- a/frontend/src/store/modules/node.js +++ b/frontend/src/store/modules/node.js @@ -25,15 +25,7 @@ const mutations = { const { id, systemInfo } = payload for (let i = 0; i < state.nodeList.length; i++) { if (state.nodeList[i]._id === id) { - // Vue.set(state.nodeList[i], 'systemInfo', {}) state.nodeList[i].systemInfo = systemInfo - // for (const key in systemInfo) { - // if (systemInfo.hasOwnProperty(key)) { - // console.log(key) - // state.nodeList[i].systemInfo[key] = systemInfo[key] - // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key]) - // } - // } break } } @@ -76,10 +68,12 @@ const actions = { getTaskList ({ state, commit }, id) { return request.get(`/nodes/${id}/tasks`) .then(response => { - commit('task/SET_TASK_LIST', - response.data.data.map(d => d) - .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), - { root: true }) + if (response.data.data) { + commit('task/SET_TASK_LIST', + response.data.data.map(d => d) + .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), + { root: true }) + } }) }, getNodeSystemInfo ({ state, commit }, id) { diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 545a169b..1d7e6c09 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -120,6 +120,22 @@ const actions = { commit('SET_TASK_RESULTS_TOTAL_COUNT', response.data.total) }) }, + async getTaskResultExcel ({ state, commit }, id) { + const { data } = await request.request('GET', '/tasks/' + id + '/results/download', {}, { + responseType: 'blob' // important + }) + const downloadUrl = window.URL.createObjectURL(new Blob([data])) + + const link = document.createElement('a') + + link.href = downloadUrl + + link.setAttribute('download', 'data.csv') // any other extension + + document.body.appendChild(link) + link.click() + link.remove() + }, cancelTask ({ state, dispatch }, id) { return request.post(`/tasks/${id}/cancel`) .then(() => { diff --git a/frontend/src/views/layout/components/Navbar.vue b/frontend/src/views/layout/components/Navbar.vue index f60c0051..3b30c049 100644 --- a/frontend/src/views/layout/components/Navbar.vue +++ b/frontend/src/views/layout/components/Navbar.vue @@ -32,6 +32,7 @@ {{$t('Documentation')}} + diff --git a/frontend/src/views/node/NodeList.vue b/frontend/src/views/node/NodeList.vue index 641009f3..9ea51502 100644 --- a/frontend/src/views/node/NodeList.vue +++ b/frontend/src/views/node/NodeList.vue @@ -163,7 +163,7 @@ export default { columns: [ { name: 'name', label: 'Name', width: '220' }, { name: 'ip', label: 'IP', width: '160' }, - { name: 'type', label: 'Type', width: '120' }, + { name: 'type', label: 'nodeList.type', width: '120' }, // { name: 'port', label: 'Port', width: '80' }, { name: 'status', label: 'Status', width: '120' }, { name: 'description', label: 'Description', width: 'auto' } diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 28ca4961..477302b8 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -38,21 +38,21 @@ - + + :placeholder="$t('schedules.cron')"> - {{$t('生成Cron')}} + {{$t('schedules.add_cron')}} + + + { this.$store.dispatch('task/getTaskLog', this.$route.params.id) }, 5000) diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 9cbceb20..ec9537c3 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -45,7 +45,7 @@ :label="$t(col.label)" :sortable="col.sortable" :align="col.align" - :width="col.width"> + > @@ -119,7 +119,7 @@ :width="col.width"> - +