mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-31 18:10:50 +01:00
refactor(all): refactor code
remove redundant code and some code refactor
This commit is contained in:
@@ -2,6 +2,7 @@ package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/utils"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
@@ -26,7 +27,7 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
|
||||
tick := time.NewTicker(time.Second * 3)
|
||||
defer tick.Stop()
|
||||
go func() {
|
||||
defer func() { _ = psc.Close() }()
|
||||
defer utils.Close(psc)
|
||||
for {
|
||||
switch msg := psc.Receive().(type) {
|
||||
case error:
|
||||
@@ -87,7 +88,7 @@ func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
|
||||
}
|
||||
func (r *Redis) Publish(channel, message string) (n int, err error) {
|
||||
conn := r.pool.Get()
|
||||
defer func() { _ = conn.Close() }()
|
||||
defer utils.Close(conn)
|
||||
n, err = redis.Int(conn.Do("PUBLISH", channel, message))
|
||||
if err != nil {
|
||||
return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message)
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/entity"
|
||||
"crawlab/utils"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/spf13/viper"
|
||||
"runtime/debug"
|
||||
@@ -18,7 +22,7 @@ func NewRedisClient() *Redis {
|
||||
}
|
||||
func (r *Redis) RPush(collection string, value interface{}) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("RPUSH", collection, value); err != nil {
|
||||
debug.PrintStack()
|
||||
@@ -29,7 +33,7 @@ func (r *Redis) RPush(collection string, value interface{}) error {
|
||||
|
||||
func (r *Redis) LPop(collection string) (string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.String(c.Do("LPOP", collection))
|
||||
if err2 != nil {
|
||||
@@ -40,7 +44,7 @@ func (r *Redis) LPop(collection string) (string, error) {
|
||||
|
||||
func (r *Redis) HSet(collection string, key string, value string) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("HSET", collection, key, value); err != nil {
|
||||
debug.PrintStack()
|
||||
@@ -51,7 +55,7 @@ func (r *Redis) HSet(collection string, key string, value string) error {
|
||||
|
||||
func (r *Redis) HGet(collection string, key string) (string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.String(c.Do("HGET", collection, key))
|
||||
if err2 != nil {
|
||||
@@ -62,7 +66,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) {
|
||||
|
||||
func (r *Redis) HDel(collection string, key string) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("HDEL", collection, key); err != nil {
|
||||
return err
|
||||
@@ -72,7 +76,7 @@ func (r *Redis) HDel(collection string, key string) error {
|
||||
|
||||
func (r *Redis) HKeys(collection string) ([]string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.Strings(c.Do("HKeys", collection))
|
||||
if err2 != nil {
|
||||
@@ -120,3 +124,22 @@ func InitRedis() error {
|
||||
RedisClient = NewRedisClient()
|
||||
return nil
|
||||
}
|
||||
|
||||
func Pub(channel string, msg entity.NodeMessage) error {
|
||||
if _, err := RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
|
||||
log.Errorf("publish redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Sub(channel string, consume ConsumeFunc) error {
|
||||
ctx := context.Background()
|
||||
if err := RedisClient.Subscribe(ctx, consume, channel); err != nil {
|
||||
log.Errorf("subscribe redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user