mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-27 17:50:53 +01:00
Revert "Revert "V0.4.0 imporve error response""
This reverts commit 9744f45e86.
This commit is contained in:
@@ -1,90 +1,97 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
errors2 "github.com/pkg/errors"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type SubscribeCallback func(channel, message string)
|
||||
type ConsumeFunc func(message redis.Message) error
|
||||
|
||||
type Subscriber struct {
|
||||
client redis.PubSubConn
|
||||
cbMap map[string]SubscribeCallback
|
||||
}
|
||||
|
||||
func (c *Subscriber) Connect() {
|
||||
conn, err := GetRedisConn()
|
||||
if err != nil {
|
||||
log.Fatalf("redis dial failed.")
|
||||
}
|
||||
|
||||
c.client = redis.PubSubConn{Conn: conn}
|
||||
c.cbMap = make(map[string]SubscribeCallback)
|
||||
|
||||
//retry connect redis 5 times, or panic
|
||||
index := 0
|
||||
go func(i int) {
|
||||
for {
|
||||
log.Debug("wait...")
|
||||
switch res := c.client.Receive().(type) {
|
||||
case redis.Message:
|
||||
i = 0
|
||||
channel := (*string)(unsafe.Pointer(&res.Channel))
|
||||
message := (*string)(unsafe.Pointer(&res.Data))
|
||||
c.cbMap[*channel](*channel, *message)
|
||||
case redis.Subscription:
|
||||
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
|
||||
case error:
|
||||
log.Error("error handle redis connection...")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
if i > 5 {
|
||||
panic(errors.New("redis connection failed too many times, panic"))
|
||||
}
|
||||
con, err := GetRedisConn()
|
||||
if err != nil {
|
||||
log.Error("redis dial failed")
|
||||
continue
|
||||
}
|
||||
c.client = redis.PubSubConn{Conn: con}
|
||||
i += 1
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
}(index)
|
||||
|
||||
}
|
||||
|
||||
func (c *Subscriber) Close() {
|
||||
err := c.client.Close()
|
||||
func (r *Redis) Close() {
|
||||
err := r.pool.Close()
|
||||
if err != nil {
|
||||
log.Errorf("redis close error.")
|
||||
}
|
||||
}
|
||||
func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
|
||||
psc := redis.PubSubConn{Conn: r.pool.Get()}
|
||||
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil {
|
||||
return err
|
||||
}
|
||||
done := make(chan error, 1)
|
||||
tick := time.NewTicker(time.Second * 3)
|
||||
defer tick.Stop()
|
||||
go func() {
|
||||
defer func() { _ = psc.Close() }()
|
||||
for {
|
||||
switch msg := psc.Receive().(type) {
|
||||
case error:
|
||||
done <- fmt.Errorf("redis pubsub receive err: %v", msg)
|
||||
return
|
||||
case redis.Message:
|
||||
fmt.Println(msg)
|
||||
if err := consume(msg); err != nil {
|
||||
fmt.Printf("redis pubsub consume message err: %v", err)
|
||||
continue
|
||||
}
|
||||
case redis.Subscription:
|
||||
fmt.Println(msg)
|
||||
|
||||
func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
|
||||
err := c.client.Subscribe(channel)
|
||||
if err != nil {
|
||||
log.Fatalf("redis Subscribe error.")
|
||||
if msg.Count == 0 {
|
||||
// all channels are unsubscribed
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
// start a new goroutine to receive message
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if err := psc.Unsubscribe(); err != nil {
|
||||
fmt.Printf("redis pubsub unsubscribe err: %v \n", err)
|
||||
}
|
||||
done <- nil
|
||||
case <-tick.C:
|
||||
//fmt.Printf("ping message \n")
|
||||
if err := psc.Ping(""); err != nil {
|
||||
done <- err
|
||||
}
|
||||
case err := <-done:
|
||||
close(done)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.cbMap[channel.(string)] = cb
|
||||
}
|
||||
func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
|
||||
index := 0
|
||||
go func() {
|
||||
for {
|
||||
err := r.subscribe(ctx, consume, channel...)
|
||||
fmt.Println(err)
|
||||
|
||||
func Publish(channel string, msg string) error {
|
||||
c, err := GetRedisConn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := c.Do("PUBLISH", channel, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
index += 1
|
||||
fmt.Printf("try reconnect %d times \n", index)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
func (r *Redis) Publish(channel, message string) (n int, err error) {
|
||||
conn := r.pool.Get()
|
||||
defer func() { _ = conn.Close() }()
|
||||
n, err = redis.Int(conn.Do("PUBLISH", channel, message))
|
||||
if err != nil {
|
||||
return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user