From 5dd06adfbba2142bb2b7d58f2e2d087cabe79ce3 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 22 Oct 2024 16:22:35 +0800 Subject: [PATCH] refactor: code cleanup --- db/errors/base.go | 6 - db/errors/errors.go | 10 - db/errors/redis.go | 15 - db/generic/base.go | 8 - db/go.mod | 5 - db/interfaces.go | 39 --- db/mongo/col.go | 6 +- db/mongo/result.go | 14 +- db/redis/client.go | 550 ----------------------------------- db/redis/constants.go | 10 - db/redis/options.go | 20 -- db/redis/pool.go | 54 ---- db/redis/test/base.go | 91 ------ db/redis/test/client_test.go | 273 ----------------- db/sql/sql.go | 36 --- db/utils/utils.go | 19 -- 16 files changed, 5 insertions(+), 1151 deletions(-) delete mode 100644 db/errors/base.go delete mode 100644 db/errors/errors.go delete mode 100644 db/errors/redis.go delete mode 100644 db/generic/base.go delete mode 100644 db/interfaces.go delete mode 100644 db/redis/client.go delete mode 100644 db/redis/constants.go delete mode 100644 db/redis/options.go delete mode 100644 db/redis/pool.go delete mode 100644 db/redis/test/base.go delete mode 100644 db/redis/test/client_test.go delete mode 100644 db/sql/sql.go delete mode 100644 db/utils/utils.go diff --git a/db/errors/base.go b/db/errors/base.go deleted file mode 100644 index a0310075..00000000 --- a/db/errors/base.go +++ /dev/null @@ -1,6 +0,0 @@ -package errors - -const ( - errorPrefixMongo = "mongo" - errorPrefixRedis = "redis" -) diff --git a/db/errors/errors.go b/db/errors/errors.go deleted file mode 100644 index 5e5e23a0..00000000 --- a/db/errors/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package errors - -import "errors" - -var ( - ErrInvalidType = errors.New("invalid type") - ErrMissingValue = errors.New("missing value") - ErrNoCursor = errors.New("no cursor") - ErrAlreadyLocked = errors.New("already locked") -) diff --git a/db/errors/redis.go b/db/errors/redis.go deleted file mode 100644 index 70db1e79..00000000 --- a/db/errors/redis.go +++ /dev/null @@ -1,15 +0,0 @@ -package errors - -import ( - "errors" - "fmt" -) - -var ( - ErrorRedisInvalidType = NewRedisError("invalid type") - ErrorRedisLocked = NewRedisError("locked") -) - -func NewRedisError(msg string) (err error) { - return errors.New(fmt.Sprintf("%s: %s", errorPrefixRedis, msg)) -} diff --git a/db/generic/base.go b/db/generic/base.go deleted file mode 100644 index c3cc5b29..00000000 --- a/db/generic/base.go +++ /dev/null @@ -1,8 +0,0 @@ -package generic - -const ( - DataSourceTypeMongo = "mongo" - DataSourceTypeMysql = "mysql" - DataSourceTypePostgres = "postgres" - DataSourceTypeElasticSearch = "postgres" -) diff --git a/db/go.mod b/db/go.mod index 58717539..e4beaafb 100644 --- a/db/go.mod +++ b/db/go.mod @@ -8,8 +8,6 @@ require ( github.com/apex/log v1.9.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/crawlab-team/crawlab/trace v0.0.0-20240614095218-7b4ee8399ab0 - github.com/gomodule/redigo v2.0.0+incompatible - github.com/jmoiron/sqlx v1.2.0 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 go.mongodb.org/mongo-driver v1.15.1 @@ -18,13 +16,10 @@ require ( require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/klauspost/compress v1.17.7 // indirect - github.com/lib/pq v1.10.4 // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/mattn/go-sqlite3 v1.14.9 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect diff --git a/db/interfaces.go b/db/interfaces.go deleted file mode 100644 index c6640a73..00000000 --- a/db/interfaces.go +++ /dev/null @@ -1,39 +0,0 @@ -package db - -import "time" - -type RedisClient interface { - Ping() (err error) - Keys(pattern string) (values []string, err error) - AllKeys() (values []string, err error) - Get(collection string) (value string, err error) - Set(collection string, value string) (err error) - Del(collection string) (err error) - RPush(collection string, value interface{}) (err error) - LPush(collection string, value interface{}) (err error) - LPop(collection string) (value string, err error) - RPop(collection string) (value string, err error) - LLen(collection string) (count int, err error) - BRPop(collection string, timeout int) (value string, err error) - BLPop(collection string, timeout int) (value string, err error) - HSet(collection string, key string, value string) (err error) - HGet(collection string, key string) (value string, err error) - HDel(collection string, key string) (err error) - HScan(collection string) (results map[string]string, err error) - HKeys(collection string) (results []string, err error) - ZAdd(collection string, score float32, value interface{}) (err error) - ZCount(collection string, min string, max string) (count int, err error) - ZCountAll(collection string) (count int, err error) - ZScan(collection string, pattern string, count int) (results []string, err error) - ZPopMax(collection string, count int) (results []string, err error) - ZPopMin(collection string, count int) (results []string, err error) - ZPopMaxOne(collection string) (value string, err error) - ZPopMinOne(collection string) (value string, err error) - BZPopMax(collection string, timeout int) (value string, err error) - BZPopMin(collection string, timeout int) (value string, err error) - Lock(lockKey string) (value int64, err error) - UnLock(lockKey string, value int64) - MemoryStats() (stats map[string]int64, err error) - SetBackoffMaxInterval(interval time.Duration) - SetTimeout(timeout int) -} diff --git a/db/mongo/col.go b/db/mongo/col.go index 67f3b347..56b91a5f 100644 --- a/db/mongo/col.go +++ b/db/mongo/col.go @@ -2,7 +2,7 @@ package mongo import ( "context" - "github.com/crawlab-team/crawlab/db/errors" + "errors" "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -58,7 +58,7 @@ func (col *Col) Insert(doc interface{}) (id primitive.ObjectID, err error) { if id, ok := res.InsertedID.(primitive.ObjectID); ok { return id, nil } - return primitive.NilObjectID, trace.TraceError(errors.ErrInvalidType) + return primitive.NilObjectID, trace.TraceError(errors.New("InsertedID is not ObjectID")) } func (col *Col) InsertMany(docs []interface{}) (ids []primitive.ObjectID, err error) { @@ -72,7 +72,7 @@ func (col *Col) InsertMany(docs []interface{}) (ids []primitive.ObjectID, err er id := v.(primitive.ObjectID) ids = append(ids, id) default: - return nil, trace.TraceError(errors.ErrInvalidType) + return nil, trace.TraceError(errors.New("InsertedID is not ObjectID")) } } return ids, nil diff --git a/db/mongo/result.go b/db/mongo/result.go index add5f356..8cdf11b9 100644 --- a/db/mongo/result.go +++ b/db/mongo/result.go @@ -2,7 +2,7 @@ package mongo import ( "context" - "github.com/crawlab-team/crawlab/db/errors" + "errors" "go.mongodb.org/mongo-driver/mongo" ) @@ -15,16 +15,6 @@ type FindResultInterface interface { GetError() (err error) } -func NewFindResult() (fr *FindResult) { - return &FindResult{} -} - -func NewFindResultWithError(err error) (fr *FindResult) { - return &FindResult{ - err: err, - } -} - type FindResult struct { col *Col res *mongo.SingleResult @@ -61,7 +51,7 @@ func (fr *FindResult) All(val interface{}) (err error) { ctx = fr.col.ctx } if fr.cur == nil { - return errors.ErrNoCursor + return errors.New("no cursor") } if !fr.cur.TryNext(ctx) { return ctx.Err() diff --git a/db/redis/client.go b/db/redis/client.go deleted file mode 100644 index 97dcbef3..00000000 --- a/db/redis/client.go +++ /dev/null @@ -1,550 +0,0 @@ -package redis - -import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab/db" - "github.com/crawlab-team/crawlab/db/errors" - "github.com/crawlab-team/crawlab/db/utils" - "github.com/crawlab-team/crawlab/trace" - "github.com/gomodule/redigo/redis" - "reflect" - "strings" - "time" -) - -type Client struct { - // settings - backoffMaxInterval time.Duration - timeout int - - // internals - pool *redis.Pool -} - -func (client *Client) Ping() error { - c := client.pool.Get() - defer utils.Close(c) - if _, err := redis.String(c.Do("PING")); err != nil { - if err != redis.ErrNil { - return trace.TraceError(err) - } - return err - } - return nil -} - -func (client *Client) Keys(pattern string) (values []string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err = redis.Strings(c.Do("KEYS", pattern)) - if err != nil { - return nil, trace.TraceError(err) - } - return values, nil -} - -func (client *Client) AllKeys() (values []string, err error) { - return client.Keys("*") -} - -func (client *Client) Get(collection string) (value string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - value, err = redis.String(c.Do("GET", collection)) - if err != nil { - return "", trace.TraceError(err) - } - return value, nil -} - -func (client *Client) Set(collection string, value string) (err error) { - c := client.pool.Get() - defer utils.Close(c) - - value, err = redis.String(c.Do("SET", collection, value)) - if err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) Del(collection string) error { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("DEL", collection); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) RPush(collection string, value interface{}) error { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("RPUSH", collection, value); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) LPush(collection string, value interface{}) error { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("LPUSH", collection, value); err != nil { - if err != redis.ErrNil { - return trace.TraceError(err) - } - return err - } - return nil -} - -func (client *Client) LPop(collection string) (string, error) { - c := client.pool.Get() - defer utils.Close(c) - - value, err := redis.String(c.Do("LPOP", collection)) - if err != nil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - return value, nil -} - -func (client *Client) RPop(collection string) (string, error) { - c := client.pool.Get() - defer utils.Close(c) - - value, err := redis.String(c.Do("RPOP", collection)) - if err != nil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - return value, nil -} - -func (client *Client) LLen(collection string) (int, error) { - c := client.pool.Get() - defer utils.Close(c) - - value, err := redis.Int(c.Do("LLEN", collection)) - if err != nil { - return 0, trace.TraceError(err) - } - return value, nil -} - -func (client *Client) BRPop(collection string, timeout int) (value string, err error) { - if timeout <= 0 { - timeout = 60 - } - c := client.pool.Get() - defer utils.Close(c) - - values, err := redis.Strings(c.Do("BRPOP", collection, timeout)) - if err != nil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - return values[1], nil -} - -func (client *Client) BLPop(collection string, timeout int) (value string, err error) { - if timeout <= 0 { - timeout = 60 - } - c := client.pool.Get() - defer utils.Close(c) - - values, err := redis.Strings(c.Do("BLPOP", collection, timeout)) - if err != nil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - return values[1], nil -} - -func (client *Client) HSet(collection string, key string, value string) error { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("HSET", collection, key, value); err != nil { - if err != redis.ErrNil { - return trace.TraceError(err) - } - return err - } - return nil -} - -func (client *Client) HGet(collection string, key string) (string, error) { - c := client.pool.Get() - defer utils.Close(c) - value, err := redis.String(c.Do("HGET", collection, key)) - if err != nil && err != redis.ErrNil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - return value, nil -} - -func (client *Client) HDel(collection string, key string) error { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("HDEL", collection, key); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) HScan(collection string) (results map[string]string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - var ( - cursor int64 - items []string - ) - - results = map[string]string{} - - for { - values, err := redis.Values(c.Do("HSCAN", collection, cursor)) - if err != nil { - if err != redis.ErrNil { - return nil, trace.TraceError(err) - } - return nil, err - } - - values, err = redis.Scan(values, &cursor, &items) - if err != nil { - if err != redis.ErrNil { - return nil, trace.TraceError(err) - } - return nil, err - } - for i := 0; i < len(items); i += 2 { - key := items[i] - value := items[i+1] - results[key] = value - } - if cursor == 0 { - break - } - } - return results, nil -} - -func (client *Client) HKeys(collection string) (results []string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - results, err = redis.Strings(c.Do("HKEYS", collection)) - if err != nil { - if err != redis.ErrNil { - return results, trace.TraceError(err) - } - return results, err - } - return results, nil -} - -func (client *Client) ZAdd(collection string, score float32, value interface{}) (err error) { - c := client.pool.Get() - defer utils.Close(c) - - if _, err := c.Do("ZADD", collection, score, value); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) ZCount(collection string, min string, max string) (count int, err error) { - c := client.pool.Get() - defer utils.Close(c) - - count, err = redis.Int(c.Do("ZCOUNT", collection, min, max)) - if err != nil { - return 0, trace.TraceError(err) - } - return count, nil -} - -func (client *Client) ZCountAll(collection string) (count int, err error) { - return client.ZCount(collection, "-inf", "+inf") -} - -func (client *Client) ZScan(collection string, pattern string, count int) (values []string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err = redis.Strings(c.Do("ZSCAN", collection, 0, pattern, count)) - if err != nil { - if err != redis.ErrNil { - return nil, trace.TraceError(err) - } - return nil, err - } - return values, nil -} - -func (client *Client) ZPopMax(collection string, count int) (results []string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - results = []string{} - - values, err := redis.Strings(c.Do("ZPOPMAX", collection, count)) - if err != nil { - if err != redis.ErrNil { - return nil, trace.TraceError(err) - } - return nil, err - } - - for i := 0; i < len(values); i += 2 { - v := values[i] - results = append(results, v) - } - - return results, nil -} - -func (client *Client) ZPopMin(collection string, count int) (results []string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - results = []string{} - - values, err := redis.Strings(c.Do("ZPOPMIN", collection, count)) - if err != nil { - if err != redis.ErrNil { - return nil, trace.TraceError(err) - } - return nil, err - } - - for i := 0; i < len(values); i += 2 { - v := values[i] - results = append(results, v) - } - - return results, nil -} - -func (client *Client) ZPopMaxOne(collection string) (value string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err := client.ZPopMax(collection, 1) - if err != nil { - return "", err - } - if values == nil || len(values) == 0 { - return "", nil - } - return values[0], nil -} - -func (client *Client) ZPopMinOne(collection string) (value string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err := client.ZPopMin(collection, 1) - if err != nil { - return "", err - } - if values == nil || len(values) == 0 { - return "", nil - } - return values[0], nil -} - -func (client *Client) BZPopMax(collection string, timeout int) (value string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err := redis.Strings(c.Do("BZPOPMAX", collection, timeout)) - if err != nil { - if err != redis.ErrNil { - return "", trace.TraceError(err) - } - return "", err - } - if len(values) < 3 { - return "", trace.TraceError(errors.ErrorRedisInvalidType) - } - return values[1], nil -} - -func (client *Client) BZPopMin(collection string, timeout int) (value string, err error) { - c := client.pool.Get() - defer utils.Close(c) - - values, err := redis.Strings(c.Do("BZPOPMIN", collection, timeout)) - if err != nil { - if err != redis.ErrNil { - return "", trace.TraceError(err) - } - return "", err - } - if len(values) < 3 { - return "", trace.TraceError(errors.ErrorRedisInvalidType) - } - return values[1], nil -} - -func (client *Client) Lock(lockKey string) (value int64, err error) { - c := client.pool.Get() - defer utils.Close(c) - lockKey = client.getLockKey(lockKey) - - ts := time.Now().Unix() - ok, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000) - if err != nil { - if err != redis.ErrNil { - return value, trace.TraceError(err) - } - return value, err - } - if ok == nil { - return 0, trace.TraceError(errors.ErrorRedisLocked) - } - return ts, nil -} - -func (client *Client) UnLock(lockKey string, value int64) { - c := client.pool.Get() - defer utils.Close(c) - lockKey = client.getLockKey(lockKey) - - getValue, err := redis.Int64(c.Do("GET", lockKey)) - if err != nil { - log.Errorf("get lockKey error: %s", err.Error()) - return - } - - if getValue != value { - log.Errorf("the lockKey value diff: %d, %d", value, getValue) - return - } - - v, err := redis.Int64(c.Do("DEL", lockKey)) - if err != nil { - log.Errorf("unlock failed, error: %s", err.Error()) - return - } - - if v == 0 { - log.Errorf("unlock failed: key=%s", lockKey) - return - } -} - -func (client *Client) MemoryStats() (stats map[string]int64, err error) { - stats = map[string]int64{} - c := client.pool.Get() - defer utils.Close(c) - values, err := redis.Values(c.Do("MEMORY", "STATS")) - for i, v := range values { - t := reflect.TypeOf(v) - if t.Kind() == reflect.Slice { - vc, _ := redis.String(v, err) - if utils.ContainsString(MemoryStatsMetrics, vc) { - stats[vc], _ = redis.Int64(values[i+1], err) - } - } - } - if err != nil { - if err != redis.ErrNil { - return stats, trace.TraceError(err) - } - return stats, err - } - return stats, nil -} - -func (client *Client) SetBackoffMaxInterval(interval time.Duration) { - client.backoffMaxInterval = interval -} - -func (client *Client) SetTimeout(timeout int) { - client.timeout = timeout -} - -func (client *Client) init() (err error) { - b := backoff.NewExponentialBackOff() - b.MaxInterval = client.backoffMaxInterval - if err := backoff.Retry(func() error { - err := client.Ping() - if err != nil { - log.WithError(err).Warnf("waiting for redis pool active connection. will after %f seconds try again.", b.NextBackOff().Seconds()) - } - return nil - }, b); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (client *Client) getLockKey(lockKey string) string { - lockKey = strings.ReplaceAll(lockKey, ":", "-") - return "nodes:lock:" + lockKey -} - -func (client *Client) getTimeout(timeout int) (res int) { - if timeout == 0 { - return client.timeout - } - return timeout -} - -var client db.RedisClient - -func NewRedisClient(opts ...Option) (client *Client, err error) { - // client - client = &Client{ - backoffMaxInterval: 20 * time.Second, - pool: NewRedisPool(), - } - - // apply options - for _, opt := range opts { - opt(client) - } - - // init - if err := client.init(); err != nil { - return nil, err - } - - return client, nil -} - -func GetRedisClient() (c db.RedisClient, err error) { - if client != nil { - return client, nil - } - c, err = NewRedisClient() - if err != nil { - return nil, err - } - - return c, nil -} diff --git a/db/redis/constants.go b/db/redis/constants.go deleted file mode 100644 index fc7c2e56..00000000 --- a/db/redis/constants.go +++ /dev/null @@ -1,10 +0,0 @@ -package redis - -var MemoryStatsMetrics = []string{ - "peak.allocated", - "total.allocated", - "startup.allocated", - "overhead.total", - "keys.count", - "dataset.bytes", -} diff --git a/db/redis/options.go b/db/redis/options.go deleted file mode 100644 index 299bcbf1..00000000 --- a/db/redis/options.go +++ /dev/null @@ -1,20 +0,0 @@ -package redis - -import ( - "github.com/crawlab-team/crawlab/db" - "time" -) - -type Option func(c db.RedisClient) - -func WithBackoffMaxInterval(interval time.Duration) Option { - return func(c db.RedisClient) { - c.SetBackoffMaxInterval(interval) - } -} - -func WithTimeout(timeout int) Option { - return func(c db.RedisClient) { - c.SetTimeout(timeout) - } -} diff --git a/db/redis/pool.go b/db/redis/pool.go deleted file mode 100644 index cc08388a..00000000 --- a/db/redis/pool.go +++ /dev/null @@ -1,54 +0,0 @@ -package redis - -import ( - "github.com/crawlab-team/crawlab/trace" - "github.com/gomodule/redigo/redis" - "github.com/spf13/viper" - "time" -) - -func NewRedisPool() *redis.Pool { - var address = viper.GetString("redis.address") - var port = viper.GetString("redis.port") - var database = viper.GetString("redis.database") - var password = viper.GetString("redis.password") - - // normalize params - if address == "" { - address = "localhost" - } - if port == "" { - port = "6379" - } - if database == "" { - database = "1" - } - - var url string - if password == "" { - url = "redis://" + address + ":" + port + "/" + database - } else { - url = "redis://x:" + password + "@" + address + ":" + port + "/" + database - } - return &redis.Pool{ - Dial: func() (conn redis.Conn, e error) { - return redis.DialURL(url, - redis.DialConnectTimeout(time.Second*10), - redis.DialReadTimeout(time.Second*600), - redis.DialWriteTimeout(time.Second*10), - ) - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return trace.TraceError(err) - }, - MaxIdle: 10, - MaxActive: 0, - IdleTimeout: 300 * time.Second, - Wait: false, - MaxConnLifetime: 0, - } -} diff --git a/db/redis/test/base.go b/db/redis/test/base.go deleted file mode 100644 index 3eaf5e2d..00000000 --- a/db/redis/test/base.go +++ /dev/null @@ -1,91 +0,0 @@ -package test - -import ( - "github.com/crawlab-team/crawlab/db" - "github.com/crawlab-team/crawlab/db/redis" - "testing" -) - -func init() { - var err error - T, err = NewTest() - if err != nil { - panic(err) - } -} - -type Test struct { - client db.RedisClient - TestCollection string - TestMessage string - TestMessages []string - TestMessagesMap map[string]string - TestKeysAlpha []string - TestKeysBeta []string - TestLockKey string -} - -func (t *Test) Setup(t2 *testing.T) { - t2.Cleanup(t.Cleanup) -} - -func (t *Test) Cleanup() { - keys, _ := t.client.AllKeys() - for _, key := range keys { - _ = t.client.Del(key) - } -} - -var T *Test - -func NewTest() (t *Test, err error) { - // test - t = &Test{} - - // client - t.client, err = redis.GetRedisClient() - if err != nil { - return nil, err - } - - // test collection - t.TestCollection = "test_collection" - - // test message - t.TestMessage = "this is a test message" - - // test messages - t.TestMessages = []string{ - "test message 1", - "test message 2", - "test message 3", - } - - // test messages map - t.TestMessagesMap = map[string]string{ - "test key 1": "test value 1", - "test key 2": "test value 2", - "test key 3": "test value 3", - } - - // test keys alpha - t.TestKeysAlpha = []string{ - "test key alpha 1", - "test key alpha 2", - "test key alpha 3", - } - - // test keys beta - t.TestKeysBeta = []string{ - "test key beta 1", - "test key beta 2", - "test key beta 3", - "test key beta 4", - "test key beta 5", - } - - // test lock key - t.TestLockKey = "test lock key" - - return t, nil -} diff --git a/db/redis/test/client_test.go b/db/redis/test/client_test.go deleted file mode 100644 index 6fa363fa..00000000 --- a/db/redis/test/client_test.go +++ /dev/null @@ -1,273 +0,0 @@ -package test - -import ( - "github.com/crawlab-team/crawlab/db/redis" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -func TestRedisClient_Ping(t *testing.T) { - var err error - T.Setup(t) - - err = T.client.Ping() - require.Nil(t, err) -} - -func TestRedisClient_Get_Set(t *testing.T) { - var err error - T.Setup(t) - - err = T.client.Set(T.TestCollection, T.TestMessage) - require.Nil(t, err) - - value, err := T.client.Get(T.TestCollection) - require.Nil(t, err) - require.Equal(t, T.TestMessage, value) -} - -func TestRedisClient_Keys_AllKeys(t *testing.T) { - var err error - T.Setup(t) - - for _, key := range T.TestKeysAlpha { - err = T.client.Set(key, key) - require.Nil(t, err) - } - for _, key := range T.TestKeysBeta { - err = T.client.Set(key, key) - require.Nil(t, err) - } - - keys, err := T.client.Keys("*alpha*") - require.Nil(t, err) - require.Len(t, keys, len(T.TestKeysAlpha)) - - keys, err = T.client.Keys("*beta*") - require.Nil(t, err) - require.Len(t, keys, len(T.TestKeysBeta)) - - keys, err = T.client.AllKeys() - require.Nil(t, err) - require.Len(t, keys, len(T.TestKeysAlpha)+len(T.TestKeysBeta)) -} - -func TestRedisClient_RPush_LPop_LLen(t *testing.T) { - var err error - T.Setup(t) - - for _, msg := range T.TestMessages { - err = T.client.RPush(T.TestCollection, msg) - require.Nil(t, err) - } - - n, err := T.client.LLen(T.TestCollection) - require.Nil(t, err) - require.Equal(t, len(T.TestMessages), n) - - value, err := T.client.LPop(T.TestCollection) - require.Nil(t, err) - require.Equal(t, T.TestMessages[0], value) -} - -func TestRedisClient_LPush_RPop(t *testing.T) { - var err error - T.Setup(t) - - for _, msg := range T.TestMessages { - err = T.client.LPush(T.TestCollection, msg) - require.Nil(t, err) - } - - n, err := T.client.LLen(T.TestCollection) - require.Nil(t, err) - require.Equal(t, len(T.TestMessages), n) - - value, err := T.client.RPop(T.TestCollection) - require.Nil(t, err) - require.Equal(t, T.TestMessages[0], value) -} - -func TestRedisClient_BRPop(t *testing.T) { - var err error - T.Setup(t) - - isErr := true - go func(t *testing.T) { - value, err := T.client.BRPop(T.TestCollection, 0) - require.Nil(t, err) - require.Equal(t, T.TestMessage, value) - isErr = false - }(t) - - err = T.client.LPush(T.TestCollection, T.TestMessage) - require.Nil(t, err) - time.Sleep(500 * time.Millisecond) - require.False(t, isErr) -} - -func TestRedisClient_BLPop(t *testing.T) { - var err error - T.Setup(t) - - isErr := true - go func(t *testing.T) { - value, err := T.client.BLPop(T.TestCollection, 0) - require.Nil(t, err) - require.Equal(t, T.TestMessage, value) - isErr = false - }(t) - - err = T.client.RPush(T.TestCollection, T.TestMessage) - require.Nil(t, err) - time.Sleep(500 * time.Millisecond) - require.False(t, isErr) -} - -func TestRedisClient_HSet_HGet_HDel(t *testing.T) { - var err error - T.Setup(t) - - for k, v := range T.TestMessagesMap { - err = T.client.HSet(T.TestCollection, k, v) - require.Nil(t, err) - } - - for k, v := range T.TestMessagesMap { - vr, err := T.client.HGet(T.TestCollection, k) - require.Nil(t, err) - require.Equal(t, v, vr) - } - - for k := range T.TestMessagesMap { - err = T.client.HDel(T.TestCollection, k) - require.Nil(t, err) - - v, err := T.client.HGet(T.TestCollection, k) - require.Nil(t, err) - require.Empty(t, v) - } -} - -func TestRedisClient_HScan(t *testing.T) { - var err error - T.Setup(t) - - for k, v := range T.TestMessagesMap { - err = T.client.HSet(T.TestCollection, k, v) - require.Nil(t, err) - } - - results, err := T.client.HScan(T.TestCollection) - require.Nil(t, err) - - for k, vr := range results { - v, ok := T.TestMessagesMap[k] - require.True(t, ok) - require.Equal(t, v, vr) - } -} - -func TestRedisClient_HKeys(t *testing.T) { - var err error - T.Setup(t) - - for k, v := range T.TestMessagesMap { - err = T.client.HSet(T.TestCollection, k, v) - require.Nil(t, err) - } - - keys, err := T.client.HKeys(T.TestCollection) - require.Nil(t, err) - - for _, k := range keys { - _, ok := T.TestMessagesMap[k] - require.True(t, ok) - } -} - -func TestRedisClient_ZAdd_ZCount_ZCountAll_ZPopMax_ZPopMin(t *testing.T) { - var err error - T.Setup(t) - - for i, v := range T.TestMessages { - score := float32(i) - err = T.client.ZAdd(T.TestCollection, score, v) - require.Nil(t, err) - } - - count, err := T.client.ZCountAll(T.TestCollection) - require.Nil(t, err) - require.Equal(t, len(T.TestMessages), count) - - value, err := T.client.ZPopMaxOne(T.TestCollection) - require.Nil(t, err) - require.Equal(t, T.TestMessages[len(T.TestMessages)-1], value) - - value, err = T.client.ZPopMinOne(T.TestCollection) - require.Nil(t, err) - require.Equal(t, T.TestMessages[0], value) -} - -func TestRedisClient_BZPopMax_BZPopMin(t *testing.T) { - var err error - T.Setup(t) - - isErr := true - go func(t *testing.T) { - value, err := T.client.BZPopMax(T.TestCollection, 0) - require.Nil(t, err) - require.Equal(t, T.TestMessage, value) - isErr = false - }(t) - - err = T.client.ZAdd(T.TestCollection, 1, T.TestMessage) - require.Nil(t, err) - time.Sleep(500 * time.Millisecond) - require.False(t, isErr) - - isErr = true - go func(t *testing.T) { - value, err := T.client.BZPopMin(T.TestCollection, 0) - require.Nil(t, err) - require.Equal(t, T.TestMessage, value) - isErr = false - }(t) - - err = T.client.ZAdd(T.TestCollection, 1, T.TestMessage) - require.Nil(t, err) - time.Sleep(500 * time.Millisecond) - require.False(t, isErr) -} - -func TestRedisClient_Lock_Unlock(t *testing.T) { - var err error - T.Setup(t) - - ts, err := T.client.Lock(T.TestLockKey) - require.Nil(t, err) - - _, err = T.client.Lock(T.TestLockKey) - require.NotNil(t, err) - - T.client.UnLock(T.TestLockKey, ts) - - ts, err = T.client.Lock(T.TestLockKey) - require.Nil(t, err) - -} - -func TestRedisClient_MemoryStats(t *testing.T) { - var err error - T.Setup(t) - - stats, err := T.client.MemoryStats() - require.Nil(t, err) - - for _, k := range redis.MemoryStatsMetrics { - v, ok := stats[k] - require.True(t, ok) - require.Greater(t, v, int64(-1)) - } -} diff --git a/db/sql/sql.go b/db/sql/sql.go deleted file mode 100644 index 0a35ffc6..00000000 --- a/db/sql/sql.go +++ /dev/null @@ -1,36 +0,0 @@ -package sql - -import ( - "errors" - "fmt" - "github.com/crawlab-team/crawlab/trace" - "github.com/jmoiron/sqlx" -) - -func GetSqlDatabaseConnectionString(dataSourceType string, host string, port string, username string, password string, database string) (connStr string, err error) { - if dataSourceType == "mysql" { - connStr = fmt.Sprintf("%s:%s@(%s:%s)/%s?charset=utf8&parseTime=True&loc=Local", username, password, host, port, database) - } else if dataSourceType == "postgres" { - connStr = fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s", host, port, username, database, password, "disable") - } else { - err = errors.New(dataSourceType + " is not implemented") - return connStr, trace.TraceError(err) - } - return connStr, nil -} - -func GetSqlConn(dataSourceType string, host string, port string, username string, password string, database string) (db *sqlx.DB, err error) { - // get database connection string - connStr, err := GetSqlDatabaseConnectionString(dataSourceType, host, port, username, password, database) - if err != nil { - return db, trace.TraceError(err) - } - - // get database instance - db, err = sqlx.Open(dataSourceType, connStr) - if err != nil { - return db, trace.TraceError(err) - } - - return db, nil -} diff --git a/db/utils/utils.go b/db/utils/utils.go deleted file mode 100644 index f2ff4893..00000000 --- a/db/utils/utils.go +++ /dev/null @@ -1,19 +0,0 @@ -package utils - -import "io" - -func Close(c io.Closer) { - err := c.Close() - if err != nil { - //log.WithError(err).Error("关闭资源文件失败。") - } -} - -func ContainsString(list []string, item string) bool { - for _, d := range list { - if d == item { - return true - } - } - return false -}