1. Mongo dial add 5 seconds connection timeout.
 2. Redis uses connection pool mode.
 3. Redis pool new connection have 10 seconds write timeout and read timeout and connection timeout.
This commit is contained in:
yaziming
2019-09-01 17:18:08 +08:00
parent d97134b288
commit 443d697c6f
9 changed files with 166 additions and 160 deletions

View File

@@ -71,7 +71,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
// 发布获取日志消息
channel := "nodes:" + task.NodeId.Hex()
if err := database.Publish(channel, string(msgBytes)); err != nil {
if _, err := database.RedisClient.Publish(channel, string(msgBytes)); err != nil {
log.Errorf(err.Error())
return "", err
}

View File

@@ -1,6 +1,7 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
@@ -10,6 +11,7 @@ import (
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
"time"
@@ -258,13 +260,12 @@ func UpdateNodeData() {
}
}
func MasterNodeCallback(channel string, msgStr string) {
func MasterNodeCallback(message redis.Message) (err error) {
// 反序列化
var msg NodeMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
}
if msg.Type == constants.MsgTypeGetLog {
@@ -281,16 +282,15 @@ func MasterNodeCallback(channel string, msgStr string) {
sysInfoBytes, _ := json.Marshal(&msg.SysInfo)
ch <- string(sysInfoBytes)
}
return nil
}
func WorkerNodeCallback(channel string, msgStr string) {
func WorkerNodeCallback(message redis.Message) (err error) {
// 反序列化
msg := NodeMessage{}
fmt.Println(msgStr)
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
}
if msg.Type == constants.MsgTypeGetLog {
@@ -317,16 +317,14 @@ func WorkerNodeCallback(channel string, msgStr string) {
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
// 发布消息给主节点
log.Info("publish get log msg to master")
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return
if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil {
return err
}
} else if msg.Type == constants.MsgTypeCancelTask {
// 取消任务
@@ -336,8 +334,7 @@ func WorkerNodeCallback(channel string, msgStr string) {
// 获取环境信息
sysInfo, err := GetLocalSystemInfo()
if err != nil {
log.Errorf(err.Error())
return
return err
}
msgSd := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
@@ -348,14 +345,14 @@ func WorkerNodeCallback(channel string, msgStr string) {
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
fmt.Println(msgSd)
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
if _, err := database.RedisClient.Publish("nodes:master", string(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return
return err
}
}
return
}
// 初始化节点服务
@@ -373,25 +370,27 @@ func InitNodeService() error {
// 首次更新节点数据注册到Redis
UpdateNodeData()
// 消息订阅
var sub database.Subscriber
sub.Connect()
// 获取当前节点
node, err := GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
return err
}
ctx := context.Background()
if IsMaster() {
// 如果为主节点,订阅主节点通信频道
channel := "nodes:master"
sub.Subscribe(channel, MasterNodeCallback)
err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel)
if err != nil {
return err
}
} else {
// 若为工作节点,订阅单独指定通信频道
channel := "nodes:" + node.Id.Hex()
sub.Subscribe(channel, WorkerNodeCallback)
err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel)
if err != nil {
return err
}
}
// 如果为主节点每30秒刷新所有节点信息

View File

@@ -1,6 +1,7 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
@@ -11,6 +12,7 @@ import (
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
@@ -142,10 +144,7 @@ func ZipSpider(spider model.Spider) (filePath string, err error) {
// 临时文件路径
randomId := uuid.NewV4()
if err != nil {
debug.PrintStack()
return "", err
}
filePath = filepath.Join(
viper.GetString("other.tmppath"),
randomId.String()+".zip",
@@ -302,7 +301,7 @@ func PublishSpider(spider model.Spider) (err error) {
return
}
channel := "files:upload"
if err = database.Publish(channel, string(msgStr)); err != nil {
if _, err = database.RedisClient.Publish(channel, string(msgStr)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
@@ -312,16 +311,16 @@ func PublishSpider(spider model.Spider) (err error) {
}
// 上传爬虫回调
func OnFileUpload(channel string, msgStr string) {
func OnFileUpload(message redis.Message) (err error) {
s, gf := database.GetGridFs("files")
defer s.Close()
// 反序列化消息
var msg SpiderUploadMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
if err := json.Unmarshal(message.Data, &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
// 从GridFS获取该文件
@@ -329,7 +328,7 @@ func OnFileUpload(channel string, msgStr string) {
if err != nil {
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
debug.PrintStack()
return
return err
}
defer f.Close()
@@ -342,7 +341,7 @@ func OnFileUpload(channel string, msgStr string) {
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
defer tmpFile.Close()
@@ -350,7 +349,7 @@ func OnFileUpload(channel string, msgStr string) {
if _, err := io.Copy(tmpFile, f); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
// 解压缩临时文件到目标文件夹
@@ -361,22 +360,23 @@ func OnFileUpload(channel string, msgStr string) {
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
// 关闭临时文件
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
// 删除临时文件
if err := os.Remove(tmpFilePath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
return err
}
return nil
}
// 启动爬虫服务
@@ -401,9 +401,11 @@ func InitSpiderService() error {
// 订阅文件上传
channel := "files:upload"
var sub database.Subscriber
sub.Connect()
sub.Subscribe(channel, OnFileUpload)
//sub.Connect()
ctx := context.Background()
return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
}
// 启动定时任务

View File

@@ -112,7 +112,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 序列化
msgBytes, _ := json.Marshal(&msg)
if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil {
if _, err := database.RedisClient.Publish("nodes:"+id, string(msgBytes)); err != nil {
return model.SystemInfo{}, err
}

View File

@@ -466,7 +466,7 @@ func CancelTask(id string) (err error) {
}
// 发布消息
if err := database.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil {
if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), string(msgBytes)); err != nil {
return err
}
}