diff --git a/backend/conf/config.yml b/backend/conf/config.yml
index f1042ca6..3805762a 100644
--- a/backend/conf/config.yml
+++ b/backend/conf/config.yml
@@ -15,6 +15,8 @@ redis:
log:
level: info
path: "/var/logs/crawlab"
+ isDeletePeriodically: "Y"
+ deleteFrequency: "@hourly"
server:
host: 0.0.0.0
port: 8000
diff --git a/backend/config/config_test.go b/backend/config/config_test.go
index ee966877..0068e6ad 100644
--- a/backend/config/config_test.go
+++ b/backend/config/config_test.go
@@ -7,7 +7,7 @@ import (
func TestInitConfig(t *testing.T) {
Convey("Test InitConfig func", t, func() {
- x := InitConfig("")
+ x := InitConfig("../conf/config.yml")
Convey("The value should be nil", func() {
So(x, ShouldEqual, nil)
diff --git a/backend/constants/context.go b/backend/constants/context.go
new file mode 100644
index 00000000..0759b54b
--- /dev/null
+++ b/backend/constants/context.go
@@ -0,0 +1,5 @@
+package constants
+
+const (
+ // ContextUser is the gin context key under which AuthorizationMiddleware
+ // stores a pointer to the user it resolved for the request.
+ ContextUser = "currentUser"
+)
diff --git a/backend/constants/errors.go b/backend/constants/errors.go
new file mode 100644
index 00000000..a273cb75
--- /dev/null
+++ b/backend/constants/errors.go
@@ -0,0 +1,13 @@
+package constants
+
+import (
+ "crawlab/errors"
+ "net/http"
+)
+
+var (
+ // ErrorMongoError is a system-scope error with code 1001 and HTTP status 500.
+ // NOTE(review): the message carries a %s verb, but nothing visible here
+ // formats it — confirm that callers fill it in before the message is shown.
+ ErrorMongoError = errors.NewSystemOPError(1001, "system error:[mongo]%s", http.StatusInternalServerError)
+ // Errors related to users.
+ // ErrorUserNotFound is a business-scope error with code 10001 and HTTP status 401.
+ ErrorUserNotFound = errors.NewBusinessError(10001, "user not found.", http.StatusUnauthorized)
+ // ErrorUsernameOrPasswordInvalid is a business-scope error with code 11001 and HTTP status 401.
+ ErrorUsernameOrPasswordInvalid = errors.NewBusinessError(11001, "username or password invalid", http.StatusUnauthorized)
+)
diff --git a/backend/database/mongo.go b/backend/database/mongo.go
index 6b155791..1c2d6433 100644
--- a/backend/database/mongo.go
+++ b/backend/database/mongo.go
@@ -3,6 +3,7 @@ package database
import (
"github.com/globalsign/mgo"
"github.com/spf13/viper"
+ "time"
)
var Session *mgo.Session
@@ -44,7 +45,7 @@ func InitMongo() error {
} else {
uri = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoHost + ":" + mongoPort + "/" + mongoDb + "?authSource=" + mongoAuth
}
- sess, err := mgo.Dial(uri)
+ sess, err := mgo.DialWithTimeout(uri, time.Second*5)
if err != nil {
return err
}
diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go
index b100535f..c5fdbda9 100644
--- a/backend/database/pubsub.go
+++ b/backend/database/pubsub.go
@@ -1,90 +1,97 @@
package database
import (
- "errors"
+ "context"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
+ errors2 "github.com/pkg/errors"
"time"
- "unsafe"
)
-type SubscribeCallback func(channel, message string)
+// ConsumeFunc handles one message received from a Redis pub/sub channel.
+// When it returns a non-nil error the subscriber prints the error and carries
+// on with the next message.
+type ConsumeFunc func(message redis.Message) error
-type Subscriber struct {
- client redis.PubSubConn
- cbMap map[string]SubscribeCallback
-}
-
-func (c *Subscriber) Connect() {
- conn, err := GetRedisConn()
- if err != nil {
- log.Fatalf("redis dial failed.")
- }
-
- c.client = redis.PubSubConn{Conn: conn}
- c.cbMap = make(map[string]SubscribeCallback)
-
- //retry connect redis 5 times, or panic
- index := 0
- go func(i int) {
- for {
- log.Debug("wait...")
- switch res := c.client.Receive().(type) {
- case redis.Message:
- i = 0
- channel := (*string)(unsafe.Pointer(&res.Channel))
- message := (*string)(unsafe.Pointer(&res.Data))
- c.cbMap[*channel](*channel, *message)
- case redis.Subscription:
- fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
- case error:
- log.Error("error handle redis connection...")
-
- time.Sleep(2 * time.Second)
- if i > 5 {
- panic(errors.New("redis connection failed too many times, panic"))
- }
- con, err := GetRedisConn()
- if err != nil {
- log.Error("redis dial failed")
- continue
- }
- c.client = redis.PubSubConn{Conn: con}
- i += 1
-
- continue
- }
- }
- }(index)
-
-}
-
-func (c *Subscriber) Close() {
- err := c.client.Close()
+func (r *Redis) Close() {
+ err := r.pool.Close()
if err != nil {
log.Errorf("redis close error.")
}
}
+func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
+ psc := redis.PubSubConn{Conn: r.pool.Get()}
+ if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil {
+ return err
+ }
+ done := make(chan error, 1)
+ tick := time.NewTicker(time.Second * 3)
+ defer tick.Stop()
+ go func() {
+ defer func() { _ = psc.Close() }()
+ for {
+ switch msg := psc.Receive().(type) {
+ case error:
+ done <- fmt.Errorf("redis pubsub receive err: %v", msg)
+ return
+ case redis.Message:
+ fmt.Println(msg)
+ if err := consume(msg); err != nil {
+ fmt.Printf("redis pubsub consume message err: %v", err)
+ continue
+ }
+ case redis.Subscription:
+ fmt.Println(msg)
-func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
- err := c.client.Subscribe(channel)
- if err != nil {
- log.Fatalf("redis Subscribe error.")
+ if msg.Count == 0 {
+ // all channels are unsubscribed
+ return
+ }
+ }
+
+ }
+ }()
+ // start a new goroutine to receive message
+ for {
+ select {
+ case <-ctx.Done():
+ if err := psc.Unsubscribe(); err != nil {
+ fmt.Printf("redis pubsub unsubscribe err: %v \n", err)
+ }
+ done <- nil
+ case <-tick.C:
+ //fmt.Printf("ping message \n")
+ if err := psc.Ping(""); err != nil {
+ done <- err
+ }
+ case err := <-done:
+ close(done)
+ return err
+ }
}
- c.cbMap[channel.(string)] = cb
}
+func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
+ index := 0
+ go func() {
+ for {
+ err := r.subscribe(ctx, consume, channel...)
+ fmt.Println(err)
-func Publish(channel string, msg string) error {
- c, err := GetRedisConn()
- if err != nil {
- return err
- }
-
- if _, err := c.Do("PUBLISH", channel, msg); err != nil {
- return err
- }
-
+ if err == nil {
+ break
+ }
+ time.Sleep(5 * time.Second)
+ index += 1
+ fmt.Printf("try reconnect %d times \n", index)
+ }
+ }()
return nil
}
+// Publish sends message to channel with the PUBLISH command on a pooled
+// connection. It returns the integer reply of the command, or 0 and a wrapped
+// error when the command fails.
+func (r *Redis) Publish(channel, message string) (n int, err error) {
+	c := r.pool.Get()
+	defer func() { _ = c.Close() }()
+
+	reply, doErr := redis.Int(c.Do("PUBLISH", channel, message))
+	if doErr != nil {
+		return 0, errors2.Wrapf(doErr, "redis publish %s %s", channel, message)
+	}
+	return reply, nil
+}
diff --git a/backend/database/redis.go b/backend/database/redis.go
index ffebf776..ede229a2 100644
--- a/backend/database/redis.go
+++ b/backend/database/redis.go
@@ -4,21 +4,20 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
+ "time"
)
-var RedisClient = Redis{}
-
-type ConsumeFunc func(channel string, message []byte) error
+// RedisClient is the package-level client. It is nil until InitRedis assigns it.
+var RedisClient *Redis
+// Redis groups the Redis helpers of this package around one connection pool.
type Redis struct {
+ // pool supplies the connections the methods borrow for each command.
+ pool *redis.Pool
}
+// NewRedisClient returns a Redis backed by a pool created with NewRedisPool.
+func NewRedisClient() *Redis {
+ return &Redis{pool: NewRedisPool()}
+}
func (r *Redis) RPush(collection string, value interface{}) error {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return err
- }
+ c := r.pool.Get()
defer c.Close()
if _, err := c.Do("RPUSH", collection, value); err != nil {
@@ -29,11 +28,7 @@ func (r *Redis) RPush(collection string, value interface{}) error {
}
func (r *Redis) LPop(collection string) (string, error) {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return "", err
- }
+ c := r.pool.Get()
defer c.Close()
value, err2 := redis.String(c.Do("LPOP", collection))
@@ -44,11 +39,7 @@ func (r *Redis) LPop(collection string) (string, error) {
}
func (r *Redis) HSet(collection string, key string, value string) error {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return err
- }
+ c := r.pool.Get()
defer c.Close()
if _, err := c.Do("HSET", collection, key, value); err != nil {
@@ -59,11 +50,7 @@ func (r *Redis) HSet(collection string, key string, value string) error {
}
func (r *Redis) HGet(collection string, key string) (string, error) {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return "", err
- }
+ c := r.pool.Get()
defer c.Close()
value, err2 := redis.String(c.Do("HGET", collection, key))
@@ -74,11 +61,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) {
}
func (r *Redis) HDel(collection string, key string) error {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return err
- }
+ c := r.pool.Get()
defer c.Close()
if _, err := c.Do("HDEL", collection, key); err != nil {
@@ -88,11 +71,7 @@ func (r *Redis) HDel(collection string, key string) error {
}
func (r *Redis) HKeys(collection string) ([]string, error) {
- c, err := GetRedisConn()
- if err != nil {
- debug.PrintStack()
- return []string{}, err
- }
+ c := r.pool.Get()
defer c.Close()
value, err2 := redis.Strings(c.Do("HKeys", collection))
@@ -102,7 +81,7 @@ func (r *Redis) HKeys(collection string) ([]string, error) {
return value, nil
}
-func GetRedisConn() (redis.Conn, error) {
+func NewRedisPool() *redis.Pool {
var address = viper.GetString("redis.address")
var port = viper.GetString("redis.port")
var database = viper.GetString("redis.database")
@@ -114,14 +93,30 @@ func GetRedisConn() (redis.Conn, error) {
} else {
url = "redis://x:" + password + "@" + address + ":" + port + "/" + database
}
- c, err := redis.DialURL(url)
- if err != nil {
- debug.PrintStack()
- return c, err
+ return &redis.Pool{
+ Dial: func() (conn redis.Conn, e error) {
+ return redis.DialURL(url,
+ redis.DialConnectTimeout(time.Second*10),
+ redis.DialReadTimeout(time.Second*10),
+ redis.DialWriteTimeout(time.Second*10),
+ )
+ },
+ TestOnBorrow: func(c redis.Conn, t time.Time) error {
+ if time.Since(t) < time.Minute {
+ return nil
+ }
+ _, err := c.Do("PING")
+ return err
+ },
+ MaxIdle: 10,
+ MaxActive: 0,
+ IdleTimeout: 300 * time.Second,
+ Wait: false,
+ MaxConnLifetime: 0,
}
- return c, nil
}
+// InitRedis assigns a newly constructed client to RedisClient. It always
+// returns nil.
func InitRedis() error {
+ RedisClient = NewRedisClient()
return nil
}
diff --git a/backend/errors/errors.go b/backend/errors/errors.go
new file mode 100644
index 00000000..f191cd3e
--- /dev/null
+++ b/backend/errors/errors.go
@@ -0,0 +1,55 @@
+package errors
+
+import (
+ "fmt"
+ "net/http"
+)
+
+// Scope classifies an OPError as a system error or a business error.
+type Scope int
+
+const (
+ // ScopeSystem is rendered as "system" by OPError.Error.
+ ScopeSystem Scope = 1
+ // ScopeBusiness is rendered as "business" by OPError.Error.
+ ScopeBusiness Scope = 2
+)
+
+// OPError is an error value that carries a numeric code, a message, a scope
+// and an HTTP status code.
+type OPError struct {
+ // HttpCode is the HTTP status attached to the error; the constructors
+ // default it to http.StatusOK. Presumably used as the response status —
+ // confirm against the handlers.
+ HttpCode int
+ // Message is the text included in the string returned by Error.
+ Message string
+ // Code is the numeric error code included in the string returned by Error.
+ Code int
+ // Scope selects the "system" or "business" prefix of the Error string.
+ Scope Scope
+}
+
+func (O OPError) Error() string {
+ var scope string
+ switch O.Scope {
+ case ScopeSystem:
+ scope = "system"
+ break
+ case ScopeBusiness:
+ scope = "business"
+ }
+ return fmt.Sprintf("%s error: [%d]%s.", scope, O.Code, O.Message)
+}
+
+// NewSystemOPError returns an OPError with ScopeSystem. The first value of
+// httpCodes, when present, becomes the HTTP status; otherwise http.StatusOK is
+// used. Additional values are ignored.
+func NewSystemOPError(code int, message string, httpCodes ...int) *OPError {
+	if len(httpCodes) == 0 {
+		return NewOpError(code, message, ScopeSystem, http.StatusOK)
+	}
+	return NewOpError(code, message, ScopeSystem, httpCodes[0])
+}
+// NewOpError returns a new OPError populated with the given code, message,
+// scope and HTTP status.
+func NewOpError(code int, message string, scope Scope, httpCode int) *OPError {
+	opErr := new(OPError)
+	opErr.HttpCode = httpCode
+	opErr.Message = message
+	opErr.Code = code
+	opErr.Scope = scope
+	return opErr
+}
+// NewBusinessError returns an OPError with ScopeBusiness. The first value of
+// httpCodes, when present, becomes the HTTP status; otherwise http.StatusOK is
+// used. Additional values are ignored.
+func NewBusinessError(code int, message string, httpCodes ...int) *OPError {
+	if len(httpCodes) == 0 {
+		return NewOpError(code, message, ScopeBusiness, http.StatusOK)
+	}
+	return NewOpError(code, message, ScopeBusiness, httpCodes[0])
+}
diff --git a/backend/go.mod b/backend/go.mod
index 5a575910..428c2fd3 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -8,9 +8,13 @@ require (
github.com/fsnotify/fsnotify v1.4.7
github.com/gin-gonic/gin v1.4.0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
+ github.com/go-playground/locales v0.12.1 // indirect
+ github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
+ github.com/leodido/go-urn v1.1.0 // indirect
github.com/pkg/errors v0.8.1
github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337
github.com/spf13/viper v1.4.0
+ gopkg.in/go-playground/validator.v9 v9.29.1
)
diff --git a/backend/go.sum b/backend/go.sum
index 910e18be..55a56852 100644
--- a/backend/go.sum
+++ b/backend/go.sum
@@ -39,6 +39,10 @@ github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc=
+github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM=
+github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM=
+github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -53,6 +57,7 @@ github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -66,6 +71,7 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@@ -77,6 +83,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8=
+github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
@@ -120,8 +128,10 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/assertions v1.0.0 h1:UVQPSSmc3qtTi+zPPkCXvZX9VvW/xT/NsRvKfwY81a8=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
+github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8=
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -202,6 +212,8 @@ gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXa
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ=
gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y=
+gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc=
+gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
diff --git a/backend/lib/validate_bridge/validator.go b/backend/lib/validate_bridge/validator.go
new file mode 100644
index 00000000..509dc475
--- /dev/null
+++ b/backend/lib/validate_bridge/validator.go
@@ -0,0 +1,54 @@
+package validate_bridge
+
+import (
+ "reflect"
+ "sync"
+
+ "github.com/gin-gonic/gin/binding"
+ "gopkg.in/go-playground/validator.v9"
+)
+
+// DefaultValidator implements gin's binding.StructValidator on top of
+// validator.v9. The validator engine is created on first use.
+type DefaultValidator struct {
+	// once guards the one-time creation of validate.
+	once sync.Once
+	// validate is the engine; it is set by lazyinit.
+	validate *validator.Validate
+}
+
+// Compile-time check that *DefaultValidator satisfies binding.StructValidator.
+// A typed nil pointer is enough for the check and avoids building a throwaway
+// validator engine at package initialisation.
+var _ binding.StructValidator = (*DefaultValidator)(nil)
+
+// ValidateStruct validates obj when it is a struct or a pointer to a struct
+// and returns the validation error, if any. Values of any other kind are
+// accepted without validation.
+func (v *DefaultValidator) ValidateStruct(obj interface{}) error {
+	if kindOfData(obj) != reflect.Struct {
+		return nil
+	}
+
+	v.lazyinit()
+	return v.validate.Struct(obj)
+}
+
+// Engine returns the underlying *validator.Validate as an interface{} value,
+// creating it first if it does not exist yet.
+func (v *DefaultValidator) Engine() interface{} {
+ v.lazyinit()
+ return v.validate
+}
+
+// lazyinit creates the validator engine exactly once and configures it to read
+// rules from the "binding" struct tag.
+func (v *DefaultValidator) lazyinit() {
+	setup := func() {
+		v.validate = validator.New()
+		v.validate.SetTagName("binding")
+
+		// add any custom validations etc. here
+	}
+	v.once.Do(setup)
+}
+
+// kindOfData reports the reflect.Kind of data. For a pointer it reports the
+// kind of the value pointed to (one level of indirection only).
+func kindOfData(data interface{}) reflect.Kind {
+	rv := reflect.ValueOf(data)
+	if rv.Kind() != reflect.Ptr {
+		return rv.Kind()
+	}
+	return rv.Elem().Kind()
+}
diff --git a/backend/main.go b/backend/main.go
index 489a17ce..bf98674e 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -3,16 +3,19 @@ package main
import (
"crawlab/config"
"crawlab/database"
+ "crawlab/lib/validate_bridge"
"crawlab/middlewares"
"crawlab/routes"
"crawlab/services"
"github.com/apex/log"
"github.com/gin-gonic/gin"
+ "github.com/gin-gonic/gin/binding"
"github.com/spf13/viper"
"runtime/debug"
)
func main() {
+ binding.Validator = new(validate_bridge.DefaultValidator)
app := gin.Default()
// 初始化配置
@@ -29,6 +32,15 @@ func main() {
}
log.Info("初始化日志设置成功")
+ if viper.GetString("log.isDeletePeriodically") == "Y" {
+ err := services.InitDeleteLogPeriodically()
+ if err != nil {
+ log.Error("Init DeletePeriodically Failed")
+ panic(err)
+ }
+ log.Info("初始化定期清理日志配置成功")
+ }
+
// 初始化Mongodb数据库
if err := database.InitMongo(); err != nil {
log.Error("init mongodb error:" + err.Error())
@@ -90,53 +102,60 @@ func main() {
if services.IsMaster() {
// 中间件
app.Use(middlewares.CORSMiddleware())
- app.Use(middlewares.AuthorizationMiddleware())
+ //app.Use(middlewares.AuthorizationMiddleware())
+ anonymousGroup := app.Group("/")
+ {
+ anonymousGroup.POST("/login", routes.Login) // 用户登录
+ anonymousGroup.PUT("/users", routes.PutUser) // 添加用户
+
+ }
+ authGroup := app.Group("/", middlewares.AuthorizationMiddleware())
+ {
+ // 路由
+ // 节点
+ authGroup.GET("/nodes", routes.GetNodeList) // 节点列表
+ authGroup.GET("/nodes/:id", routes.GetNode) // 节点详情
+ authGroup.POST("/nodes/:id", routes.PostNode) // 修改节点
+ authGroup.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表
+ authGroup.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表
+ authGroup.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点
+ // 爬虫
+ authGroup.GET("/spiders", routes.GetSpiderList) // 爬虫列表
+ authGroup.GET("/spiders/:id", routes.GetSpider) // 爬虫详情
+ authGroup.POST("/spiders", routes.PutSpider) // 上传爬虫
+ authGroup.POST("/spiders/:id", routes.PostSpider) // 修改爬虫
+ authGroup.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫
+ authGroup.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫
+ authGroup.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表
+ authGroup.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取
+ authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入
+ authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录
+ authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据
+ // 任务
+ authGroup.GET("/tasks", routes.GetTaskList) // 任务列表
+ authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情
+ authGroup.PUT("/tasks", routes.PutTask) // 派发任务
+ authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务
+ authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务
+ authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志
+ authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果
+ authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果
+ // 定时任务
+ authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
+ authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
+ authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务
+ authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
+ authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
+ // 统计数据
+ authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据
+ // 用户
+ authGroup.GET("/users", routes.GetUserList) // 用户列表
+ authGroup.GET("/users/:id", routes.GetUser) // 用户详情
+ authGroup.POST("/users/:id", routes.PostUser) // 更改用户
+ authGroup.DELETE("/users/:id", routes.DeleteUser) // 删除用户
+ authGroup.GET("/me", routes.GetMe) // 获取自己账户
+ }
- // 路由
- // 节点
- app.GET("/nodes", routes.GetNodeList) // 节点列表
- app.GET("/nodes/:id", routes.GetNode) // 节点详情
- app.POST("/nodes/:id", routes.PostNode) // 修改节点
- app.GET("/nodes/:id/tasks", routes.GetNodeTaskList) // 节点任务列表
- app.GET("/nodes/:id/system", routes.GetSystemInfo) // 节点任务列表
- app.DELETE("/nodes/:id", routes.DeleteNode) // 删除节点
- // 爬虫
- app.GET("/spiders", routes.GetSpiderList) // 爬虫列表
- app.GET("/spiders/:id", routes.GetSpider) // 爬虫详情
- app.POST("/spiders", routes.PutSpider) // 上传爬虫
- app.POST("/spiders/:id", routes.PostSpider) // 修改爬虫
- app.POST("/spiders/:id/publish", routes.PublishSpider) // 发布爬虫
- app.DELETE("/spiders/:id", routes.DeleteSpider) // 删除爬虫
- app.GET("/spiders/:id/tasks", routes.GetSpiderTasks) // 爬虫任务列表
- app.GET("/spiders/:id/file", routes.GetSpiderFile) // 爬虫文件读取
- app.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入
- app.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录
- app.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据
- // 任务
- app.GET("/tasks", routes.GetTaskList) // 任务列表
- app.GET("/tasks/:id", routes.GetTask) // 任务详情
- app.PUT("/tasks", routes.PutTask) // 派发任务
- app.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务
- app.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务
- app.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志
- app.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果
- app.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果
- // 定时任务
- app.GET("/schedules", routes.GetScheduleList) // 定时任务列表
- app.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
- app.PUT("/schedules", routes.PutSchedule) // 创建定时任务
- app.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
- app.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
- // 统计数据
- app.GET("/stats/home", routes.GetHomeStats) // 首页统计数据
- // 用户
- app.GET("/users", routes.GetUserList) // 用户列表
- app.GET("/users/:id", routes.GetUser) // 用户详情
- app.PUT("/users", routes.PutUser) // 添加用户
- app.POST("/users/:id", routes.PostUser) // 更改用户
- app.DELETE("/users/:id", routes.DeleteUser) // 删除用户
- app.POST("/login", routes.Login) // 用户登录
- app.GET("/me", routes.GetMe) // 获取自己账户
}
// 路由ping
diff --git a/backend/middlewares/auth.go b/backend/middlewares/auth.go
index 977fea78..07249e82 100644
--- a/backend/middlewares/auth.go
+++ b/backend/middlewares/auth.go
@@ -12,12 +12,12 @@ import (
func AuthorizationMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
// 如果为登录或注册,不用校验
- if c.Request.URL.Path == "/login" ||
- (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") ||
- strings.HasSuffix(c.Request.URL.Path, "download") {
- c.Next()
- return
- }
+ //if c.Request.URL.Path == "/login" ||
+ // (c.Request.URL.Path == "/users" && c.Request.Method == "PUT") ||
+ // strings.HasSuffix(c.Request.URL.Path, "download") {
+ // c.Next()
+ // return
+ //}
// 获取token string
tokenStr := c.GetHeader("Authorization")
@@ -46,6 +46,7 @@ func AuthorizationMiddleware() gin.HandlerFunc {
return
}
}
+ c.Set(constants.ContextUser, &user)
// 校验成功
c.Next()
diff --git a/backend/mock/node_test.go b/backend/mock/node_test.go
index cc2f94e5..669cafc5 100644
--- a/backend/mock/node_test.go
+++ b/backend/mock/node_test.go
@@ -1,6 +1,7 @@
package mock
import (
+ "bytes"
"crawlab/model"
"encoding/json"
"github.com/gin-gonic/gin"
@@ -8,12 +9,12 @@ import (
. "github.com/smartystreets/goconvey/convey"
"net/http"
"net/http/httptest"
- "strings"
"testing"
"time"
)
var app *gin.Engine
+
// 本测试依赖MongoDB的服务,所以在测试之前需要启动MongoDB及相关服务
func init() {
app = gin.Default()
@@ -28,19 +29,25 @@ func init() {
app.GET("/nodes/:id/system", GetSystemInfo) // 节点任务列表
app.DELETE("/nodes/:id", DeleteNode) // 删除节点
//// 爬虫
- app.GET("/stats/home",GetHomeStats) // 首页统计数据
+ app.GET("/stats/home", GetHomeStats) // 首页统计数据
// 定时任务
- app.GET("/schedules", GetScheduleList) // 定时任务列表
- app.GET("/schedules/:id", GetSchedule) // 定时任务详情
- app.PUT("/schedules", PutSchedule) // 创建定时任务
- app.POST("/schedules/:id", PostSchedule) // 修改定时任务
- app.DELETE("/schedules/:id", DeleteSchedule) // 删除定时任务
+ app.GET("/schedules", GetScheduleList) // 定时任务列表
+ app.GET("/schedules/:id", GetSchedule) // 定时任务详情
+ app.PUT("/schedules", PutSchedule) // 创建定时任务
+ app.POST("/schedules/:id", PostSchedule) // 修改定时任务
+ app.DELETE("/schedules/:id", DeleteSchedule) // 删除定时任务
app.GET("/tasks", GetTaskList) // 任务列表
app.GET("/tasks/:id", GetTask) // 任务详情
app.PUT("/tasks", PutTask) // 派发任务
app.DELETE("/tasks/:id", DeleteTask) // 删除任务
- app.GET("/tasks/:id/results",GetTaskResults) // 任务结果
+ app.GET("/tasks/:id/results", GetTaskResults) // 任务结果
app.GET("/tasks/:id/results/download", DownloadTaskResultsCsv) // 下载任务结果
+ app.GET("/spiders", GetSpiderList) // 爬虫列表
+ app.GET("/spiders/:id", GetSpider) // 爬虫详情
+ app.POST("/spiders/:id", PostSpider) // 修改爬虫
+ app.DELETE("/spiders/:id",DeleteSpider) // 删除爬虫
+ app.GET("/spiders/:id/tasks",GetSpiderTasks) // 爬虫任务列表
+ app.GET("/spiders/:id/dir",GetSpiderDir) // 爬虫目录
}
//mock test, test data in ./mock
@@ -49,7 +56,7 @@ func TestGetNodeList(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/nodes", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -68,7 +75,7 @@ func TestGetNode(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/nodes/"+mongoId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -87,7 +94,7 @@ func TestPing(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/ping", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -105,7 +112,7 @@ func TestGetNodeTaskList(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "nodes/"+mongoId+"/tasks", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -124,7 +131,7 @@ func TestDeleteNode(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("DELETE", "nodes/"+mongoId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -156,10 +163,10 @@ func TestPostNode(t *testing.T) {
var mongoId = "5d429e6c19f7abede924fee2"
w := httptest.NewRecorder()
- req, _ := http.NewRequest("POST", "nodes/"+mongoId, strings.NewReader(string(body)))
+ req, _ := http.NewRequest("POST", "nodes/"+mongoId, bytes.NewReader(body))
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
t.Log(resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
@@ -178,7 +185,7 @@ func TestGetSystemInfo(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "nodes/"+mongoId+"/system", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
diff --git a/backend/mock/schedule.go b/backend/mock/schedule.go
index ae982ca6..702e8754 100644
--- a/backend/mock/schedule.go
+++ b/backend/mock/schedule.go
@@ -113,7 +113,7 @@ func PutSchedule(c *gin.Context) {
func DeleteSchedule(c *gin.Context) {
id := bson.ObjectIdHex("5d429e6c19f7abede924fee2")
for _, sch := range scheduleList {
- if sch.Id == bson.ObjectId(id) {
+ if sch.Id == id {
fmt.Println("delete a schedule")
}
}
diff --git a/backend/mock/schedule_test.go b/backend/mock/schedule_test.go
index c24631b2..12843c75 100644
--- a/backend/mock/schedule_test.go
+++ b/backend/mock/schedule_test.go
@@ -1,7 +1,9 @@
package mock
import (
+ "bytes"
"crawlab/model"
+ "crawlab/utils"
"encoding/json"
"github.com/globalsign/mgo/bson"
. "github.com/smartystreets/goconvey/convey"
@@ -17,7 +19,7 @@ func TestGetScheduleList(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/schedules", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -36,7 +38,7 @@ func TestGetSchedule(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/schedules/"+mongoId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -56,7 +58,7 @@ func TestDeleteSchedule(t *testing.T) {
req, _ := http.NewRequest("DELETE", "/schedules/"+mongoId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -86,11 +88,12 @@ func TestPostSchedule(t *testing.T) {
var resp Response
var mongoId = "5d429e6c19f7abede924fee2"
- body,_ := json.Marshal(newItem)
+ body, _ := json.Marshal(newItem)
w := httptest.NewRecorder()
- req,_ := http.NewRequest("POST", "/schedules/"+mongoId,strings.NewReader(string(body)))
+ req, _ := http.NewRequest("POST", "/schedules/"+mongoId, strings.NewReader(utils.BytesToString(body)))
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()),&resp)
+
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
t.Log(resp)
if err != nil {
t.Fatal("unmarshal resp failed")
@@ -121,11 +124,11 @@ func TestPutSchedule(t *testing.T) {
}
var resp Response
- body,_ := json.Marshal(newItem)
+ body, _ := json.Marshal(newItem)
w := httptest.NewRecorder()
- req,_ := http.NewRequest("PUT", "/schedules",strings.NewReader(string(body)))
+ req, _ := http.NewRequest("PUT", "/schedules", bytes.NewReader(body))
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()),&resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
t.Log(resp)
if err != nil {
t.Fatal("unmarshal resp failed")
diff --git a/backend/mock/spider.go b/backend/mock/spider.go
index c4807247..ef3e6104 100644
--- a/backend/mock/spider.go
+++ b/backend/mock/spider.go
@@ -1 +1,178 @@
-package mock
\ No newline at end of file
+package mock
+
+import (
+ "crawlab/model"
+ "github.com/apex/log"
+ "github.com/gin-gonic/gin"
+ "github.com/globalsign/mgo/bson"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+// SpiderList is the in-memory fixture the mock spider handlers serve instead
+// of querying a database.
+//
+// NOTE(review): Id is a raw bson.ObjectId conversion of the 24-character hex
+// text, not bson.ObjectIdHex. The handlers in this file compare against
+// bson.ObjectId(id) built the same way, so lookups match; switching only one
+// side to ObjectIdHex would break them.
+var SpiderList = []model.Spider{
+ {
+ Id: bson.ObjectId("5d429e6c19f7abede924fee2"),
+ Name: "For test",
+ DisplayName: "test",
+ Type: "test",
+ Col: "test",
+ Site: "www.baidu.com",
+ Envs: nil,
+ Src: "../app/spiders",
+ Cmd: "scrapy crawl test",
+ LastRunTs: time.Now(),
+ CreateTs: time.Now(),
+ UpdateTs: time.Now(),
+ },
+}
+
+// GetSpiderList responds with every spider in the mock fixture.
+func GetSpiderList(c *gin.Context) {
+	// Stand-in for a database query: the fixture is the whole result set.
+	payload := Response{
+		Status:  "ok",
+		Message: "success",
+		Data:    SpiderList,
+	}
+	c.JSON(http.StatusOK, payload)
+}
+
+// GetSpider responds with the fixture spider whose Id matches the "id" route
+// parameter. An id that is not a valid ObjectId hex string is rejected with
+// 400. When no fixture entry matches, a zero-value spider is returned with a
+// success status.
+func GetSpider(c *gin.Context) {
+	id := c.Param("id")
+	var result model.Spider
+
+	if !bson.IsObjectIdHex(id) {
+		HandleErrorF(http.StatusBadRequest, c, "invalid id")
+		// Stop here: without this return a second (200) body would be
+		// written after the error response.
+		return
+	}
+
+	// The fixture stores ids as raw bson.ObjectId(hexText), so compare in
+	// the same representation.
+	for _, spider := range SpiderList {
+		if spider.Id == bson.ObjectId(id) {
+			result = spider
+		}
+	}
+
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+		Data:    result,
+	})
+}
+
+// PostSpider mocks updating a spider: it validates the "id" route parameter
+// and the JSON body, logs that the item would be modified, and reports
+// success. Nothing is stored.
+func PostSpider(c *gin.Context) {
+	id := c.Param("id")
+
+	if !bson.IsObjectIdHex(id) {
+		HandleErrorF(http.StatusBadRequest, c, "invalid id")
+		// Stop here so the error response is the only one written.
+		return
+	}
+
+	var item model.Spider
+	if err := c.ShouldBindJSON(&item); err != nil {
+		HandleError(http.StatusBadRequest, c, err)
+		return
+	}
+
+	log.Info("modify the item")
+
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+	})
+}
+// GetSpiderDir lists the entries of a directory inside a fixture spider's
+// source tree. The spider is selected by the "id" route parameter and the
+// directory by the "path" query parameter, relative to the spider's Src.
+//
+// NOTE(review): "path" is joined onto Src without sanitising, so ".."
+// segments can escape the spider directory. Acceptable for a mock, not for
+// a real handler.
+func GetSpiderDir(c *gin.Context) {
+	// Spider id.
+	id := c.Param("id")
+
+	// Reject malformed ids like the sibling handlers do. Otherwise no spider
+	// matches, Src stays empty, and "path" is listed relative to the process
+	// working directory.
+	if !bson.IsObjectIdHex(id) {
+		HandleErrorF(http.StatusBadRequest, c, "invalid id")
+		return
+	}
+
+	// Directory path relative to the spider source.
+	path := c.Query("path")
+	var spi model.Spider
+
+	// Find the spider in the fixture.
+	for _, spider := range SpiderList {
+		if spider.Id == bson.ObjectId(id) {
+			spi = spider
+		}
+	}
+
+	// Read the directory entries.
+	f, err := ioutil.ReadDir(filepath.Join(spi.Src, path))
+	if err != nil {
+		HandleError(http.StatusInternalServerError, c, err)
+		return
+	}
+
+	// Convert each entry into the API's file description.
+	var fileList []model.File
+	for _, file := range f {
+		fileList = append(fileList, model.File{
+			Name:  file.Name(),
+			IsDir: file.IsDir(),
+			Size:  file.Size(),
+			Path:  filepath.Join(path, file.Name()),
+		})
+	}
+
+	// Send the listing.
+	c.JSON(http.StatusOK, Response{
+		Status:  "ok",
+		Message: "success",
+		Data:    fileList,
+	})
+}
+
+func GetSpiderTasks(c *gin.Context) {
+ id := c.Param("id")
+
+ var spider model.Spider
+ for _, spi := range SpiderList {
+ if spi.Id == bson.ObjectId(id) {
+ spider = spi
+ }
+ }
+
+ var tasks model.Task
+ for _, task := range TaskList {
+ if task.SpiderId == spider.Id {
+ tasks = task
+ }
+ }
+
+ c.JSON(http.StatusOK, Response{
+ Status: "ok",
+ Message: "success",
+ Data: tasks,
+ })
+}
+
+func DeleteSpider(c *gin.Context) {
+ id := c.Param("id")
+
+ if !bson.IsObjectIdHex(id) {
+ HandleErrorF(http.StatusBadRequest, c, "invalid id")
+ return
+ }
+
+ // 获取该爬虫,get this spider
+ var spider model.Spider
+ for _, spi := range SpiderList {
+ if spi.Id == bson.ObjectId(id) {
+ spider = spi
+ }
+ }
+
+ // 删除爬虫文件目录,delete the spider dir
+ if err := os.RemoveAll(spider.Src); err != nil {
+ HandleError(http.StatusInternalServerError, c, err)
+ return
+ }
+
+ // 从数据库中删除该爬虫,delete this spider from database
+
+ c.JSON(http.StatusOK, Response{
+ Status: "ok",
+ Message: "success",
+ })
+}
diff --git a/backend/mock/spider_test.go b/backend/mock/spider_test.go
new file mode 100644
index 00000000..f4dbea63
--- /dev/null
+++ b/backend/mock/spider_test.go
@@ -0,0 +1,137 @@
+package mock
+
+import (
+ "bytes"
+ "crawlab/model"
+ "encoding/json"
+ "github.com/globalsign/mgo/bson"
+ . "github.com/smartystreets/goconvey/convey"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+)
+
+// TestGetSpiderList sends GET /spiders to the mock app and checks that the
+// response envelope reports success.
+func TestGetSpiderList(t *testing.T) {
+	var resp Response
+	w := httptest.NewRecorder()
+	req, _ := http.NewRequest("GET", "/spiders", nil)
+	app.ServeHTTP(w, req)
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		// Message spelling aligned with the other tests in this file.
+		t.Fatal("unmarshal resp failed")
+	}
+	Convey("Test API GetSpiderList", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
+
+// TestGetSpider sends GET /spiders/<id> for the fixture spider and checks
+// that the response envelope reports success.
+func TestGetSpider(t *testing.T) {
+	const spiderId = "5d429e6c19f7abede924fee2"
+
+	recorder := httptest.NewRecorder()
+	request, _ := http.NewRequest(http.MethodGet, "/spiders/"+spiderId, nil)
+	app.ServeHTTP(recorder, request)
+
+	var resp Response
+	if err := json.Unmarshal(recorder.Body.Bytes(), &resp); err != nil {
+		t.Fatal("unmarshal resp failed")
+	}
+
+	Convey("Test API GetSpider", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
+
+// TestPostSpider posts a JSON-encoded spider to POST /spiders/<id> and
+// checks that the response envelope reports success.
+func TestPostSpider(t *testing.T) {
+	const spiderId = "5d429e6c19f7abede924fee2"
+
+	// Request payload: a spider carrying the same id as the fixture entry.
+	payload := model.Spider{
+		Id:          bson.ObjectIdHex("5d429e6c19f7abede924fee2"),
+		Name:        "For test",
+		DisplayName: "test",
+		Type:        "test",
+		Col:         "test",
+		Site:        "www.baidu.com",
+		Envs:        nil,
+		Src:         "/app/spider",
+		Cmd:         "scrapy crawl test",
+		LastRunTs:   time.Now(),
+		CreateTs:    time.Now(),
+		UpdateTs:    time.Now(),
+	}
+
+	recorder := httptest.NewRecorder()
+	encoded, _ := json.Marshal(payload)
+	request, _ := http.NewRequest(http.MethodPost, "/spiders/"+spiderId, bytes.NewReader(encoded))
+	app.ServeHTTP(recorder, request)
+
+	var resp Response
+	if err := json.Unmarshal(recorder.Body.Bytes(), &resp); err != nil {
+		t.Fatal("unmarshal resp failed")
+	}
+
+	Convey("Test API PostSpider", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
+
+// TestGetSpiderDir sends GET /spiders/<id>/dir for the fixture spider and
+// checks that the response envelope reports success.
+func TestGetSpiderDir(t *testing.T) {
+	const spiderId = "5d429e6c19f7abede924fee2"
+
+	recorder := httptest.NewRecorder()
+	request, _ := http.NewRequest(http.MethodGet, "/spiders/"+spiderId+"/dir", nil)
+	app.ServeHTTP(recorder, request)
+
+	var resp Response
+	if err := json.Unmarshal(recorder.Body.Bytes(), &resp); err != nil {
+		t.Fatal("unmarshal resp failed")
+	}
+
+	Convey("Test API GetSpiderDir", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
+
+// TestGetSpiderTasks sends GET /spiders/<id>/tasks for the fixture spider
+// and checks that the response envelope reports success.
+func TestGetSpiderTasks(t *testing.T) {
+	const spiderId = "5d429e6c19f7abede924fee2"
+
+	recorder := httptest.NewRecorder()
+	request, _ := http.NewRequest(http.MethodGet, "/spiders/"+spiderId+"/tasks", nil)
+	app.ServeHTTP(recorder, request)
+
+	var resp Response
+	if err := json.Unmarshal(recorder.Body.Bytes(), &resp); err != nil {
+		t.Fatal("unmarshal resp failed")
+	}
+
+	Convey("Test API GetSpiderTasks", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
+
+// TestDeleteSpider sends DELETE /spiders/<id> for the fixture spider and
+// checks that the response envelope reports success.
+func TestDeleteSpider(t *testing.T) {
+	const spiderId = "5d429e6c19f7abede924fee2"
+
+	recorder := httptest.NewRecorder()
+	request, _ := http.NewRequest(http.MethodDelete, "/spiders/"+spiderId, nil)
+	app.ServeHTTP(recorder, request)
+
+	var resp Response
+	if err := json.Unmarshal(recorder.Body.Bytes(), &resp); err != nil {
+		t.Fatal("unmarshal resp failed")
+	}
+
+	Convey("Test API DeleteSpider", t, func() {
+		Convey("Test response status", func() {
+			So(resp.Status, ShouldEqual, "ok")
+			So(resp.Message, ShouldEqual, "success")
+		})
+	})
+}
diff --git a/backend/mock/stats_test.go b/backend/mock/stats_test.go
index f2054f85..a94e52d4 100644
--- a/backend/mock/stats_test.go
+++ b/backend/mock/stats_test.go
@@ -14,7 +14,7 @@ func TestGetHomeStats(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/stats/home", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
fmt.Println(resp.Data)
if err != nil {
t.Fatal("Unmarshal resp failed")
@@ -26,4 +26,4 @@ func TestGetHomeStats(t *testing.T) {
So(resp.Message, ShouldEqual, "success")
})
})
-}
\ No newline at end of file
+}
diff --git a/backend/mock/task.go b/backend/mock/task.go
index 84dece09..7b77d07e 100644
--- a/backend/mock/task.go
+++ b/backend/mock/task.go
@@ -186,7 +186,7 @@ func DownloadTaskResultsCsv(c *gin.Context) {
bytesBuffer := &bytes.Buffer{}
// 写入UTF-8 BOM,避免使用Microsoft Excel打开乱码
- bytesBuffer.Write([]byte("\xEF\xBB\xBF"))
+ bytesBuffer.WriteString("\xEF\xBB\xBF")
writer := csv.NewWriter(bytesBuffer)
diff --git a/backend/mock/task_test.go b/backend/mock/task_test.go
index 103ed643..1cd4ccfa 100644
--- a/backend/mock/task_test.go
+++ b/backend/mock/task_test.go
@@ -1,13 +1,13 @@
package mock
import (
+ "bytes"
"crawlab/model"
"encoding/json"
"github.com/globalsign/mgo/bson"
. "github.com/smartystreets/goconvey/convey"
"net/http"
"net/http/httptest"
- "strings"
"testing"
"time"
)
@@ -24,7 +24,7 @@ func TestGetTaskList(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/tasks?PageNum=2&PageSize=10&NodeId=342dfsff&SpiderId=f8dsf", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -44,7 +44,7 @@ func TestGetTask(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/tasks/"+taskId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
@@ -80,9 +80,9 @@ func TestPutTask(t *testing.T) {
var resp Response
body, _ := json.Marshal(&newItem)
w := httptest.NewRecorder()
- req, _ := http.NewRequest("PUT", "/tasks", strings.NewReader(string(body)))
+ req, _ := http.NewRequest("PUT", "/tasks", bytes.NewReader(body))
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("unmarshal resp failed")
}
@@ -100,7 +100,7 @@ func TestDeleteTask(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("DELETE", "/tasks/"+taskId, nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("unmarshal resp failed")
}
@@ -123,7 +123,7 @@ func TestGetTaskResults(t *testing.T) {
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/tasks/"+taskId+"/results?PageNum=2&PageSize=1", nil)
app.ServeHTTP(w, req)
- err := json.Unmarshal([]byte(w.Body.String()), &resp)
+ err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatal("Unmarshal resp failed")
}
diff --git a/backend/model/node.go b/backend/model/node.go
index 61c20473..6211115c 100644
--- a/backend/model/node.go
+++ b/backend/model/node.go
@@ -1,7 +1,9 @@
package model
import (
+ "crawlab/constants"
"crawlab/database"
+ "crawlab/services/register"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
@@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) {
var results []Node
if err := c.Find(filter).All(&results); err != nil {
+ log.Error("get node list error: " + err.Error())
debug.PrintStack()
return results, err
}
@@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) {
return count, nil
}
+
+// GetNodeBaseInfo returns the IP address, MAC address and key reported by
+// the node register. On the first failure it prints the stack, returns that
+// error, and leaves all three strings empty.
+//
+// The error result is named err rather than error so the builtin error type
+// is not shadowed inside the function; callers are unaffected.
+func GetNodeBaseInfo() (ip string, mac string, key string, err error) {
+	if ip, err = register.GetRegister().GetIp(); err != nil {
+		debug.PrintStack()
+		return "", "", "", err
+	}
+
+	if mac, err = register.GetRegister().GetMac(); err != nil {
+		debug.PrintStack()
+		return "", "", "", err
+	}
+
+	if key, err = register.GetRegister().GetKey(); err != nil {
+		debug.PrintStack()
+		return "", "", "", err
+	}
+	return ip, mac, key, nil
+}
+
+// ResetNodeStatusToOffline marks stored nodes as offline based on the keys
+// currently present in Redis. list holds those keys; every stored node whose
+// key is absent from list, or whose status is empty, is saved with
+// constants.StatusOffline. Processing stops at the first save failure.
+func ResetNodeStatusToOffline(list []string) {
+	nodes, err := GetNodeList(nil)
+	if err != nil {
+		// GetNodeList has already logged the failure; without a node list
+		// there is nothing to reset.
+		return
+	}
+
+	// Index the live keys once so each node needs a single lookup.
+	live := make(map[string]struct{}, len(list))
+	for _, key := range list {
+		live[key] = struct{}{}
+	}
+
+	for _, node := range nodes {
+		if _, ok := live[node.Key]; ok && node.Status != "" {
+			continue
+		}
+		node.Status = constants.StatusOffline
+		if err := node.Save(); err != nil {
+			// Log the text as a plain message, not as a format string.
+			log.Error(err.Error())
+			return
+		}
+	}
+}
diff --git a/backend/model/node_test.go b/backend/model/node_test.go
new file mode 100644
index 00000000..ba3f4aaa
--- /dev/null
+++ b/backend/model/node_test.go
@@ -0,0 +1,50 @@
+package model
+
+import (
+ "crawlab/config"
+ "crawlab/constants"
+ "crawlab/database"
+ "github.com/apex/log"
+ . "github.com/smartystreets/goconvey/convey"
+ "runtime/debug"
+ "testing"
+)
+
+func TestAddNode(t *testing.T) {
+ Convey("Test AddNode", t, func() {
+ if err := config.InitConfig("../conf/config.yml"); err != nil {
+ log.Error("init config error:" + err.Error())
+ panic(err)
+ }
+ log.Info("初始化配置成功")
+
+ // 初始化Mongodb数据库
+ if err := database.InitMongo(); err != nil {
+ log.Error("init mongodb error:" + err.Error())
+ debug.PrintStack()
+ panic(err)
+ }
+ log.Info("初始化Mongodb数据库成功")
+
+ // 初始化Redis数据库
+ if err := database.InitRedis(); err != nil {
+ log.Error("init redis error:" + err.Error())
+ debug.PrintStack()
+ panic(err)
+ }
+
+ var node = Node{
+ Key: "c4:b3:01:bd:b5:e7",
+ Name: "10.27.238.101",
+ Ip: "10.27.238.101",
+ Port: "8000",
+ Mac: "c4:b3:01:bd:b5:e7",
+ Status: constants.StatusOnline,
+ IsMaster: true,
+ }
+ if err := node.Add(); err != nil {
+ log.Error("add node error:" + err.Error())
+ panic(err)
+ }
+ })
+}
diff --git a/backend/model/schedule.go b/backend/model/schedule.go
index 9f77c452..1c8db0bd 100644
--- a/backend/model/schedule.go
+++ b/backend/model/schedule.go
@@ -18,6 +18,7 @@ type Schedule struct {
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
+ Param string `json:"param" bson:"param"`
// 前端展示
SpiderName string `json:"spider_name" bson:"spider_name"`
diff --git a/backend/model/spider.go b/backend/model/spider.go
index c4c94edf..e0e5f836 100644
--- a/backend/model/spider.go
+++ b/backend/model/spider.go
@@ -23,13 +23,14 @@ type Spider struct {
Col string `json:"col"` // 结果储存位置
Site string `json:"site"` // 爬虫网站
Envs []Env `json:"envs" bson:"envs"` // 环境变量
-
+ Remark string `json:"remark"` // 备注
// 自定义爬虫
Src string `json:"src" bson:"src"` // 源码位置
Cmd string `json:"cmd" bson:"cmd"` // 执行命令
// 前端展示
- LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间
+ LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间
+ LastStatus string `json:"last_status"` // 最后执行状态
// TODO: 可配置爬虫
//Fields []interface{} `json:"fields"`
@@ -92,15 +93,13 @@ func (spider *Spider) GetLastTask() (Task, error) {
return tasks[0], nil
}
-
-
func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
s, c := database.GetCol("spiders")
defer s.Close()
// 获取爬虫列表
spiders := []Spider{}
- if err := c.Find(filter).Skip(skip).Limit(limit).All(&spiders); err != nil {
+ if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil {
debug.PrintStack()
return spiders, err
}
@@ -117,6 +116,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
// 赋值
spiders[i].LastRunTs = task.CreateTs
+ spiders[i].LastStatus = task.Status
}
return spiders, nil
@@ -165,6 +165,33 @@ func RemoveSpider(id bson.ObjectId) error {
return err
}
+ // gf上的文件
+ s, gf := database.GetGridFs("files")
+ defer s.Close()
+
+ if err := gf.RemoveId(result.FileId); err != nil {
+ log.Error("remove file error, id:" + result.FileId.Hex())
+ return err
+ }
+
+ return nil
+}
+
+func RemoveAllSpider() error {
+ s, c := database.GetCol("spiders")
+ defer s.Close()
+
+ spiders := []Spider{}
+ err := c.Find(nil).All(&spiders)
+ if err != nil {
+ log.Error("get all spiders error:" + err.Error())
+ return err
+ }
+ for _, spider := range spiders {
+ if err := RemoveSpider(spider.Id); err != nil {
+ log.Error("remove spider error:" + err.Error())
+ }
+ }
return nil
}
diff --git a/backend/model/task.go b/backend/model/task.go
index 8ae782b5..4957b577 100644
--- a/backend/model/task.go
+++ b/backend/model/task.go
@@ -19,6 +19,7 @@ type Task struct {
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
LogPath string `json:"log_path" bson:"log_path"`
Cmd string `json:"cmd" bson:"cmd"`
+ Param string `json:"param" bson:"param"`
Error string `json:"error" bson:"error"`
ResultCount int `json:"result_count" bson:"result_count"`
WaitDuration float64 `json:"wait_duration" bson:"wait_duration"`
@@ -190,6 +191,21 @@ func RemoveTask(id string) error {
return nil
}
+// RemoveTaskBySpiderId removes every task that belongs to the spider with
+// the given id (normally the ObjectId hex string). A failure to load the
+// task list is returned to the caller; a failure to remove an individual
+// task is logged and the remaining tasks are still processed.
+func RemoveTaskBySpiderId(id string) error {
+	// spider_id holds an ObjectId, so a hex string has to be converted
+	// before it can match. Anything else is passed through unchanged.
+	query := bson.M{"spider_id": id}
+	if bson.IsObjectIdHex(id) {
+		query = bson.M{"spider_id": bson.ObjectIdHex(id)}
+	}
+
+	tasks, err := GetTaskList(query, 0, constants.Infinite, "-create_ts")
+	if err != nil {
+		log.Error("get tasks error:" + err.Error())
+		// Report the failure instead of claiming success.
+		return err
+	}
+
+	for _, task := range tasks {
+		if err := RemoveTask(task.Id); err != nil {
+			log.Error("remove task error:" + err.Error())
+		}
+	}
+	return nil
+}
+
func GetTaskCount(query interface{}) (int, error) {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -207,7 +223,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) {
defer s.Close()
// 起始日期
- startDate := time.Now().Add(- 30 * 24 * time.Hour)
+ startDate := time.Now().Add(-30 * 24 * time.Hour)
endDate := time.Now()
// query
diff --git a/backend/routes/file.go b/backend/routes/file.go
index 435f1fba..eaf43ab5 100644
--- a/backend/routes/file.go
+++ b/backend/routes/file.go
@@ -1,6 +1,7 @@
package routes
import (
+ "crawlab/utils"
"github.com/gin-gonic/gin"
"io/ioutil"
"net/http"
@@ -15,6 +16,6 @@ func GetFile(c *gin.Context) {
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
- Data: string(fileBytes),
+ Data: utils.BytesToString(fileBytes),
})
}
diff --git a/backend/routes/spider.go b/backend/routes/spider.go
index dceb2651..76e5c568 100644
--- a/backend/routes/spider.go
+++ b/backend/routes/spider.go
@@ -229,6 +229,12 @@ func DeleteSpider(c *gin.Context) {
return
}
+ // 删除爬虫对应的task任务
+ if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil {
+ HandleError(http.StatusInternalServerError, c, err)
+ return
+ }
+
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
@@ -321,7 +327,7 @@ func GetSpiderFile(c *gin.Context) {
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
- Data: string(fileBytes),
+ Data: utils.BytesToString(fileBytes),
})
}
diff --git a/backend/routes/task.go b/backend/routes/task.go
index e5efa425..4dc42f6f 100644
--- a/backend/routes/task.go
+++ b/backend/routes/task.go
@@ -215,7 +215,7 @@ func DownloadTaskResultsCsv(c *gin.Context) {
bytesBuffer := &bytes.Buffer{}
// 写入UTF-8 BOM,避免使用Microsoft Excel打开乱码
- bytesBuffer.Write([]byte("\xEF\xBB\xBF"))
+ bytesBuffer.WriteString("\xEF\xBB\xBF")
writer := csv.NewWriter(bytesBuffer)
diff --git a/backend/routes/user.go b/backend/routes/user.go
index a3d5a431..a6d44cae 100644
--- a/backend/routes/user.go
+++ b/backend/routes/user.go
@@ -4,6 +4,7 @@ import (
"crawlab/constants"
"crawlab/model"
"crawlab/services"
+ "crawlab/services/context"
"crawlab/utils"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
@@ -171,7 +172,7 @@ func Login(c *gin.Context) {
}
// 获取token
- tokenStr, err := services.GetToken(user.Username)
+ tokenStr, err := services.MakeToken(&user)
if err != nil {
HandleError(http.StatusUnauthorized, c, errors.New("not authorized"))
return
@@ -185,20 +186,16 @@ func Login(c *gin.Context) {
}
+// GetMe responds with the user attached to the current request. The user is
+// read from the gin context through context.WithGinContext(c).User(); when
+// none is attached the request fails with constants.ErrorUserNotFound.
func GetMe(c *gin.Context) {
- // 获取token string
- tokenStr := c.GetHeader("Authorization")
-
- // 校验token
- user, err := services.CheckToken(tokenStr)
- if err != nil {
- HandleError(http.StatusUnauthorized, c, errors.New("not authorized"))
+ ctx := context.WithGinContext(c)
+ user := ctx.User()
+ if user == nil {
+ ctx.FailedWithError(constants.ErrorUserNotFound, http.StatusUnauthorized)
return
}
- user.Password = ""
-
- c.JSON(http.StatusOK, Response{
- Status: "ok",
- Message: "success",
- Data: user,
- })
+ // The wrapper struct embeds the user and declares its own, always-empty
+ // Password field tagged omitempty. Presumably this makes encoding/json use
+ // the outer field and drop the password from the response without mutating
+ // the shared user value. TODO confirm model.User tags its field "password".
+ ctx.Success(struct {
+ *model.User
+ Password string `json:"password,omitempty"`
+ }{
+ User: user,
+ }, nil)
}
diff --git a/backend/routes/utils.go b/backend/routes/utils.go
index 14c5853e..38ca35bb 100644
--- a/backend/routes/utils.go
+++ b/backend/routes/utils.go
@@ -1,13 +1,15 @@
package routes
import (
+ "github.com/apex/log"
"github.com/gin-gonic/gin"
"runtime/debug"
)
func HandleError(statusCode int, c *gin.Context, err error) {
+ log.Errorf("handle error:" + err.Error())
debug.PrintStack()
- c.JSON(statusCode, Response{
+ c.AbortWithStatusJSON(statusCode, Response{
Status: "ok",
Message: "error",
Error: err.Error(),
@@ -16,7 +18,7 @@ func HandleError(statusCode int, c *gin.Context, err error) {
func HandleErrorF(statusCode int, c *gin.Context, err string) {
debug.PrintStack()
- c.JSON(statusCode, Response{
+ c.AbortWithStatusJSON(statusCode, Response{
Status: "ok",
Message: "error",
Error: err,
diff --git a/backend/services/context/context.go b/backend/services/context/context.go
new file mode 100644
index 00000000..ce8eb72e
--- /dev/null
+++ b/backend/services/context/context.go
@@ -0,0 +1,100 @@
+package context
+
+import (
+ "crawlab/constants"
+ "crawlab/errors"
+ "crawlab/model"
+ "fmt"
+ "github.com/apex/log"
+ "github.com/gin-gonic/gin"
+ errors2 "github.com/pkg/errors"
+ "gopkg.in/go-playground/validator.v9"
+ "net/http"
+ "runtime/debug"
+)
+
+// Context wraps *gin.Context with helpers for reading the current user and
+// for writing the API's standard JSON envelope.
+type Context struct {
+	*gin.Context
+}
+
+// User returns the *model.User stored under constants.ContextUser, or nil
+// when the key is missing or holds a value of another type.
+func (c *Context) User() *model.User {
+	userIfe, exists := c.Get(constants.ContextUser)
+	if !exists {
+		return nil
+	}
+	user, ok := userIfe.(*model.User)
+	if !ok {
+		return nil
+	}
+	return user
+}
+
+// Success writes a 200 response carrying data and, optionally, a meta value.
+// Only the first element of metas is used; a missing meta or a nil data is
+// replaced by an empty object.
+func (c *Context) Success(data interface{}, metas ...interface{}) {
+	var meta interface{}
+	if len(metas) == 0 {
+		meta = gin.H{}
+	} else {
+		meta = metas[0]
+	}
+	if data == nil {
+		data = gin.H{}
+	}
+	c.JSON(http.StatusOK, gin.H{
+		"status":  "ok",
+		"message": "success",
+		"data":    data,
+		"meta":    meta,
+		"error":   "",
+	})
+}
+
+// Failed aborts the request with an error body. The fallback HTTP status is
+// 200; errors.OPError and validator.ValidationErrors choose their own (see
+// failed). When variables are given, err's text is used as a format string
+// for them.
+func (c *Context) Failed(err error, variables ...interface{}) {
+	c.failed(err, http.StatusOK, variables...)
+}
+
+// failed logs the error, prints the stack and aborts the request with a JSON
+// error body. The status code depends on the error's root cause:
+// errors.OPError supplies its own HttpCode, validator.ValidationErrors maps
+// to 400, and anything else uses httpCode.
+func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
+	errStr := err.Error()
+	if len(variables) > 0 {
+		errStr = fmt.Sprintf(errStr, variables...)
+	}
+	// Pass the text as an argument: used as the format itself, any '%' in
+	// the error message would be misinterpreted.
+	log.Errorf("handle error:%s", errStr)
+	debug.PrintStack()
+
+	switch cause := errors2.Cause(err).(type) {
+	case errors.OPError:
+		c.AbortWithStatusJSON(cause.HttpCode, gin.H{
+			"status":  "ok",
+			"message": "error",
+			"error":   errStr,
+		})
+	case validator.ValidationErrors:
+		c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
+			"status":  "ok",
+			"message": "error",
+			"error":   cause.Error(),
+		})
+	default:
+		fmt.Println("deprecated....")
+		c.AbortWithStatusJSON(httpCode, gin.H{
+			"status":  "ok",
+			"message": "error",
+			"error":   errStr,
+		})
+	}
+}
+
+// FailedWithError aborts the request with err. The first element of
+// httpCode, when given, is the fallback status; otherwise it is 200.
+func (c *Context) FailedWithError(err error, httpCode ...int) {
+	code := http.StatusOK
+	if len(httpCode) > 0 {
+		code = httpCode[0]
+	}
+	c.failed(err, code)
+}
+
+// WithGinContext wraps a gin context in a Context.
+func WithGinContext(context *gin.Context) *Context {
+	return &Context{Context: context}
+}
diff --git a/backend/services/log.go b/backend/services/log.go
index d59e463e..1344b02f 100644
--- a/backend/services/log.go
+++ b/backend/services/log.go
@@ -3,11 +3,15 @@ package services
import (
"crawlab/constants"
"crawlab/database"
+ "crawlab/lib/cron"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
+ "github.com/spf13/viper"
"io/ioutil"
+ "os"
+ "path/filepath"
"runtime/debug"
)
@@ -16,13 +20,38 @@ var TaskLogChanMap = utils.NewChanMap()
// 获取本地日志
func GetLocalLog(logPath string) (fileBytes []byte, err error) {
- fileBytes, err = ioutil.ReadFile(logPath)
+ // Only the tail of the log is returned: at most bufLen bytes counted back
+ // from the end of the file.
+ f, err := os.Open(logPath)
if err != nil {
- log.Errorf(err.Error())
+ log.Error(err.Error())
debug.PrintStack()
- return fileBytes, err
+ return nil, err
}
- return fileBytes, nil
+ // Register the close right after a successful open so the descriptor is
+ // released on every return path, including a failed Stat below.
+ defer f.Close()
+
+ fi, err := f.Stat()
+ if err != nil {
+ log.Error(err.Error())
+ debug.PrintStack()
+ return nil, err
+ }
+
+ const bufLen = 2 * 1024 * 1024
+ logBuf := make([]byte, bufLen)
+
+ // Files larger than the buffer are read from bufLen bytes before the end.
+ off := int64(0)
+ if fi.Size() > int64(len(logBuf)) {
+ off = fi.Size() - int64(len(logBuf))
+ }
+ n, err := f.ReadAt(logBuf, off)
+
+ // ReadAt reports EOF when it reaches the end of the file before filling
+ // the buffer, which is the normal case for short logs.
+ if err != nil && err.Error() != "EOF" {
+ log.Error(err.Error())
+ debug.PrintStack()
+ return nil, err
+ }
+ logBuf = logBuf[:n]
+ return logBuf, nil
}
// 获取远端日志
@@ -42,7 +71,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
// 发布获取日志消息
channel := "nodes:" + task.NodeId.Hex()
- if err := database.Publish(channel, string(msgBytes)); err != nil {
+ if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
log.Errorf(err.Error())
return "", err
}
@@ -55,3 +84,36 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
return logStr, nil
}
+
+// DeleteLogPeriodically removes every sub-directory of the directory
+// configured under "log.path". Plain files directly inside that directory
+// are left alone. It is the job registered by InitDeleteLogPeriodically.
+func DeleteLogPeriodically() {
+	logDir := viper.GetString("log.path")
+	if !utils.Exists(logDir) {
+		log.Error("Can Not Set Delete Logs Periodically,No Log Dir")
+		return
+	}
+	rd, err := ioutil.ReadDir(logDir)
+	if err != nil {
+		log.Error("Read Log Dir Failed")
+		return
+	}
+
+	for _, fi := range rd {
+		if !fi.IsDir() {
+			continue
+		}
+		dir := filepath.Join(logDir, fi.Name())
+		log.Info(dir)
+		// Report success only when the removal actually worked; a failure
+		// is logged and the remaining directories are still processed.
+		if err := os.RemoveAll(dir); err != nil {
+			log.Error("Delete Log File Failed: " + err.Error())
+			continue
+		}
+		log.Info("Delete Log File Success")
+	}
+}
+
+// InitDeleteLogPeriodically registers DeleteLogPeriodically with a new cron
+// scheduler, using the schedule configured under "log.deleteFrequency", and
+// starts the scheduler. It returns the error from registering the job, if
+// any.
+func InitDeleteLogPeriodically() error {
+	spec := viper.GetString("log.deleteFrequency")
+	scheduler := cron.New(cron.WithSeconds())
+
+	_, err := scheduler.AddFunc(spec, DeleteLogPeriodically)
+	if err != nil {
+		return err
+	}
+
+	scheduler.Start()
+	return nil
+}
diff --git a/backend/services/log_test.go b/backend/services/log_test.go
new file mode 100644
index 00000000..1e9a21c7
--- /dev/null
+++ b/backend/services/log_test.go
@@ -0,0 +1,51 @@
+package services
+
+import (
+ "crawlab/config"
+ "crawlab/utils"
+ "fmt"
+ "github.com/apex/log"
+ . "github.com/smartystreets/goconvey/convey"
+ "github.com/spf13/viper"
+ "os"
+ "testing"
+)
+
+// TestDeleteLogPeriodically loads ../conf/config.yml, logs the configured
+// log directory and runs DeleteLogPeriodically once. It asserts nothing
+// beyond the absence of a panic.
+func TestDeleteLogPeriodically(t *testing.T) {
+	Convey("Test DeleteLogPeriodically", t, func() {
+		err := config.InitConfig("../conf/config.yml")
+		if err != nil {
+			log.Error("init config error:" + err.Error())
+			panic(err)
+		}
+		log.Info("初始化配置成功")
+
+		log.Info(viper.GetString("log.path"))
+		DeleteLogPeriodically()
+	})
+}
+
+// TestGetLocalLog writes a small log file, reads it back through
+// GetLocalLog and checks that the call succeeds and returns the written
+// content. The file is removed when the test ends.
+func TestGetLocalLog(t *testing.T) {
+	// Create a log file for the test.
+	logPath := "../logs/crawlab/test.log"
+	f, err := os.Create(logPath)
+	if err != nil {
+		// Without the fixture the rest of the test is meaningless.
+		t.Fatal("create test log failed: " + err.Error())
+	}
+	// Delete the test log file however the test ends.
+	defer os.Remove(logPath)
+
+	_, err = f.WriteString("This is for test")
+	// Close before reading so the content is flushed and the handle freed.
+	if closeErr := f.Close(); err == nil {
+		err = closeErr
+	}
+	if err != nil {
+		t.Fatal("write test log failed: " + err.Error())
+	}
+
+	Convey("Test GetLocalLog", t, func() {
+		Convey("Test response", func() {
+			logStr, err := GetLocalLog(logPath)
+			log.Info(utils.BytesToString(logStr))
+			fmt.Println(err)
+			So(err, ShouldEqual, nil)
+			So(string(logStr), ShouldEqual, "This is for test")
+		})
+	})
+}
diff --git a/backend/services/node.go b/backend/services/node.go
index 1fa2370c..63373be8 100644
--- a/backend/services/node.go
+++ b/backend/services/node.go
@@ -1,15 +1,18 @@
package services
import (
+ "context"
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/register"
+ "crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
+ "github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
"time"
@@ -73,23 +76,11 @@ func GetCurrentNode() (model.Node, error) {
if err != nil {
// 如果为主节点,表示为第一次注册,插入节点信息
if IsMaster() {
- // 获取本机IP地址
- ip, err := register.GetRegister().GetIp()
+ // 获取本机信息
+ ip, mac, key, err := model.GetNodeBaseInfo()
if err != nil {
debug.PrintStack()
- return model.Node{}, err
- }
-
- mac, err := register.GetRegister().GetMac()
- if err != nil {
- debug.PrintStack()
- return model.Node{}, err
- }
-
- key, err := register.GetRegister().GetKey()
- if err != nil {
- debug.PrintStack()
- return model.Node{}, err
+ return node, err
}
// 生成节点
@@ -97,7 +88,7 @@ func GetCurrentNode() (model.Node, error) {
Key: key,
Id: bson.NewObjectId(),
Ip: ip,
- Name: key,
+ Name: ip,
Mac: mac,
IsMaster: true,
}
@@ -124,6 +115,7 @@ func IsMaster() bool {
return viper.GetString("server.master") == Yes
}
+// 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点
// 该ID的节点是否为主节点
func IsMasterNode(id string) bool {
curNode, _ := GetCurrentNode()
@@ -176,72 +168,54 @@ func UpdateNodeStatus() {
// 在Redis中删除该节点
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
log.Errorf(err.Error())
- return
- }
-
- // 在MongoDB中该节点设置状态为离线
- s, c := database.GetCol("nodes")
- defer s.Close()
- var node model.Node
- if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
- log.Errorf(err.Error())
- debug.PrintStack()
- return
- }
- node.Status = constants.StatusOffline
- if err := node.Save(); err != nil {
- log.Errorf(err.Error())
- return
}
continue
}
- // 更新节点信息到数据库
- s, c := database.GetCol("nodes")
- defer s.Close()
- var node model.Node
- if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
- // 数据库不存在该节点
- node = model.Node{
- Key: key,
- Name: key,
- Ip: data.Ip,
- Port: "8000",
- Mac: data.Mac,
- Status: constants.StatusOnline,
- IsMaster: data.Master,
- }
- if err := node.Add(); err != nil {
- log.Errorf(err.Error())
- return
- }
- } else {
- // 数据库存在该节点
- node.Status = constants.StatusOnline
- if err := node.Save(); err != nil {
- log.Errorf(err.Error())
- return
- }
+ // 处理node信息
+ handleNodeInfo(key, data)
+ }
+
+ // 重置不在redis的key为offline
+ model.ResetNodeStatusToOffline(list)
+}
+
+func handleNodeInfo(key string, data Data) {
+ // 更新节点信息到数据库
+ s, c := database.GetCol("nodes")
+ defer s.Close()
+
+ // 同个key可能因为并发,被注册多次
+ var nodes []model.Node
+ _ = c.Find(bson.M{"key": key}).All(&nodes)
+ if nodes != nil && len(nodes) > 1 {
+ for _, node := range nodes {
+ _ = c.RemoveId(node.Id)
}
}
- // 遍历数据库中的节点列表
- nodes, err := model.GetNodeList(nil)
- for _, node := range nodes {
- hasNode := false
- for _, key := range list {
- if key == node.Key {
- hasNode = true
- break
- }
+ var node model.Node
+ if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
+ // 数据库不存在该节点
+ node = model.Node{
+ Key: key,
+ Name: data.Ip,
+ Ip: data.Ip,
+ Port: "8000",
+ Mac: data.Mac,
+ Status: constants.StatusOnline,
+ IsMaster: data.Master,
}
- if !hasNode {
- node.Status = constants.StatusOffline
- if err := node.Save(); err != nil {
- log.Errorf(err.Error())
- return
- }
- continue
+ if err := node.Add(); err != nil {
+ log.Errorf(err.Error())
+ return
+ }
+ } else {
+ // 数据库存在该节点
+ node.Status = constants.StatusOnline
+ if err := node.Save(); err != nil {
+ log.Errorf(err.Error())
+ return
}
}
}
@@ -281,19 +255,18 @@ func UpdateNodeData() {
debug.PrintStack()
return
}
- if err := database.RedisClient.HSet("nodes", key, string(dataBytes)); err != nil {
+ if err := database.RedisClient.HSet("nodes", key, utils.BytesToString(dataBytes)); err != nil {
log.Errorf(err.Error())
return
}
}
-func MasterNodeCallback(channel string, msgStr string) {
+func MasterNodeCallback(message redis.Message) (err error) {
// 反序列化
var msg NodeMessage
- if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
- log.Errorf(err.Error())
- debug.PrintStack()
- return
+ if err := json.Unmarshal(message.Data, &msg); err != nil {
+
+ return err
}
if msg.Type == constants.MsgTypeGetLog {
@@ -308,18 +281,17 @@ func MasterNodeCallback(channel string, msgStr string) {
time.Sleep(10 * time.Millisecond)
ch := SystemInfoChanMap.ChanBlocked(msg.NodeId)
sysInfoBytes, _ := json.Marshal(&msg.SysInfo)
- ch <- string(sysInfoBytes)
+ ch <- utils.BytesToString(sysInfoBytes)
}
+ return nil
}
-func WorkerNodeCallback(channel string, msgStr string) {
+func WorkerNodeCallback(message redis.Message) (err error) {
// 反序列化
msg := NodeMessage{}
- fmt.Println(msgStr)
- if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
- log.Errorf(err.Error())
- debug.PrintStack()
- return
+ if err := json.Unmarshal(message.Data, &msg); err != nil {
+
+ return err
}
if msg.Type == constants.MsgTypeGetLog {
@@ -333,26 +305,27 @@ func WorkerNodeCallback(channel string, msgStr string) {
// 获取本地日志
logStr, err := GetLocalLog(msg.LogPath)
+ log.Info(utils.BytesToString(logStr))
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
+ msgSd.Log = err.Error()
+ } else {
+ msgSd.Log = utils.BytesToString(logStr)
}
- msgSd.Log = string(logStr)
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
- log.Errorf(err.Error())
- debug.PrintStack()
- return
+ return err
}
// 发布消息给主节点
- fmt.Println(msgSd)
- if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
- log.Errorf(err.Error())
- return
+ log.Info("publish get log msg to master")
+ if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
+
+ return err
}
} else if msg.Type == constants.MsgTypeCancelTask {
// 取消任务
@@ -362,8 +335,7 @@ func WorkerNodeCallback(channel string, msgStr string) {
// 获取环境信息
sysInfo, err := GetLocalSystemInfo()
if err != nil {
- log.Errorf(err.Error())
- return
+ return err
}
msgSd := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
@@ -374,14 +346,14 @@ func WorkerNodeCallback(channel string, msgStr string) {
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
- fmt.Println(msgSd)
- if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
+ if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
log.Errorf(err.Error())
- return
+ return err
}
}
+ return
}
// 初始化节点服务
@@ -399,25 +371,27 @@ func InitNodeService() error {
// 首次更新节点数据(注册到Redis)
UpdateNodeData()
- // 消息订阅
- var sub database.Subscriber
- sub.Connect()
-
// 获取当前节点
node, err := GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
return err
}
-
+ ctx := context.Background()
if IsMaster() {
// 如果为主节点,订阅主节点通信频道
channel := "nodes:master"
- sub.Subscribe(channel, MasterNodeCallback)
+ err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel)
+ if err != nil {
+ return err
+ }
} else {
// 若为工作节点,订阅单独指定通信频道
channel := "nodes:" + node.Id.Hex()
- sub.Subscribe(channel, WorkerNodeCallback)
+ err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel)
+ if err != nil {
+ return err
+ }
}
// 如果为主节点,每30秒刷新所有节点信息
diff --git a/backend/services/schedule.go b/backend/services/schedule.go
index 916e42d0..1c08e0fd 100644
--- a/backend/services/schedule.go
+++ b/backend/services/schedule.go
@@ -28,6 +28,7 @@ func AddTask(s model.Schedule) func() {
SpiderId: s.SpiderId,
NodeId: nodeId,
Status: constants.StatusPending,
+ Param: s.Param,
}
// 将任务存入数据库
diff --git a/backend/services/spider.go b/backend/services/spider.go
index f4f856e6..5763b3de 100644
--- a/backend/services/spider.go
+++ b/backend/services/spider.go
@@ -1,6 +1,7 @@
package services
import (
+ "context"
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
@@ -11,6 +12,7 @@ import (
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
+ "github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
@@ -20,6 +22,7 @@ import (
"path/filepath"
"runtime/debug"
"strings"
+ "syscall"
)
type SpiderFileData struct {
@@ -30,6 +33,7 @@ type SpiderFileData struct {
type SpiderUploadMessage struct {
FileId string
FileName string
+ SpiderId string
}
// 从项目目录中获取爬虫列表
@@ -39,7 +43,9 @@ func GetSpidersFromDir() ([]model.Spider, error) {
// 如果爬虫项目目录不存在,则创建一个
if !utils.Exists(srcPath) {
- if err := os.MkdirAll(srcPath, 0666); err != nil {
+ mask := syscall.Umask(0) // 改为 0000 八进制
+ defer syscall.Umask(mask) // 改为原来的 umask
+ if err := os.MkdirAll(srcPath, 0766); err != nil {
debug.PrintStack()
return []model.Spider{}, err
}
@@ -85,16 +91,25 @@ func GetSpidersFromDir() ([]model.Spider, error) {
// 将爬虫保存到数据库
func SaveSpiders(spiders []model.Spider) error {
- // 遍历爬虫列表
+ s, c := database.GetCol("spiders")
+ defer s.Close()
+
+ if len(spiders) == 0 {
+ err := model.RemoveAllSpider()
+ if err != nil {
+ log.Error("remove all spider error:" + err.Error())
+ return err
+ }
+ log.Info("get spider from dir is empty,removed all spider")
+ return nil
+ }
+ // 如果该爬虫不存在于数据库,则保存爬虫到数据库
for _, spider := range spiders {
// 忽略非自定义爬虫
if spider.Type != constants.Customized {
continue
}
- // 如果该爬虫不存在于数据库,则保存爬虫到数据库
- s, c := database.GetCol("spiders")
- defer s.Close()
var spider_ *model.Spider
if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil {
// 不存在
@@ -102,11 +117,8 @@ func SaveSpiders(spiders []model.Spider) error {
debug.PrintStack()
return err
}
- } else {
- // 存在
}
}
-
return nil
}
@@ -131,15 +143,14 @@ func ZipSpider(spider model.Spider) (filePath string, err error) {
// 如果源文件夹不存在,抛错
if !utils.Exists(spider.Src) {
debug.PrintStack()
+ // 删除该爬虫,否则会一直报错
+ _ = model.RemoveSpider(spider.Id)
return "", errors.New("source path does not exist")
}
// 临时文件路径
randomId := uuid.NewV4()
- if err != nil {
- debug.PrintStack()
- return "", err
- }
+
filePath = filepath.Join(
viper.GetString("other.tmppath"),
randomId.String()+".zip",
@@ -171,6 +182,7 @@ func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid
// 如果存在FileId删除GridFS上的老文件
if !utils.IsObjectIdNull(spider.FileId) {
if err = gf.RemoveId(spider.FileId); err != nil {
+ log.Error("remove gf file:" + err.Error())
debug.PrintStack()
}
}
@@ -223,7 +235,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
for {
switch nr, err := f.Read(s[:]); true {
case nr < 0:
- fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error())
+ _, _ = fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error())
debug.PrintStack()
case nr == 0: // EOF
return nil
@@ -231,7 +243,6 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
handle(s[0:nr], fileCreate)
}
}
- return nil
}
// 发布所有爬虫
@@ -247,8 +258,8 @@ func PublishAllSpiders() error {
for _, spider := range spiders {
// 发布爬虫
if err := PublishSpider(spider); err != nil {
- log.Errorf(err.Error())
- return err
+ log.Errorf("publish spider error:" + err.Error())
+ // return err
}
}
@@ -289,13 +300,14 @@ func PublishSpider(spider model.Spider) (err error) {
msg := SpiderUploadMessage{
FileId: fid.Hex(),
FileName: fileName,
+ SpiderId: spider.Id.Hex(),
}
msgStr, err := json.Marshal(msg)
if err != nil {
return
}
channel := "files:upload"
- if err = database.Publish(channel, string(msgStr)); err != nil {
+ if _, err = database.RedisClient.Publish(channel, utils.BytesToString(msgStr)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
@@ -305,24 +317,24 @@ func PublishSpider(spider model.Spider) (err error) {
}
// 上传爬虫回调
-func OnFileUpload(channel string, msgStr string) {
+func OnFileUpload(message redis.Message) (err error) {
s, gf := database.GetGridFs("files")
defer s.Close()
// 反序列化消息
var msg SpiderUploadMessage
- if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
+ if err := json.Unmarshal(message.Data, &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
// 从GridFS获取该文件
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
if err != nil {
- log.Errorf(err.Error())
+ log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
debug.PrintStack()
- return
+ return err
}
defer f.Close()
@@ -335,7 +347,7 @@ func OnFileUpload(channel string, msgStr string) {
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
defer tmpFile.Close()
@@ -343,33 +355,34 @@ func OnFileUpload(channel string, msgStr string) {
if _, err := io.Copy(tmpFile, f); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
// 解压缩临时文件到目标文件夹
dstPath := filepath.Join(
viper.GetString("spider.path"),
- //strings.Replace(msg.FileName, ".zip", "", -1),
+ // strings.Replace(msg.FileName, ".zip", "", -1),
)
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
// 关闭临时文件
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
// 删除临时文件
if err := os.Remove(tmpFilePath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
- return
+ return err
}
+ return nil
}
// 启动爬虫服务
@@ -394,9 +407,11 @@ func InitSpiderService() error {
// 订阅文件上传
channel := "files:upload"
- var sub database.Subscriber
- sub.Connect()
- sub.Subscribe(channel, OnFileUpload)
+
+ //sub.Connect()
+ ctx := context.Background()
+ return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
+
}
// 启动定时任务
diff --git a/backend/services/system.go b/backend/services/system.go
index 5f50dec9..b30b2bc7 100644
--- a/backend/services/system.go
+++ b/backend/services/system.go
@@ -112,7 +112,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 序列化
msgBytes, _ := json.Marshal(&msg)
- if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil {
+ if _, err := database.RedisClient.Publish("nodes:"+id, utils.BytesToString(msgBytes)); err != nil {
return model.SystemInfo{}, err
}
diff --git a/backend/services/task.go b/backend/services/task.go
index 8c0ff8a1..dbaa2800 100644
--- a/backend/services/task.go
+++ b/backend/services/task.go
@@ -16,14 +16,15 @@ import (
"runtime"
"runtime/debug"
"strconv"
+ "sync"
"time"
)
var Exec *Executor
// 任务执行锁
-var LockList []bool
-
+//Added by cloud: 2019/09/04,solve data race
+var LockList sync.Map
// 任务消息
type TaskMessage struct {
Id string
@@ -36,7 +37,7 @@ func (m *TaskMessage) ToString() (string, error) {
if err != nil {
return "", err
}
- return string(data), err
+ return utils.BytesToString(data), err
}
// 任务执行器
@@ -56,7 +57,7 @@ func (ex *Executor) Start() error {
id := i
// 初始化任务锁
- LockList = append(LockList, false)
+ LockList.Store(id, false)
// 加入定时任务
_, err := ex.Cron.AddFunc(spec, GetExecuteTaskFunc(id))
@@ -220,17 +221,18 @@ func SaveTaskResultCount(id string) func() {
// 执行任务
func ExecuteTask(id int) {
- if LockList[id] {
+ if flag, _ := LockList.Load(id); flag.(bool) {
log.Debugf(GetWorkerPrefix(id) + "正在执行任务...")
return
}
// 上锁
- LockList[id] = true
+ LockList.Store(id, true)
// 解锁(延迟执行)
defer func() {
- LockList[id] = false
+ LockList.Delete(id)
+ LockList.Store(id, false)
}()
// 开始计时
@@ -323,8 +325,10 @@ func ExecuteTask(id int) {
// 执行命令
cmd := spider.Cmd
- if t.Cmd != "" {
- cmd = t.Cmd
+
+ // 加入参数
+ if t.Param != "" {
+ cmd += " " + t.Param
}
// 任务赋值
@@ -405,12 +409,15 @@ func GetTaskLog(id string) (logStr string, err error) {
if IsMasterNode(task.NodeId.Hex()) {
// 若为主节点,获取本机日志
logBytes, err := GetLocalLog(task.LogPath)
- logStr = string(logBytes)
+ logStr = utils.BytesToString(logBytes)
if err != nil {
log.Errorf(err.Error())
- return "", err
+ logStr = err.Error()
+ // return "", err
+ } else {
+ logStr = utils.BytesToString(logBytes)
}
- logStr = string(logBytes)
+
} else {
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
@@ -463,7 +470,7 @@ func CancelTask(id string) (err error) {
}
// 发布消息
- if err := database.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil {
+ if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), utils.BytesToString(msgBytes)); err != nil {
return err
}
}
@@ -472,6 +479,7 @@ func CancelTask(id string) (err error) {
}
func HandleTaskError(t model.Task, err error) {
+ log.Error("handle task error:" + err.Error())
t.Status = constants.StatusError
t.Error = err.Error()
t.FinishTs = time.Now()
diff --git a/backend/services/user.go b/backend/services/user.go
index fb688fd1..4811f767 100644
--- a/backend/services/user.go
+++ b/backend/services/user.go
@@ -5,11 +5,9 @@ import (
"crawlab/model"
"crawlab/utils"
"errors"
- "github.com/apex/log"
"github.com/dgrijalva/jwt-go"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
- "runtime/debug"
"time"
)
@@ -24,28 +22,38 @@ func InitUserService() error {
}
return nil
}
-
-func GetToken(username string) (tokenStr string, err error) {
- user, err := model.GetUserByUsername(username)
- if err != nil {
- log.Errorf(err.Error())
- debug.PrintStack()
- return
- }
-
+func MakeToken(user *model.User) (tokenStr string, err error) {
token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"id": user.Id,
"username": user.Username,
"nbf": time.Now().Unix(),
})
- tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret")))
- if err != nil {
- return
- }
- return
+ return token.SignedString([]byte(viper.GetString("server.secret")))
+
}
+//func GetToken(username string) (tokenStr string, err error) {
+// user, err := model.GetUserByUsername(username)
+// if err != nil {
+// log.Errorf(err.Error())
+// debug.PrintStack()
+// return
+// }
+//
+// token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
+// "id": user.Id,
+// "username": user.Username,
+// "nbf": time.Now().Unix(),
+// })
+//
+// tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret")))
+// if err != nil {
+// return
+// }
+// return
+//}
+
func SecretFunc() jwt.Keyfunc {
return func(token *jwt.Token) (interface{}, error) {
return []byte(viper.GetString("server.secret")), nil
diff --git a/backend/utils/file.go b/backend/utils/file.go
index 9a4300a1..dda73c13 100644
--- a/backend/utils/file.go
+++ b/backend/utils/file.go
@@ -179,11 +179,11 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error {
}
} else {
header, err := zip.FileInfoHeader(info)
- header.Name = prefix + "/" + header.Name
if err != nil {
debug.PrintStack()
return err
}
+ header.Name = prefix + "/" + header.Name
writer, err := zw.CreateHeader(header)
if err != nil {
debug.PrintStack()
diff --git a/backend/utils/file_test.go b/backend/utils/file_test.go
index 484366f5..64f2df6d 100644
--- a/backend/utils/file_test.go
+++ b/backend/utils/file_test.go
@@ -1,8 +1,12 @@
package utils
import (
+ "archive/zip"
. "github.com/smartystreets/goconvey/convey"
+ "io"
+ "log"
"os"
+ "runtime/debug"
"testing"
)
@@ -38,9 +42,13 @@ func TestIsDir(t *testing.T) {
}
func TestCompress(t *testing.T) {
- var pathString = "../utils"
+ err := os.Mkdir("testCompress", os.ModePerm)
+ if err != nil {
+ t.Error("create testCompress failed")
+ }
+ var pathString = "testCompress"
var files []*os.File
- var disPath = "../utils/test"
+ var disPath = "testCompress"
file, err := os.Open(pathString)
if err != nil {
t.Error("open source path failed")
@@ -52,15 +60,60 @@ func TestCompress(t *testing.T) {
So(er, ShouldEqual, nil)
})
})
+ os.RemoveAll("testCompress")
}
-
-// 测试之前需存在有效的test(.zip)文件
-func TestDeCompress(t *testing.T) {
- var tmpFilePath = "./test"
- tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777)
+func Zip(zipFile string, fileList []string) error {
+ // 创建 zip 包文件
+ fw, err := os.Create(zipFile)
if err != nil {
- t.Fatal("open zip file failed")
+ log.Fatal()
+ }
+ defer fw.Close()
+
+ // 实例化新的 zip.Writer
+ zw := zip.NewWriter(fw)
+ defer func() {
+ // 检测一下是否成功关闭
+ if err := zw.Close(); err != nil {
+ log.Fatalln(err)
+ }
+ }()
+
+ for _, fileName := range fileList {
+ fr, err := os.Open(fileName)
+ if err != nil {
+ return err
+ }
+ fi, err := fr.Stat()
+ if err != nil {
+ return err
+ }
+ // 写入文件的头信息
+ fh, err := zip.FileInfoHeader(fi)
+ w, err := zw.CreateHeader(fh)
+ if err != nil {
+ return err
+ }
+ // 写入文件内容
+ _, err = io.Copy(w, fr)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func TestDeCompress(t *testing.T) {
+ err := os.Mkdir("testDeCompress", os.ModePerm)
+ err = Zip("demo.zip", []string{})
+ if err != nil {
+ t.Error("create zip file failed")
+ }
+ tmpFile, err := os.OpenFile("demo.zip", os.O_RDONLY, 0777)
+ if err != nil {
+ debug.PrintStack()
+ t.Error("open demo.zip failed")
}
var dstPath = "./testDeCompress"
Convey("Test DeCopmress func", t, func() {
@@ -68,5 +121,7 @@ func TestDeCompress(t *testing.T) {
err := DeCompress(tmpFile, dstPath)
So(err, ShouldEqual, nil)
})
+ os.RemoveAll("testDeCompress")
+ os.Remove("demo.zip")
}
diff --git a/backend/utils/helpers.go b/backend/utils/helpers.go
new file mode 100644
index 00000000..8e6de815
--- /dev/null
+++ b/backend/utils/helpers.go
@@ -0,0 +1,7 @@
+package utils
+
+import "unsafe"
+
+func BytesToString(b []byte) string {
+ return *(*string)(unsafe.Pointer(&b))
+}
diff --git a/backend/vendor/github.com/dgrijalva/jwt-go/request/doc.go b/backend/vendor/github.com/dgrijalva/jwt-go/request/doc.go
deleted file mode 100644
index c01069c9..00000000
--- a/backend/vendor/github.com/dgrijalva/jwt-go/request/doc.go
+++ /dev/null
@@ -1,7 +0,0 @@
-// Utility package for extracting JWT tokens from
-// HTTP requests.
-//
-// The main function is ParseFromRequest and it's WithClaims variant.
-// See examples for how to use the various Extractor implementations
-// or roll your own.
-package request
diff --git a/backend/vendor/github.com/dgrijalva/jwt-go/request/extractor.go b/backend/vendor/github.com/dgrijalva/jwt-go/request/extractor.go
deleted file mode 100644
index 14414fe2..00000000
--- a/backend/vendor/github.com/dgrijalva/jwt-go/request/extractor.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package request
-
-import (
- "errors"
- "net/http"
-)
-
-// Errors
-var (
- ErrNoTokenInRequest = errors.New("no token present in request")
-)
-
-// Interface for extracting a token from an HTTP request.
-// The ExtractToken method should return a token string or an error.
-// If no token is present, you must return ErrNoTokenInRequest.
-type Extractor interface {
- ExtractToken(*http.Request) (string, error)
-}
-
-// Extractor for finding a token in a header. Looks at each specified
-// header in order until there's a match
-type HeaderExtractor []string
-
-func (e HeaderExtractor) ExtractToken(req *http.Request) (string, error) {
- // loop over header names and return the first one that contains data
- for _, header := range e {
- if ah := req.Header.Get(header); ah != "" {
- return ah, nil
- }
- }
- return "", ErrNoTokenInRequest
-}
-
-// Extract token from request arguments. This includes a POSTed form or
-// GET URL arguments. Argument names are tried in order until there's a match.
-// This extractor calls `ParseMultipartForm` on the request
-type ArgumentExtractor []string
-
-func (e ArgumentExtractor) ExtractToken(req *http.Request) (string, error) {
- // Make sure form is parsed
- req.ParseMultipartForm(10e6)
-
- // loop over arg names and return the first one that contains data
- for _, arg := range e {
- if ah := req.Form.Get(arg); ah != "" {
- return ah, nil
- }
- }
-
- return "", ErrNoTokenInRequest
-}
-
-// Tries Extractors in order until one returns a token string or an error occurs
-type MultiExtractor []Extractor
-
-func (e MultiExtractor) ExtractToken(req *http.Request) (string, error) {
- // loop over header names and return the first one that contains data
- for _, extractor := range e {
- if tok, err := extractor.ExtractToken(req); tok != "" {
- return tok, nil
- } else if err != ErrNoTokenInRequest {
- return "", err
- }
- }
- return "", ErrNoTokenInRequest
-}
-
-// Wrap an Extractor in this to post-process the value before it's handed off.
-// See AuthorizationHeaderExtractor for an example
-type PostExtractionFilter struct {
- Extractor
- Filter func(string) (string, error)
-}
-
-func (e *PostExtractionFilter) ExtractToken(req *http.Request) (string, error) {
- if tok, err := e.Extractor.ExtractToken(req); tok != "" {
- return e.Filter(tok)
- } else {
- return "", err
- }
-}
diff --git a/backend/vendor/github.com/dgrijalva/jwt-go/request/oauth2.go b/backend/vendor/github.com/dgrijalva/jwt-go/request/oauth2.go
deleted file mode 100644
index 5948694a..00000000
--- a/backend/vendor/github.com/dgrijalva/jwt-go/request/oauth2.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package request
-
-import (
- "strings"
-)
-
-// Strips 'Bearer ' prefix from bearer token string
-func stripBearerPrefixFromTokenString(tok string) (string, error) {
- // Should be a bearer token
- if len(tok) > 6 && strings.ToUpper(tok[0:7]) == "BEARER " {
- return tok[7:], nil
- }
- return tok, nil
-}
-
-// Extract bearer token from Authorization header
-// Uses PostExtractionFilter to strip "Bearer " prefix from header
-var AuthorizationHeaderExtractor = &PostExtractionFilter{
- HeaderExtractor{"Authorization"},
- stripBearerPrefixFromTokenString,
-}
-
-// Extractor for OAuth2 access tokens. Looks in 'Authorization'
-// header then 'access_token' argument for a token.
-var OAuth2Extractor = &MultiExtractor{
- AuthorizationHeaderExtractor,
- ArgumentExtractor{"access_token"},
-}
diff --git a/backend/vendor/github.com/dgrijalva/jwt-go/request/request.go b/backend/vendor/github.com/dgrijalva/jwt-go/request/request.go
deleted file mode 100644
index 70525cfa..00000000
--- a/backend/vendor/github.com/dgrijalva/jwt-go/request/request.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package request
-
-import (
- "github.com/dgrijalva/jwt-go"
- "net/http"
-)
-
-// Extract and parse a JWT token from an HTTP request.
-// This behaves the same as Parse, but accepts a request and an extractor
-// instead of a token string. The Extractor interface allows you to define
-// the logic for extracting a token. Several useful implementations are provided.
-//
-// You can provide options to modify parsing behavior
-func ParseFromRequest(req *http.Request, extractor Extractor, keyFunc jwt.Keyfunc, options ...ParseFromRequestOption) (token *jwt.Token, err error) {
- // Create basic parser struct
- p := &fromRequestParser{req, extractor, nil, nil}
-
- // Handle options
- for _, option := range options {
- option(p)
- }
-
- // Set defaults
- if p.claims == nil {
- p.claims = jwt.MapClaims{}
- }
- if p.parser == nil {
- p.parser = &jwt.Parser{}
- }
-
- // perform extract
- tokenString, err := p.extractor.ExtractToken(req)
- if err != nil {
- return nil, err
- }
-
- // perform parse
- return p.parser.ParseWithClaims(tokenString, p.claims, keyFunc)
-}
-
-// ParseFromRequest but with custom Claims type
-// DEPRECATED: use ParseFromRequest and the WithClaims option
-func ParseFromRequestWithClaims(req *http.Request, extractor Extractor, claims jwt.Claims, keyFunc jwt.Keyfunc) (token *jwt.Token, err error) {
- return ParseFromRequest(req, extractor, keyFunc, WithClaims(claims))
-}
-
-type fromRequestParser struct {
- req *http.Request
- extractor Extractor
- claims jwt.Claims
- parser *jwt.Parser
-}
-
-type ParseFromRequestOption func(*fromRequestParser)
-
-// Parse with custom claims
-func WithClaims(claims jwt.Claims) ParseFromRequestOption {
- return func(p *fromRequestParser) {
- p.claims = claims
- }
-}
-
-// Parse using a custom parser
-func WithParser(parser *jwt.Parser) ParseFromRequestOption {
- return func(p *fromRequestParser) {
- p.parser = parser
- }
-}
diff --git a/backend/vendor/modules.txt b/backend/vendor/modules.txt
index 57c7d3f1..634c337b 100644
--- a/backend/vendor/modules.txt
+++ b/backend/vendor/modules.txt
@@ -2,7 +2,6 @@
github.com/apex/log
# github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go
-github.com/dgrijalva/jwt-go/request
# github.com/fsnotify/fsnotify v1.4.7
github.com/fsnotify/fsnotify
# github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3
@@ -18,11 +17,18 @@ github.com/globalsign/mgo/bson
github.com/globalsign/mgo/internal/sasl
github.com/globalsign/mgo/internal/scram
github.com/globalsign/mgo/internal/json
+# github.com/go-playground/locales v0.12.1
+github.com/go-playground/locales
+github.com/go-playground/locales/currency
+# github.com/go-playground/universal-translator v0.16.0
+github.com/go-playground/universal-translator
# github.com/golang/protobuf v1.3.1
github.com/golang/protobuf/proto
# github.com/gomodule/redigo v2.0.0+incompatible
github.com/gomodule/redigo/redis
github.com/gomodule/redigo/internal
+# github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1
+github.com/gopherjs/gopherjs/js
# github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hcl
github.com/hashicorp/hcl/hcl/printer
@@ -36,6 +42,10 @@ github.com/hashicorp/hcl/json/scanner
github.com/hashicorp/hcl/json/token
# github.com/json-iterator/go v1.1.6
github.com/json-iterator/go
+# github.com/jtolds/gls v4.20.0+incompatible
+github.com/jtolds/gls
+# github.com/leodido/go-urn v1.1.0
+github.com/leodido/go-urn
# github.com/magiconair/properties v1.8.0
github.com/magiconair/properties
# github.com/mattn/go-isatty v0.0.8
@@ -52,6 +62,15 @@ github.com/pelletier/go-toml
github.com/pkg/errors
# github.com/satori/go.uuid v1.2.0
github.com/satori/go.uuid
+# github.com/smartystreets/assertions v1.0.0
+github.com/smartystreets/assertions
+github.com/smartystreets/assertions/internal/go-diff/diffmatchpatch
+github.com/smartystreets/assertions/internal/go-render/render
+github.com/smartystreets/assertions/internal/oglematchers
+# github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337
+github.com/smartystreets/goconvey/convey
+github.com/smartystreets/goconvey/convey/reporting
+github.com/smartystreets/goconvey/convey/gotest
# github.com/spf13/afero v1.1.2
github.com/spf13/afero
github.com/spf13/afero/mem
@@ -72,5 +91,7 @@ golang.org/x/text/transform
golang.org/x/text/unicode/norm
# gopkg.in/go-playground/validator.v8 v8.18.2
gopkg.in/go-playground/validator.v8
+# gopkg.in/go-playground/validator.v9 v9.29.1
+gopkg.in/go-playground/validator.v9
# gopkg.in/yaml.v2 v2.2.2
gopkg.in/yaml.v2
diff --git a/docker/Dockerfile.master.alpine b/docker/Dockerfile.master.alpine
index 6979861b..b9dbb742 100644
--- a/docker/Dockerfile.master.alpine
+++ b/docker/Dockerfile.master.alpine
@@ -75,7 +75,7 @@ RUN sed -i 's/#rc_sys=""/rc_sys="lxc"/g' /etc/rc.conf && \
# working directory
WORKDIR /app/backend
-
+ENV PYTHONIOENCODING utf-8
# frontend port
EXPOSE 8080
diff --git a/docker/Dockerfile.worker.alpine b/docker/Dockerfile.worker.alpine
index e7a66776..388125a2 100644
--- a/docker/Dockerfile.worker.alpine
+++ b/docker/Dockerfile.worker.alpine
@@ -35,7 +35,7 @@ RUN apk del .build-deps
# working directory
WORKDIR /app/backend
-
+ENV PYTHONIOENCODING utf-8
# backend port
EXPOSE 8000
diff --git a/frontend/package.json b/frontend/package.json
index 139297d3..e3bc84f8 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -6,6 +6,7 @@
"serve": "vue-cli-service serve --ip=0.0.0.0",
"serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0",
"config": "vue ui",
+ "build:dev": "vue-cli-service build --mode development",
"build:prod": "vue-cli-service build --mode production",
"lint": "vue-cli-service lint",
"test:unit": "vue-cli-service test:unit"
diff --git a/frontend/src/api/request.js b/frontend/src/api/request.js
index 38734c46..22707159 100644
--- a/frontend/src/api/request.js
+++ b/frontend/src/api/request.js
@@ -3,28 +3,51 @@ import router from '../router'
let baseUrl = process.env.VUE_APP_BASE_URL ? process.env.VUE_APP_BASE_URL : 'http://localhost:8000'
-const request = (method, path, params, data) => {
- return new Promise((resolve, reject) => {
+const request = async (method, path, params, data, others = {}) => {
+ try {
const url = baseUrl + path
const headers = {
'Authorization': window.localStorage.getItem('token')
}
- axios({
+ const response = await axios({
method,
url,
params,
data,
- headers
+ headers,
+ ...others
})
- .then(resolve)
- .catch(error => {
- console.log(error)
- if (error.response.status === 401) {
- router.push('/login')
- }
- reject(error)
- })
- })
+ // console.log(response)
+ return response
+ } catch (e) {
+ if (e.response.status === 401 && router.currentRoute.path !== '/login') {
+ router.push('/login')
+ }
+ await Promise.reject(e)
+ }
+
+ // return new Promise((resolve, reject) => {
+ // const url = baseUrl + path
+ // const headers = {
+ // 'Authorization': window.localStorage.getItem('token')
+ // }
+ // axios({
+ // method,
+ // url,
+ // params,
+ // data,
+ // headers,
+ // ...others
+ // })
+ // .then(resolve)
+ // .catch(error => {
+ // console.log(error)
+ // if (error.response.status === 401) {
+ // router.push('/login')
+ // }
+ // reject(error)
+ // })
+ // })
}
const get = (path, params) => {
diff --git a/frontend/src/components/Common/CrawlConfirmDialog.vue b/frontend/src/components/Common/CrawlConfirmDialog.vue
index 06ef1dba..266ef2eb 100644
--- a/frontend/src/components/Common/CrawlConfirmDialog.vue
+++ b/frontend/src/components/Common/CrawlConfirmDialog.vue
@@ -19,6 +19,9 @@
/>
+
+
+
{{$t('Cancel')}}
@@ -42,7 +45,8 @@ export default {
},
data () {
return {
- nodeId: ''
+ nodeId: '',
+ param: ''
}
},
methods: {
@@ -50,7 +54,7 @@ export default {
this.$emit('close')
},
onConfirm () {
- this.$store.dispatch('spider/crawlSpider', { id: this.spiderId, nodeId: this.nodeId })
+ this.$store.dispatch('spider/crawlSpider', { id: this.spiderId, nodeId: this.nodeId, param: this.param })
.then(() => {
this.$message.success(this.$t('A task has been scheduled successfully'))
})
diff --git a/frontend/src/components/InfoView/NodeInfoView.vue b/frontend/src/components/InfoView/NodeInfoView.vue
index 8e350448..e6ffb58a 100644
--- a/frontend/src/components/InfoView/NodeInfoView.vue
+++ b/frontend/src/components/InfoView/NodeInfoView.vue
@@ -22,7 +22,7 @@
- {{$t('Save')}}
+ {{$t('Save')}}
diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue
index 661e4757..39702a5d 100644
--- a/frontend/src/components/InfoView/SpiderInfoView.vue
+++ b/frontend/src/components/InfoView/SpiderInfoView.vue
@@ -44,6 +44,9 @@
+
+
+
diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue
index bfe6419a..80aaa770 100644
--- a/frontend/src/components/InfoView/TaskInfoView.vue
+++ b/frontend/src/components/InfoView/TaskInfoView.vue
@@ -15,6 +15,9 @@
+
+
+
@@ -86,15 +89,15 @@ export default {
return dayjs(str).format('YYYY-MM-DD HH:mm:ss')
},
getWaitDuration (row) {
- if (row.start_ts.match('^0001')) return 'NA'
+ if (!row.start_ts || row.start_ts.match('^0001')) return 'NA'
return dayjs(row.start_ts).diff(row.create_ts, 'second')
},
getRuntimeDuration (row) {
- if (row.finish_ts.match('^0001')) return 'NA'
+ if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA'
return dayjs(row.finish_ts).diff(row.start_ts, 'second')
},
getTotalDuration (row) {
- if (row.finish_ts.match('^0001')) return 'NA'
+ if (!row.finish_ts || row.finish_ts.match('^0001')) return 'NA'
return dayjs(row.finish_ts).diff(row.create_ts, 'second')
}
}
diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js
index b1de3b47..c56959c9 100644
--- a/frontend/src/i18n/zh.js
+++ b/frontend/src/i18n/zh.js
@@ -154,6 +154,8 @@ export default {
'Last Run': '上次运行',
'Action': '操作',
'No command line': '没有执行命令',
+ 'Last Status': '上次运行状态',
+ 'Remark': '备注',
// 任务
'Task Info': '任务信息',
@@ -245,7 +247,8 @@ export default {
'username already exists': '用户名已存在',
'Deleted successfully': '成功删除',
'Saved successfully': '成功保存',
-
+ 'Please zip your spider files from the root directory': '爬虫文件请从根目录下开始压缩。',
+ 'English': 'English',
// 登录
'Sign in': '登录',
'Sign-in': '登录',
@@ -264,5 +267,20 @@ export default {
'admin': '管理用户',
'Role': '角色',
'Edit User': '更改用户',
- 'Users': '用户'
+ 'Users': '用户',
+ tagsView: {
+ closeOthers: '关闭其他',
+ close: '关闭',
+ refresh: '刷新',
+ closeAll: '关闭所有'
+ },
+ nodeList: {
+ type: '节点类型'
+ },
+ schedules: {
+ cron: 'Cron',
+ add_cron: '生成Cron',
+ // Cron Format: [second] [minute] [hour] [day of month] [month] [day of week]
+ cron_format: 'Cron 格式: [秒] [分] [小时] [日] [月] [周]'
+ }
}
diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js
index 9a238d08..84c96cd3 100644
--- a/frontend/src/router/index.js
+++ b/frontend/src/router/index.js
@@ -46,7 +46,6 @@ export const constantRouterMap = [
]
},
{
- name: 'Node',
path: '/nodes',
component: Layout,
meta: {
@@ -76,7 +75,6 @@ export const constantRouterMap = [
]
},
{
- name: 'Spider',
path: '/spiders',
component: Layout,
meta: {
@@ -106,7 +104,6 @@ export const constantRouterMap = [
]
},
{
- name: 'Task',
path: '/tasks',
component: Layout,
meta: {
@@ -136,7 +133,6 @@ export const constantRouterMap = [
]
},
{
- name: 'Schedule',
path: '/schedules',
component: Layout,
meta: {
@@ -157,7 +153,6 @@ export const constantRouterMap = [
]
},
{
- name: 'Site',
path: '/sites',
component: Layout,
hidden: true,
@@ -178,7 +173,6 @@ export const constantRouterMap = [
]
},
{
- name: 'User',
path: '/users',
component: Layout,
meta: {
diff --git a/frontend/src/store/modules/node.js b/frontend/src/store/modules/node.js
index 266beb3e..5e21a222 100644
--- a/frontend/src/store/modules/node.js
+++ b/frontend/src/store/modules/node.js
@@ -25,15 +25,7 @@ const mutations = {
const { id, systemInfo } = payload
for (let i = 0; i < state.nodeList.length; i++) {
if (state.nodeList[i]._id === id) {
- // Vue.set(state.nodeList[i], 'systemInfo', {})
state.nodeList[i].systemInfo = systemInfo
- // for (const key in systemInfo) {
- // if (systemInfo.hasOwnProperty(key)) {
- // console.log(key)
- // state.nodeList[i].systemInfo[key] = systemInfo[key]
- // // Vue.set(state.nodeList[i].systemInfo, key, systemInfo[key])
- // }
- // }
break
}
}
@@ -76,10 +68,12 @@ const actions = {
getTaskList ({ state, commit }, id) {
return request.get(`/nodes/${id}/tasks`)
.then(response => {
- commit('task/SET_TASK_LIST',
- response.data.data.map(d => d)
- .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1),
- { root: true })
+ if (response.data.data) { // skip the commit (and the .map call) when the response carries no task data
+ commit('task/SET_TASK_LIST',
+ response.data.data.map(d => d)
+ .sort((a, b) => a.create_ts < b.create_ts ? 1 : -1), // newest create_ts first
+ { root: true })
+ }
})
},
getNodeSystemInfo ({ state, commit }, id) {
diff --git a/frontend/src/store/modules/spider.js b/frontend/src/store/modules/spider.js
index 2ab37838..b7bccd0d 100644
--- a/frontend/src/store/modules/spider.js
+++ b/frontend/src/store/modules/spider.js
@@ -101,10 +101,11 @@ const actions = {
})
},
crawlSpider ({ state, dispatch }, payload) {
- const { id, nodeId } = payload
+ const { id, nodeId, param } = payload
return request.put(`/tasks`, {
spider_id: id,
- node_id: nodeId
+ node_id: nodeId,
+ param: param // taken from the payload and sent along with the task request
})
},
getTaskList ({ state, commit }, id) {
diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js
index 545a169b..1d7e6c09 100644
--- a/frontend/src/store/modules/task.js
+++ b/frontend/src/store/modules/task.js
@@ -120,6 +120,22 @@ const actions = {
commit('SET_TASK_RESULTS_TOTAL_COUNT', response.data.total)
})
},
+ async getTaskResultExcel ({ state, commit }, id) { // fetches the results of task `id` and saves them in the browser as data.csv
+ const { data } = await request.request('GET', '/tasks/' + id + '/results/download', {}, {
+ responseType: 'blob' // request the body as a Blob instead of parsed text/JSON (assumes the request wrapper forwards this option to the HTTP client - confirm)
+ })
+ const downloadUrl = window.URL.createObjectURL(new Blob([data]))
+
+ const link = document.createElement('a')
+
+ link.href = downloadUrl
+
+ link.setAttribute('download', 'data.csv') // file name the browser suggests for the saved file
+
+ document.body.appendChild(link)
+ link.click()
+ link.remove() // NOTE(review): downloadUrl is never released with window.URL.revokeObjectURL
+ },
cancelTask ({ state, dispatch }, id) {
return request.post(`/tasks/${id}/cancel`)
.then(() => {
diff --git a/frontend/src/views/layout/components/Navbar.vue b/frontend/src/views/layout/components/Navbar.vue
index f60c0051..3b30c049 100644
--- a/frontend/src/views/layout/components/Navbar.vue
+++ b/frontend/src/views/layout/components/Navbar.vue
@@ -32,6 +32,7 @@
{{$t('Documentation')}}
+
diff --git a/frontend/src/views/node/NodeList.vue b/frontend/src/views/node/NodeList.vue
index 641009f3..9ea51502 100644
--- a/frontend/src/views/node/NodeList.vue
+++ b/frontend/src/views/node/NodeList.vue
@@ -163,7 +163,7 @@ export default {
columns: [
{ name: 'name', label: 'Name', width: '220' },
{ name: 'ip', label: 'IP', width: '160' },
- { name: 'type', label: 'Type', width: '120' },
+ { name: 'type', label: 'nodeList.type', width: '120' },
// { name: 'port', label: 'Port', width: '80' },
{ name: 'status', label: 'Status', width: '120' },
{ name: 'description', label: 'Description', width: 'auto' }
diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue
index 28ca4961..743a186e 100644
--- a/frontend/src/views/schedule/ScheduleList.vue
+++ b/frontend/src/views/schedule/ScheduleList.vue
@@ -38,21 +38,21 @@
-
+
-
- {{$t('Cron')}}
+ {{$t('schedules.cron')}}
+ :placeholder="$t('schedules.cron')">
- {{$t('生成Cron')}}
+ {{$t('schedules.add_cron')}}
-
-
+
@@ -111,7 +111,7 @@
-
+
@@ -156,9 +156,10 @@ export default {
return {
columns: [
{ name: 'name', label: 'Name', width: '180' },
- { name: 'cron', label: 'Cron', width: '120' },
+ { name: 'cron', label: 'schedules.cron', width: '120' },
{ name: 'node_name', label: 'Node', width: '150' },
{ name: 'spider_name', label: 'Spider', width: '150' },
+ { name: 'param', label: 'Parameters', width: '150' },
{ name: 'description', label: 'Description', width: 'auto' }
],
isEdit: false,
diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue
index 348728a1..dbd1dda8 100644
--- a/frontend/src/views/spider/SpiderList.vue
+++ b/frontend/src/views/spider/SpiderList.vue
@@ -93,7 +93,7 @@
-
+
@@ -190,6 +190,14 @@
{{getTime(scope.row[col.name])}}
+
+
+
+
+
-
+
@@ -239,10 +247,14 @@ import {
} from 'vuex'
import dayjs from 'dayjs'
import CrawlConfirmDialog from '../../components/Common/CrawlConfirmDialog'
+import StatusTag from '../../components/Status/StatusTag'
export default {
name: 'SpiderList',
- components: { CrawlConfirmDialog },
+ components: {
+ CrawlConfirmDialog,
+ StatusTag
+ },
data () {
return {
pagination: {
@@ -263,14 +275,15 @@ export default {
},
// tableData,
columns: [
- { name: 'name', label: 'Name', width: '180', align: 'left' },
+ { name: 'name', label: 'Name', width: '160', align: 'left' },
// { name: 'site_name', label: 'Site', width: '140', align: 'left' },
{ name: 'type', label: 'Spider Type', width: '120' },
// { name: 'cmd', label: 'Command Line', width: '200' },
- // { name: 'lang', label: 'Language', width: '120', sortable: true },
- { name: 'last_run_ts', label: 'Last Run', width: '160' },
- { name: 'create_ts', label: 'Create Time', width: '160' },
- { name: 'update_ts', label: 'Update Time', width: '160' }
+ { name: 'last_status', label: 'Last Status', width: '120' },
+ { name: 'last_run_ts', label: 'Last Run', width: '140' },
+ { name: 'create_ts', label: 'Create Time', width: '140' },
+ { name: 'update_ts', label: 'Update Time', width: '140' },
+ { name: 'remark', label: 'Remark', width: '140' }
// { name: 'last_7d_tasks', label: 'Last 7-Day Tasks', width: '80' },
// { name: 'last_5_errors', label: 'Last 5-Run Errors', width: '80' }
],
diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue
index a1edb497..b4ec0652 100644
--- a/frontend/src/views/task/TaskDetail.vue
+++ b/frontend/src/views/task/TaskDetail.vue
@@ -95,16 +95,16 @@ export default {
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
},
downloadCSV () {
- window.location.href = this.$request.baseUrl + '/tasks/' + this.$route.params.id + '/results/download'
+ this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id) // the store action performs the request and triggers the browser download
this.$st.sendEv('任务详情-结果', '下载CSV')
}
},
- created () {
- this.$store.dispatch('task/getTaskData', this.$route.params.id)
+ async created () {
+ await this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
- if (['running'].includes(this.taskForm.status)) {
+ if (this.taskForm && ['running'].includes(this.taskForm.status)) {
this.handle = setInterval(() => {
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
}, 5000)
diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue
index 9cbceb20..5ad1b14f 100644
--- a/frontend/src/views/task/TaskList.vue
+++ b/frontend/src/views/task/TaskList.vue
@@ -45,7 +45,7 @@
:label="$t(col.label)"
:sortable="col.sortable"
:align="col.align"
- :width="col.width">
+ >
{{scope.row[col.name]}}
@@ -119,7 +119,7 @@
:width="col.width">
-
+
@@ -172,6 +172,7 @@ export default {
{ name: 'node_name', label: 'Node', width: '120' },
{ name: 'spider_name', label: 'Spider', width: '120' },
{ name: 'status', label: 'Status', width: '120' },
+ { name: 'param', label: 'Parameters', width: '120' },
// { name: 'create_ts', label: 'Create Time', width: '100' },
{ name: 'start_ts', label: 'Start Time', width: '100' },
{ name: 'finish_ts', label: 'Finish Time', width: '100' },