refactor: code cleanup

This commit is contained in:
Marvin Zhang
2024-10-22 16:22:35 +08:00
parent a3694fb1fb
commit 5dd06adfbb
16 changed files with 5 additions and 1151 deletions

View File

@@ -1,6 +0,0 @@
package errors
const (
errorPrefixMongo = "mongo"
errorPrefixRedis = "redis"
)

View File

@@ -1,10 +0,0 @@
package errors
import "errors"
var (
ErrInvalidType = errors.New("invalid type")
ErrMissingValue = errors.New("missing value")
ErrNoCursor = errors.New("no cursor")
ErrAlreadyLocked = errors.New("already locked")
)

View File

@@ -1,15 +0,0 @@
package errors
import (
"errors"
"fmt"
)
var (
ErrorRedisInvalidType = NewRedisError("invalid type")
ErrorRedisLocked = NewRedisError("locked")
)
func NewRedisError(msg string) (err error) {
return errors.New(fmt.Sprintf("%s: %s", errorPrefixRedis, msg))
}

View File

@@ -1,8 +0,0 @@
package generic
const (
DataSourceTypeMongo = "mongo"
DataSourceTypeMysql = "mysql"
DataSourceTypePostgres = "postgres"
DataSourceTypeElasticSearch = "postgres"
)

View File

@@ -8,8 +8,6 @@ require (
github.com/apex/log v1.9.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/crawlab-team/crawlab/trace v0.0.0-20240614095218-7b4ee8399ab0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/jmoiron/sqlx v1.2.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
go.mongodb.org/mongo-driver v1.15.1
@@ -18,13 +16,10 @@ require (
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-sqlite3 v1.14.9 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect

View File

@@ -1,39 +0,0 @@
package db
import "time"
type RedisClient interface {
Ping() (err error)
Keys(pattern string) (values []string, err error)
AllKeys() (values []string, err error)
Get(collection string) (value string, err error)
Set(collection string, value string) (err error)
Del(collection string) (err error)
RPush(collection string, value interface{}) (err error)
LPush(collection string, value interface{}) (err error)
LPop(collection string) (value string, err error)
RPop(collection string) (value string, err error)
LLen(collection string) (count int, err error)
BRPop(collection string, timeout int) (value string, err error)
BLPop(collection string, timeout int) (value string, err error)
HSet(collection string, key string, value string) (err error)
HGet(collection string, key string) (value string, err error)
HDel(collection string, key string) (err error)
HScan(collection string) (results map[string]string, err error)
HKeys(collection string) (results []string, err error)
ZAdd(collection string, score float32, value interface{}) (err error)
ZCount(collection string, min string, max string) (count int, err error)
ZCountAll(collection string) (count int, err error)
ZScan(collection string, pattern string, count int) (results []string, err error)
ZPopMax(collection string, count int) (results []string, err error)
ZPopMin(collection string, count int) (results []string, err error)
ZPopMaxOne(collection string) (value string, err error)
ZPopMinOne(collection string) (value string, err error)
BZPopMax(collection string, timeout int) (value string, err error)
BZPopMin(collection string, timeout int) (value string, err error)
Lock(lockKey string) (value int64, err error)
UnLock(lockKey string, value int64)
MemoryStats() (stats map[string]int64, err error)
SetBackoffMaxInterval(interval time.Duration)
SetTimeout(timeout int)
}

View File

@@ -2,7 +2,7 @@ package mongo
import (
"context"
"github.com/crawlab-team/crawlab/db/errors"
"errors"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -58,7 +58,7 @@ func (col *Col) Insert(doc interface{}) (id primitive.ObjectID, err error) {
if id, ok := res.InsertedID.(primitive.ObjectID); ok {
return id, nil
}
return primitive.NilObjectID, trace.TraceError(errors.ErrInvalidType)
return primitive.NilObjectID, trace.TraceError(errors.New("InsertedID is not ObjectID"))
}
func (col *Col) InsertMany(docs []interface{}) (ids []primitive.ObjectID, err error) {
@@ -72,7 +72,7 @@ func (col *Col) InsertMany(docs []interface{}) (ids []primitive.ObjectID, err er
id := v.(primitive.ObjectID)
ids = append(ids, id)
default:
return nil, trace.TraceError(errors.ErrInvalidType)
return nil, trace.TraceError(errors.New("InsertedID is not ObjectID"))
}
}
return ids, nil

View File

@@ -2,7 +2,7 @@ package mongo
import (
"context"
"github.com/crawlab-team/crawlab/db/errors"
"errors"
"go.mongodb.org/mongo-driver/mongo"
)
@@ -15,16 +15,6 @@ type FindResultInterface interface {
GetError() (err error)
}
func NewFindResult() (fr *FindResult) {
return &FindResult{}
}
func NewFindResultWithError(err error) (fr *FindResult) {
return &FindResult{
err: err,
}
}
type FindResult struct {
col *Col
res *mongo.SingleResult
@@ -61,7 +51,7 @@ func (fr *FindResult) All(val interface{}) (err error) {
ctx = fr.col.ctx
}
if fr.cur == nil {
return errors.ErrNoCursor
return errors.New("no cursor")
}
if !fr.cur.TryNext(ctx) {
return ctx.Err()

View File

@@ -1,550 +0,0 @@
package redis
import (
"github.com/apex/log"
"github.com/crawlab-team/crawlab/db"
"github.com/crawlab-team/crawlab/db/errors"
"github.com/crawlab-team/crawlab/db/utils"
"github.com/crawlab-team/crawlab/trace"
"github.com/gomodule/redigo/redis"
"reflect"
"strings"
"time"
)
type Client struct {
// settings
backoffMaxInterval time.Duration
timeout int
// internals
pool *redis.Pool
}
func (client *Client) Ping() error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := redis.String(c.Do("PING")); err != nil {
if err != redis.ErrNil {
return trace.TraceError(err)
}
return err
}
return nil
}
func (client *Client) Keys(pattern string) (values []string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err = redis.Strings(c.Do("KEYS", pattern))
if err != nil {
return nil, trace.TraceError(err)
}
return values, nil
}
func (client *Client) AllKeys() (values []string, err error) {
return client.Keys("*")
}
func (client *Client) Get(collection string) (value string, err error) {
c := client.pool.Get()
defer utils.Close(c)
value, err = redis.String(c.Do("GET", collection))
if err != nil {
return "", trace.TraceError(err)
}
return value, nil
}
func (client *Client) Set(collection string, value string) (err error) {
c := client.pool.Get()
defer utils.Close(c)
value, err = redis.String(c.Do("SET", collection, value))
if err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) Del(collection string) error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("DEL", collection); err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) RPush(collection string, value interface{}) error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("RPUSH", collection, value); err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) LPush(collection string, value interface{}) error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("LPUSH", collection, value); err != nil {
if err != redis.ErrNil {
return trace.TraceError(err)
}
return err
}
return nil
}
func (client *Client) LPop(collection string) (string, error) {
c := client.pool.Get()
defer utils.Close(c)
value, err := redis.String(c.Do("LPOP", collection))
if err != nil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
return value, nil
}
func (client *Client) RPop(collection string) (string, error) {
c := client.pool.Get()
defer utils.Close(c)
value, err := redis.String(c.Do("RPOP", collection))
if err != nil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
return value, nil
}
func (client *Client) LLen(collection string) (int, error) {
c := client.pool.Get()
defer utils.Close(c)
value, err := redis.Int(c.Do("LLEN", collection))
if err != nil {
return 0, trace.TraceError(err)
}
return value, nil
}
func (client *Client) BRPop(collection string, timeout int) (value string, err error) {
if timeout <= 0 {
timeout = 60
}
c := client.pool.Get()
defer utils.Close(c)
values, err := redis.Strings(c.Do("BRPOP", collection, timeout))
if err != nil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
return values[1], nil
}
func (client *Client) BLPop(collection string, timeout int) (value string, err error) {
if timeout <= 0 {
timeout = 60
}
c := client.pool.Get()
defer utils.Close(c)
values, err := redis.Strings(c.Do("BLPOP", collection, timeout))
if err != nil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
return values[1], nil
}
func (client *Client) HSet(collection string, key string, value string) error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("HSET", collection, key, value); err != nil {
if err != redis.ErrNil {
return trace.TraceError(err)
}
return err
}
return nil
}
func (client *Client) HGet(collection string, key string) (string, error) {
c := client.pool.Get()
defer utils.Close(c)
value, err := redis.String(c.Do("HGET", collection, key))
if err != nil && err != redis.ErrNil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
return value, nil
}
func (client *Client) HDel(collection string, key string) error {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("HDEL", collection, key); err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) HScan(collection string) (results map[string]string, err error) {
c := client.pool.Get()
defer utils.Close(c)
var (
cursor int64
items []string
)
results = map[string]string{}
for {
values, err := redis.Values(c.Do("HSCAN", collection, cursor))
if err != nil {
if err != redis.ErrNil {
return nil, trace.TraceError(err)
}
return nil, err
}
values, err = redis.Scan(values, &cursor, &items)
if err != nil {
if err != redis.ErrNil {
return nil, trace.TraceError(err)
}
return nil, err
}
for i := 0; i < len(items); i += 2 {
key := items[i]
value := items[i+1]
results[key] = value
}
if cursor == 0 {
break
}
}
return results, nil
}
func (client *Client) HKeys(collection string) (results []string, err error) {
c := client.pool.Get()
defer utils.Close(c)
results, err = redis.Strings(c.Do("HKEYS", collection))
if err != nil {
if err != redis.ErrNil {
return results, trace.TraceError(err)
}
return results, err
}
return results, nil
}
func (client *Client) ZAdd(collection string, score float32, value interface{}) (err error) {
c := client.pool.Get()
defer utils.Close(c)
if _, err := c.Do("ZADD", collection, score, value); err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) ZCount(collection string, min string, max string) (count int, err error) {
c := client.pool.Get()
defer utils.Close(c)
count, err = redis.Int(c.Do("ZCOUNT", collection, min, max))
if err != nil {
return 0, trace.TraceError(err)
}
return count, nil
}
func (client *Client) ZCountAll(collection string) (count int, err error) {
return client.ZCount(collection, "-inf", "+inf")
}
func (client *Client) ZScan(collection string, pattern string, count int) (values []string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err = redis.Strings(c.Do("ZSCAN", collection, 0, pattern, count))
if err != nil {
if err != redis.ErrNil {
return nil, trace.TraceError(err)
}
return nil, err
}
return values, nil
}
func (client *Client) ZPopMax(collection string, count int) (results []string, err error) {
c := client.pool.Get()
defer utils.Close(c)
results = []string{}
values, err := redis.Strings(c.Do("ZPOPMAX", collection, count))
if err != nil {
if err != redis.ErrNil {
return nil, trace.TraceError(err)
}
return nil, err
}
for i := 0; i < len(values); i += 2 {
v := values[i]
results = append(results, v)
}
return results, nil
}
func (client *Client) ZPopMin(collection string, count int) (results []string, err error) {
c := client.pool.Get()
defer utils.Close(c)
results = []string{}
values, err := redis.Strings(c.Do("ZPOPMIN", collection, count))
if err != nil {
if err != redis.ErrNil {
return nil, trace.TraceError(err)
}
return nil, err
}
for i := 0; i < len(values); i += 2 {
v := values[i]
results = append(results, v)
}
return results, nil
}
func (client *Client) ZPopMaxOne(collection string) (value string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err := client.ZPopMax(collection, 1)
if err != nil {
return "", err
}
if values == nil || len(values) == 0 {
return "", nil
}
return values[0], nil
}
func (client *Client) ZPopMinOne(collection string) (value string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err := client.ZPopMin(collection, 1)
if err != nil {
return "", err
}
if values == nil || len(values) == 0 {
return "", nil
}
return values[0], nil
}
func (client *Client) BZPopMax(collection string, timeout int) (value string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err := redis.Strings(c.Do("BZPOPMAX", collection, timeout))
if err != nil {
if err != redis.ErrNil {
return "", trace.TraceError(err)
}
return "", err
}
if len(values) < 3 {
return "", trace.TraceError(errors.ErrorRedisInvalidType)
}
return values[1], nil
}
func (client *Client) BZPopMin(collection string, timeout int) (value string, err error) {
c := client.pool.Get()
defer utils.Close(c)
values, err := redis.Strings(c.Do("BZPOPMIN", collection, timeout))
if err != nil {
if err != redis.ErrNil {
return "", trace.TraceError(err)
}
return "", err
}
if len(values) < 3 {
return "", trace.TraceError(errors.ErrorRedisInvalidType)
}
return values[1], nil
}
func (client *Client) Lock(lockKey string) (value int64, err error) {
c := client.pool.Get()
defer utils.Close(c)
lockKey = client.getLockKey(lockKey)
ts := time.Now().Unix()
ok, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000)
if err != nil {
if err != redis.ErrNil {
return value, trace.TraceError(err)
}
return value, err
}
if ok == nil {
return 0, trace.TraceError(errors.ErrorRedisLocked)
}
return ts, nil
}
func (client *Client) UnLock(lockKey string, value int64) {
c := client.pool.Get()
defer utils.Close(c)
lockKey = client.getLockKey(lockKey)
getValue, err := redis.Int64(c.Do("GET", lockKey))
if err != nil {
log.Errorf("get lockKey error: %s", err.Error())
return
}
if getValue != value {
log.Errorf("the lockKey value diff: %d, %d", value, getValue)
return
}
v, err := redis.Int64(c.Do("DEL", lockKey))
if err != nil {
log.Errorf("unlock failed, error: %s", err.Error())
return
}
if v == 0 {
log.Errorf("unlock failed: key=%s", lockKey)
return
}
}
func (client *Client) MemoryStats() (stats map[string]int64, err error) {
stats = map[string]int64{}
c := client.pool.Get()
defer utils.Close(c)
values, err := redis.Values(c.Do("MEMORY", "STATS"))
for i, v := range values {
t := reflect.TypeOf(v)
if t.Kind() == reflect.Slice {
vc, _ := redis.String(v, err)
if utils.ContainsString(MemoryStatsMetrics, vc) {
stats[vc], _ = redis.Int64(values[i+1], err)
}
}
}
if err != nil {
if err != redis.ErrNil {
return stats, trace.TraceError(err)
}
return stats, err
}
return stats, nil
}
func (client *Client) SetBackoffMaxInterval(interval time.Duration) {
client.backoffMaxInterval = interval
}
func (client *Client) SetTimeout(timeout int) {
client.timeout = timeout
}
func (client *Client) init() (err error) {
b := backoff.NewExponentialBackOff()
b.MaxInterval = client.backoffMaxInterval
if err := backoff.Retry(func() error {
err := client.Ping()
if err != nil {
log.WithError(err).Warnf("waiting for redis pool active connection. will after %f seconds try again.", b.NextBackOff().Seconds())
}
return nil
}, b); err != nil {
return trace.TraceError(err)
}
return nil
}
func (client *Client) getLockKey(lockKey string) string {
lockKey = strings.ReplaceAll(lockKey, ":", "-")
return "nodes:lock:" + lockKey
}
func (client *Client) getTimeout(timeout int) (res int) {
if timeout == 0 {
return client.timeout
}
return timeout
}
var client db.RedisClient
func NewRedisClient(opts ...Option) (client *Client, err error) {
// client
client = &Client{
backoffMaxInterval: 20 * time.Second,
pool: NewRedisPool(),
}
// apply options
for _, opt := range opts {
opt(client)
}
// init
if err := client.init(); err != nil {
return nil, err
}
return client, nil
}
func GetRedisClient() (c db.RedisClient, err error) {
if client != nil {
return client, nil
}
c, err = NewRedisClient()
if err != nil {
return nil, err
}
return c, nil
}

View File

@@ -1,10 +0,0 @@
package redis
var MemoryStatsMetrics = []string{
"peak.allocated",
"total.allocated",
"startup.allocated",
"overhead.total",
"keys.count",
"dataset.bytes",
}

View File

@@ -1,20 +0,0 @@
package redis
import (
"github.com/crawlab-team/crawlab/db"
"time"
)
type Option func(c db.RedisClient)
func WithBackoffMaxInterval(interval time.Duration) Option {
return func(c db.RedisClient) {
c.SetBackoffMaxInterval(interval)
}
}
func WithTimeout(timeout int) Option {
return func(c db.RedisClient) {
c.SetTimeout(timeout)
}
}

View File

@@ -1,54 +0,0 @@
package redis
import (
"github.com/crawlab-team/crawlab/trace"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"time"
)
func NewRedisPool() *redis.Pool {
var address = viper.GetString("redis.address")
var port = viper.GetString("redis.port")
var database = viper.GetString("redis.database")
var password = viper.GetString("redis.password")
// normalize params
if address == "" {
address = "localhost"
}
if port == "" {
port = "6379"
}
if database == "" {
database = "1"
}
var url string
if password == "" {
url = "redis://" + address + ":" + port + "/" + database
} else {
url = "redis://x:" + password + "@" + address + ":" + port + "/" + database
}
return &redis.Pool{
Dial: func() (conn redis.Conn, e error) {
return redis.DialURL(url,
redis.DialConnectTimeout(time.Second*10),
redis.DialReadTimeout(time.Second*600),
redis.DialWriteTimeout(time.Second*10),
)
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return trace.TraceError(err)
},
MaxIdle: 10,
MaxActive: 0,
IdleTimeout: 300 * time.Second,
Wait: false,
MaxConnLifetime: 0,
}
}

View File

@@ -1,91 +0,0 @@
package test
import (
"github.com/crawlab-team/crawlab/db"
"github.com/crawlab-team/crawlab/db/redis"
"testing"
)
func init() {
var err error
T, err = NewTest()
if err != nil {
panic(err)
}
}
type Test struct {
client db.RedisClient
TestCollection string
TestMessage string
TestMessages []string
TestMessagesMap map[string]string
TestKeysAlpha []string
TestKeysBeta []string
TestLockKey string
}
func (t *Test) Setup(t2 *testing.T) {
t2.Cleanup(t.Cleanup)
}
func (t *Test) Cleanup() {
keys, _ := t.client.AllKeys()
for _, key := range keys {
_ = t.client.Del(key)
}
}
var T *Test
func NewTest() (t *Test, err error) {
// test
t = &Test{}
// client
t.client, err = redis.GetRedisClient()
if err != nil {
return nil, err
}
// test collection
t.TestCollection = "test_collection"
// test message
t.TestMessage = "this is a test message"
// test messages
t.TestMessages = []string{
"test message 1",
"test message 2",
"test message 3",
}
// test messages map
t.TestMessagesMap = map[string]string{
"test key 1": "test value 1",
"test key 2": "test value 2",
"test key 3": "test value 3",
}
// test keys alpha
t.TestKeysAlpha = []string{
"test key alpha 1",
"test key alpha 2",
"test key alpha 3",
}
// test keys beta
t.TestKeysBeta = []string{
"test key beta 1",
"test key beta 2",
"test key beta 3",
"test key beta 4",
"test key beta 5",
}
// test lock key
t.TestLockKey = "test lock key"
return t, nil
}

View File

@@ -1,273 +0,0 @@
package test
import (
"github.com/crawlab-team/crawlab/db/redis"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestRedisClient_Ping(t *testing.T) {
var err error
T.Setup(t)
err = T.client.Ping()
require.Nil(t, err)
}
func TestRedisClient_Get_Set(t *testing.T) {
var err error
T.Setup(t)
err = T.client.Set(T.TestCollection, T.TestMessage)
require.Nil(t, err)
value, err := T.client.Get(T.TestCollection)
require.Nil(t, err)
require.Equal(t, T.TestMessage, value)
}
func TestRedisClient_Keys_AllKeys(t *testing.T) {
var err error
T.Setup(t)
for _, key := range T.TestKeysAlpha {
err = T.client.Set(key, key)
require.Nil(t, err)
}
for _, key := range T.TestKeysBeta {
err = T.client.Set(key, key)
require.Nil(t, err)
}
keys, err := T.client.Keys("*alpha*")
require.Nil(t, err)
require.Len(t, keys, len(T.TestKeysAlpha))
keys, err = T.client.Keys("*beta*")
require.Nil(t, err)
require.Len(t, keys, len(T.TestKeysBeta))
keys, err = T.client.AllKeys()
require.Nil(t, err)
require.Len(t, keys, len(T.TestKeysAlpha)+len(T.TestKeysBeta))
}
func TestRedisClient_RPush_LPop_LLen(t *testing.T) {
var err error
T.Setup(t)
for _, msg := range T.TestMessages {
err = T.client.RPush(T.TestCollection, msg)
require.Nil(t, err)
}
n, err := T.client.LLen(T.TestCollection)
require.Nil(t, err)
require.Equal(t, len(T.TestMessages), n)
value, err := T.client.LPop(T.TestCollection)
require.Nil(t, err)
require.Equal(t, T.TestMessages[0], value)
}
func TestRedisClient_LPush_RPop(t *testing.T) {
var err error
T.Setup(t)
for _, msg := range T.TestMessages {
err = T.client.LPush(T.TestCollection, msg)
require.Nil(t, err)
}
n, err := T.client.LLen(T.TestCollection)
require.Nil(t, err)
require.Equal(t, len(T.TestMessages), n)
value, err := T.client.RPop(T.TestCollection)
require.Nil(t, err)
require.Equal(t, T.TestMessages[0], value)
}
func TestRedisClient_BRPop(t *testing.T) {
var err error
T.Setup(t)
isErr := true
go func(t *testing.T) {
value, err := T.client.BRPop(T.TestCollection, 0)
require.Nil(t, err)
require.Equal(t, T.TestMessage, value)
isErr = false
}(t)
err = T.client.LPush(T.TestCollection, T.TestMessage)
require.Nil(t, err)
time.Sleep(500 * time.Millisecond)
require.False(t, isErr)
}
func TestRedisClient_BLPop(t *testing.T) {
var err error
T.Setup(t)
isErr := true
go func(t *testing.T) {
value, err := T.client.BLPop(T.TestCollection, 0)
require.Nil(t, err)
require.Equal(t, T.TestMessage, value)
isErr = false
}(t)
err = T.client.RPush(T.TestCollection, T.TestMessage)
require.Nil(t, err)
time.Sleep(500 * time.Millisecond)
require.False(t, isErr)
}
func TestRedisClient_HSet_HGet_HDel(t *testing.T) {
var err error
T.Setup(t)
for k, v := range T.TestMessagesMap {
err = T.client.HSet(T.TestCollection, k, v)
require.Nil(t, err)
}
for k, v := range T.TestMessagesMap {
vr, err := T.client.HGet(T.TestCollection, k)
require.Nil(t, err)
require.Equal(t, v, vr)
}
for k := range T.TestMessagesMap {
err = T.client.HDel(T.TestCollection, k)
require.Nil(t, err)
v, err := T.client.HGet(T.TestCollection, k)
require.Nil(t, err)
require.Empty(t, v)
}
}
func TestRedisClient_HScan(t *testing.T) {
var err error
T.Setup(t)
for k, v := range T.TestMessagesMap {
err = T.client.HSet(T.TestCollection, k, v)
require.Nil(t, err)
}
results, err := T.client.HScan(T.TestCollection)
require.Nil(t, err)
for k, vr := range results {
v, ok := T.TestMessagesMap[k]
require.True(t, ok)
require.Equal(t, v, vr)
}
}
func TestRedisClient_HKeys(t *testing.T) {
var err error
T.Setup(t)
for k, v := range T.TestMessagesMap {
err = T.client.HSet(T.TestCollection, k, v)
require.Nil(t, err)
}
keys, err := T.client.HKeys(T.TestCollection)
require.Nil(t, err)
for _, k := range keys {
_, ok := T.TestMessagesMap[k]
require.True(t, ok)
}
}
func TestRedisClient_ZAdd_ZCount_ZCountAll_ZPopMax_ZPopMin(t *testing.T) {
var err error
T.Setup(t)
for i, v := range T.TestMessages {
score := float32(i)
err = T.client.ZAdd(T.TestCollection, score, v)
require.Nil(t, err)
}
count, err := T.client.ZCountAll(T.TestCollection)
require.Nil(t, err)
require.Equal(t, len(T.TestMessages), count)
value, err := T.client.ZPopMaxOne(T.TestCollection)
require.Nil(t, err)
require.Equal(t, T.TestMessages[len(T.TestMessages)-1], value)
value, err = T.client.ZPopMinOne(T.TestCollection)
require.Nil(t, err)
require.Equal(t, T.TestMessages[0], value)
}
func TestRedisClient_BZPopMax_BZPopMin(t *testing.T) {
var err error
T.Setup(t)
isErr := true
go func(t *testing.T) {
value, err := T.client.BZPopMax(T.TestCollection, 0)
require.Nil(t, err)
require.Equal(t, T.TestMessage, value)
isErr = false
}(t)
err = T.client.ZAdd(T.TestCollection, 1, T.TestMessage)
require.Nil(t, err)
time.Sleep(500 * time.Millisecond)
require.False(t, isErr)
isErr = true
go func(t *testing.T) {
value, err := T.client.BZPopMin(T.TestCollection, 0)
require.Nil(t, err)
require.Equal(t, T.TestMessage, value)
isErr = false
}(t)
err = T.client.ZAdd(T.TestCollection, 1, T.TestMessage)
require.Nil(t, err)
time.Sleep(500 * time.Millisecond)
require.False(t, isErr)
}
func TestRedisClient_Lock_Unlock(t *testing.T) {
var err error
T.Setup(t)
ts, err := T.client.Lock(T.TestLockKey)
require.Nil(t, err)
_, err = T.client.Lock(T.TestLockKey)
require.NotNil(t, err)
T.client.UnLock(T.TestLockKey, ts)
ts, err = T.client.Lock(T.TestLockKey)
require.Nil(t, err)
}
func TestRedisClient_MemoryStats(t *testing.T) {
var err error
T.Setup(t)
stats, err := T.client.MemoryStats()
require.Nil(t, err)
for _, k := range redis.MemoryStatsMetrics {
v, ok := stats[k]
require.True(t, ok)
require.Greater(t, v, int64(-1))
}
}

View File

@@ -1,36 +0,0 @@
package sql
import (
"errors"
"fmt"
"github.com/crawlab-team/crawlab/trace"
"github.com/jmoiron/sqlx"
)
func GetSqlDatabaseConnectionString(dataSourceType string, host string, port string, username string, password string, database string) (connStr string, err error) {
if dataSourceType == "mysql" {
connStr = fmt.Sprintf("%s:%s@(%s:%s)/%s?charset=utf8&parseTime=True&loc=Local", username, password, host, port, database)
} else if dataSourceType == "postgres" {
connStr = fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s", host, port, username, database, password, "disable")
} else {
err = errors.New(dataSourceType + " is not implemented")
return connStr, trace.TraceError(err)
}
return connStr, nil
}
func GetSqlConn(dataSourceType string, host string, port string, username string, password string, database string) (db *sqlx.DB, err error) {
// get database connection string
connStr, err := GetSqlDatabaseConnectionString(dataSourceType, host, port, username, password, database)
if err != nil {
return db, trace.TraceError(err)
}
// get database instance
db, err = sqlx.Open(dataSourceType, connStr)
if err != nil {
return db, trace.TraceError(err)
}
return db, nil
}

View File

@@ -1,19 +0,0 @@
package utils
import "io"
func Close(c io.Closer) {
err := c.Close()
if err != nil {
//log.WithError(err).Error("关闭资源文件失败。")
}
}
func ContainsString(list []string, item string) bool {
for _, d := range list {
if d == item {
return true
}
}
return false
}